瀏覽代碼

数据抓取启动脚本代码优化

zhangyong 1 年之前
父節點
當前提交
f576c90bb1
共有 1 個文件被更改,包括 38 次插入和 7 次删除
  1. 38 7
      agc_data.py

+ 38 - 7
agc_data.py

@@ -1,12 +1,21 @@
-from common import Material
+import os
+
 from extract_data.douyin.douyin_author import douyinAuthor
 from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
 from extract_data.zhannei.zhannei_author import ZhanNeiAuthor
 
 import schedule
-import time
 import concurrent.futures
-
+import time
+import threading
+from common import Material
+# 控制读写速度的参数
+MAX_BPS = 120 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 2  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 1024 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
 
 def gs_start(platform, user_data):
     print(f"执行{platform}数据抓取{user_data}")
@@ -21,13 +30,25 @@ def gs_start(platform, user_data):
 def gs_task(platform):
     data = Material.get_all_gs_user(platform)
     valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(gs_start, platform, user_data): user_data for user_data in valid_data}
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(gs_operation, platform, user_data): user_data for user_data in valid_data}
         for future in concurrent.futures.as_completed(futures):
             result = future.result()
             print("处理结果:", result)
     print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
 
+
+def gs_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+        gs_start(platform, data)
+
+
 def cg_start(platform, user_data):
     print(f"执行{platform}数据抓取{user_data}")
     if platform == "douyin":
@@ -37,13 +58,23 @@ def cg_start(platform, user_data):
 
 def cg_task(platform):
     data = Material.get_all_user(platform)
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(cg_start, platform, user_data): user_data for user_data in data}
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(cg_operation, platform, user_data): user_data for user_data in data}
         for future in concurrent.futures.as_completed(futures):
             result = future.result()
             print("处理结果:", result)
     print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
 
+def cg_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+        cg_start(platform, data)
+
 
 
 schedule.every().day.at("19:20").do(gs_task, "kuaishou")