123456789101112131415161718192021222324252627282930313233343536373839 |
# -*- coding: utf-8 -*-
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from queue import Queue
- from tqdm import tqdm
def worker(queue, executor):
    """Consume rows from ``queue`` and hand each one to ``executor``.

    The loop runs until a ``None`` sentinel is taken from the queue. Every
    item taken from the queue, including the sentinel, is acknowledged with
    ``queue.task_done()`` so that a producer waiting in ``queue.join()`` can
    return.

    Args:
        queue: Queue-like object providing ``get()`` and ``task_done()``.
        executor: Callable invoked with each non-``None`` row.

    An exception raised by ``executor`` is logged and the worker continues
    with the next row. Letting it propagate would kill the thread without
    acknowledging the item, and the producer's ``queue.join()`` would then
    block forever.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)
    while True:
        row = queue.get()
        try:
            if row is None:  # shutdown sentinel
                break
            executor(row)
        except Exception:
            # Keep the worker alive: one bad row must not stall the pipeline.
            logger.exception("executor failed for row %r", row)
        finally:
            # Runs for normal rows, failed rows and the sentinel alike.
            queue.task_done()
def records_process(records, executor, max_size=50, num_workers=10):
    """Stream rows from ``records`` through a pool of worker threads.

    Rows are read in the calling thread and pushed onto a bounded queue;
    ``num_workers`` threads running ``worker`` pull rows off the queue and
    call ``executor`` on each one. The function returns once every queued
    item has been acknowledged and all worker threads have exited.

    Args:
        records: Object whose ``open_reader()`` returns a context manager
            yielding an iterable of rows.
        executor: Callable invoked by the workers with each row.
        max_size: Maximum number of rows buffered in the queue; a full
            queue blocks the reader, which bounds memory use.
        num_workers: Number of worker threads to start.

    Raises:
        Any exception raised while opening or iterating the reader is
        re-raised after the workers have been shut down.
    """
    # Bounded, thread-safe queue between the reader and the workers.
    queue = Queue(maxsize=max_size)
    threads = []
    try:
        # Start the worker threads.
        for _ in range(num_workers):
            t = threading.Thread(target=worker, args=(queue, executor))
            t.start()
            threads.append(t)

        # Read the data and feed it to the workers.
        with records.open_reader() as reader:
            for row in tqdm(reader):
                queue.put(row)
    finally:
        # Always send one stop sentinel per started worker, even when reading
        # failed; otherwise the non-daemon workers would block on get()
        # forever and keep the process alive.
        for _ in threads:
            queue.put(None)
        # Wait until every queued item has been acknowledged.
        queue.join()
        # Wait for the worker threads themselves to exit.
        for t in threads:
            t.join()
|