# -*- coding: utf-8 -*-
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

from tqdm import tqdm
def worker(queue, executor):
    """Consume rows from *queue* and hand each one to *executor*.

    The loop runs until a ``None`` item (the stop signal) is received.
    Every item taken from the queue, including the stop signal, is
    acknowledged with ``queue.task_done()``.

    Args:
        queue: Queue supplying the rows; ``None`` means "stop".
        executor: Callable invoked as ``executor(row)`` for each row.

    An exception raised by ``executor`` is logged and the worker moves on
    to the next row. Letting it propagate would kill the thread without
    acknowledging the item, and any ``queue.join()`` would block forever.
    """
    log = logging.getLogger(__name__)
    while True:
        row = queue.get()
        try:
            if row is None:  # stop signal
                break
            executor(row)
        except Exception:
            # Thread boundary: report the failure but keep draining the
            # queue so the producer never blocks on a dead consumer.
            log.exception("executor raised while processing a row; continuing")
        finally:
            # Runs on success, on failure and on the stop signal alike.
            queue.task_done()
def records_process(records, executor, max_size=50, num_workers=10):
    """Feed every row read from *records* to *executor* on worker threads.

    Rows are read in the calling thread and handed to the workers through
    a bounded queue, so at most ``max_size`` rows wait in the queue at once.

    Args:
        records: Object whose ``open_reader()`` returns a context manager
            yielding an iterable of rows.
        executor: Callable invoked as ``executor(row)``; it is called from
            several threads at the same time.
        max_size: Capacity of the hand-off queue. Reading blocks while the
            queue is full.
        num_workers: Number of worker threads to start.

    Raises:
        ValueError: If ``num_workers`` is less than 1. Without a consumer
            the queue would never be drained and the call could not finish.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    # Thread-safe queue; maxsize bounds how many rows are buffered.
    queue = Queue(maxsize=max_size)

    threads = []
    try:
        # Start the worker threads.
        for _ in range(num_workers):
            t = threading.Thread(target=worker, args=(queue, executor))
            t.start()
            threads.append(t)

        # Read the data and enqueue it for the workers.
        with records.open_reader() as reader:
            for row in tqdm(reader):
                queue.put(row)
    finally:
        # Always send one stop signal per started worker, even when reading
        # failed; otherwise the workers would block on get() forever.
        for _ in threads:
            queue.put(None)
        # A worker exits only after taking its stop signal, which is queued
        # behind every row, so joining the threads waits for all the work.
        for t in threads:
            t.join()