zhangbo 1 年之前
父节点
当前提交
2ca65014b9
共有 1 个文件被更改,包括 34 次插入1 次删除
  1. 34 1
      alg_recsys_recall_4h_region_trend.py

+ 34 - 1
alg_recsys_recall_4h_region_trend.py

@@ -7,7 +7,9 @@ from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
 from config import set_config
 from log import Log
 from records_process import records_process
-
+from queue import Queue
+from tqdm import tqdm
+import threading
 
 config_, _ = set_config()
 log_ = Log()
@@ -17,6 +19,37 @@ log_ = Log()
 region_name2code: dict = config_.REGION_CODE
 redis_helper = RedisHelper()
 
+def worker(queue, executor):
+    while True:
+        row = queue.get()
+        if row is None:  # 结束信号
+            queue.task_done()
+            break
+        executor(row)
+        queue.task_done()
+def records_process_for_list(records, executor, max_size=50, num_workers=10):
+    # 创建一个线程安全的队列
+    queue = Queue(maxsize=max_size)  # 可以调整 maxsize 以控制内存使用
+    # 设置线程池大小
+    num_workers = num_workers
+    # 启动工作线程
+    threads = []
+    for _ in range(num_workers):
+        t = threading.Thread(target=worker, args=(queue, executor))
+        t.start()
+        threads.append(t)
+    # 读取数据并放入队列
+    for row in tqdm(records):
+        queue.put(row)
+    # 发送结束信号
+    for _ in range(num_workers):
+        queue.put(None)
+    # 等待所有任务完成
+    queue.join()
+    # 等待所有工作线程结束
+    for t in threads:
+        t.join()
+
 def check_data(project, table, partition) -> int:
     """检查数据是否准备好,输出数据条数"""
     odps = ODPS(