records_process.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
# -*- coding: utf-8 -*-
  2. import threading
  3. from concurrent.futures import ThreadPoolExecutor
  4. from queue import Queue
  5. from tqdm import tqdm
  6. def worker(queue, executor):
  7. while True:
  8. row = queue.get()
  9. if row is None: # 结束信号
  10. queue.task_done()
  11. break
  12. executor(row)
  13. queue.task_done()
  14. def records_process(records, executor, max_size=50, num_workers=10):
  15. # 创建一个线程安全的队列
  16. queue = Queue(maxsize=max_size) # 可以调整 maxsize 以控制内存使用
  17. # 设置线程池大小
  18. num_workers = num_workers
  19. # 启动工作线程
  20. threads = []
  21. for _ in range(num_workers):
  22. t = threading.Thread(target=worker, args=(queue, executor))
  23. t.start()
  24. threads.append(t)
  25. # 读取数据并放入队列
  26. with records.open_reader() as reader:
  27. for row in tqdm(reader):
  28. queue.put(row)
  29. # 发送结束信号
  30. for _ in range(num_workers):
  31. queue.put(None)
  32. # 等待所有任务完成
  33. queue.join()
  34. # 等待所有工作线程结束
  35. for t in threads:
  36. t.join()