123456789101112131415161718192021222324252627282930313233343536373839 |
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from queue import Queue
- from tqdm import tqdm
def worker(queue, executor):
    """Consume rows from *queue* and pass each one to *executor* until a ``None`` sentinel arrives.

    Every item taken with ``queue.get()`` is acknowledged with exactly one
    ``queue.task_done()``. This holds for the sentinel and for rows whose
    processing raised, so ``queue.join()`` can never hang on this worker.

    Args:
        queue: A ``queue.Queue`` shared with the producer and the other workers.
            ``None`` is reserved as the shutdown sentinel.
        executor: Callable invoked as ``executor(row)`` for every non-``None``
            item. The same callable is shared by all worker threads, so it
            must be safe to call concurrently.

    An exception raised by ``executor`` is logged with its traceback and the
    worker moves on to the next row. Letting it propagate would kill the
    thread, leave the item un-acknowledged, and eventually deadlock the
    producer on a full bounded queue.
    """
    import logging

    logger = logging.getLogger(__name__)
    while True:
        row = queue.get()
        try:
            if row is None:
                # Sentinel: stop this worker. The ``finally`` still acks it.
                return
            executor(row)
        except Exception:
            logger.exception("executor failed on row %r", row)
        finally:
            queue.task_done()
def records_process(records, executor, max_size=50, num_workers=10):
    """Stream rows from *records* through a bounded queue to a pool of worker threads.

    Args:
        records: Object exposing ``open_reader()``, a context manager whose
            value is iterated to obtain rows. Presumably a table/record
            source; TODO confirm against callers.
        executor: Callable run as ``executor(row)`` by the worker threads. It
            is shared by all threads, so it must be safe to call concurrently.
        max_size: Capacity of the row queue. The producer blocks once this
            many rows are waiting, which bounds memory. ``0`` means unbounded
            (``queue.Queue`` semantics).
        num_workers: Number of worker threads to start. Must be >= 1.

    Raises:
        ValueError: If ``num_workers`` is less than 1. With no consumers the
            producer or the final join would block forever.

    Any exception raised while starting threads or reading rows propagates.
    It does so only after every started worker has been told to stop and has
    been joined, so no threads are left running.
    """
    if num_workers < 1:
        raise ValueError(f"num_workers must be >= 1, got {num_workers}")

    queue = Queue(maxsize=max_size)
    threads = []
    try:
        for _ in range(num_workers):
            t = threading.Thread(target=worker, args=(queue, executor))
            t.start()
            threads.append(t)

        with records.open_reader() as reader:
            # tqdm wraps the reader only to display progress while rows are queued.
            for row in tqdm(reader):
                queue.put(row)
    finally:
        # One sentinel per thread that actually started. The queue is FIFO, so
        # sentinels are dequeued only after every previously queued row.
        for _ in threads:
            queue.put(None)
        # A worker exits only after taking a sentinel. Once all threads are
        # joined, every row has therefore been dequeued and processed. Joining
        # threads rather than calling queue.join() also cannot hang on a
        # sentinel left unconsumed by a worker that died.
        for t in threads:
            t.join()
|