# -*- coding: utf-8 -*-
"""Fan rows from a record source out to a fixed pool of worker threads."""
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

try:
    from tqdm import tqdm
except ImportError:  # the progress bar is cosmetic; degrade to plain iteration
    def tqdm(iterable, *args, **kwargs):
        """Pass-through stand-in used when tqdm is not installed."""
        return iterable

logger = logging.getLogger(__name__)


def worker(queue, executor, errors=None):
    """Consume rows from *queue* and call *executor* on each until stopped.

    A ``None`` item is the stop signal: it is acknowledged and the loop ends.
    Every item taken from the queue is acknowledged with ``task_done()``, even
    when *executor* raises, so that ``queue.join()`` in the producer cannot
    block forever on an unacknowledged item.

    Args:
        queue: Queue to read rows from.
        executor: Callable invoked with each row.
        errors: Optional list shared with the caller. The first exception
            raised by *executor* is appended to it; every failure is logged.
    """
    while True:
        row = queue.get()
        try:
            if row is None:  # stop signal
                break
            executor(row)
        except Exception as exc:
            # Keep the thread alive: a dead consumer would leave the producer
            # blocked on a full bounded queue.
            logger.exception("executor failed while processing a row")
            # Only the first failure is retained, to keep memory bounded when
            # many rows fail (a race may let a few extra through; harmless).
            if errors is not None and not errors:
                errors.append(exc)
        finally:
            queue.task_done()


def records_process(records, executor, max_size=50, num_workers=10):
    """Read every row from *records* and process it on worker threads.

    Args:
        records: Object whose ``open_reader()`` returns a context manager
            yielding an iterable of rows. Rows must not be ``None``, because
            ``None`` is used as the stop signal for the workers.
        executor: Callable invoked once per row, from a worker thread.
        max_size: Capacity of the hand-off queue; bounds how many rows are
            buffered at once.
        num_workers: Number of worker threads to start.

    Raises:
        ValueError: If *num_workers* is less than 1.
        Exception: The first exception raised by *executor*, re-raised after
            all rows have been handed out and all workers have stopped.
    """
    if num_workers < 1:
        # With no consumers the put()/join() calls below would block forever.
        raise ValueError("num_workers must be at least 1")

    # Bounded, thread-safe queue; max_size controls memory usage.
    queue = Queue(maxsize=max_size)
    errors = []
    threads = []

    try:
        # Start the worker threads.
        for _ in range(num_workers):
            t = threading.Thread(target=worker, args=(queue, executor, errors))
            t.start()
            threads.append(t)

        # Read the data and feed it into the queue.
        with records.open_reader() as reader:
            for row in tqdm(reader):
                queue.put(row)
    finally:
        # Always shut the workers down, even if reading failed, so that no
        # thread is left blocked in queue.get().
        for _ in threads:
            queue.put(None)

        # Wait for every queued item to be acknowledged.
        queue.join()

        # Wait for every worker thread to exit.
        for t in threads:
            t.join()

    if errors:
        raise errors[0]