Browse Source

抓取任务定时脚本优化

zhangyong 1 year ago
parent
commit
a5c63987a1
2 changed files with 61 additions and 8 deletions
  1. +33 -3
      data_assign_main.py
  2. +28 -5
      data_main.py

+ 33 - 3
data_assign_main.py

@@ -6,7 +6,16 @@ from extract_data.zhannei.zhannei_author import ZhanNeiAuthor
 import schedule
 import time
 import concurrent.futures
+import threading
+import os
 
+# 控制读写速度的参数
+MAX_BPS = 120 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 2  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 1024 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
 
 def douyin_start(user_data):
     print(f"执行抖音数据抓取{user_data}")
@@ -29,7 +38,14 @@ def zhannei_task():
     data = Material.get_all_gs_user("zhannei")
     # 创建一个线程池
     valid_data = [user_data for user_data in data if user_data['sheet'] is None]
-    with concurrent.futures.ThreadPoolExecutor() as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        with lock:
+            start_time = time.time()
+            time.sleep(SLEEP_INTERVAL)
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            if elapsed_time < SLEEP_INTERVAL:
+                time.sleep(SLEEP_INTERVAL - elapsed_time)
         futures = [executor.submit(zhannei_start, user_data) for user_data in valid_data]
         # 等待所有任务执行完成
         for future in concurrent.futures.as_completed(futures):
@@ -44,7 +60,14 @@ def douyin_task():
     data = Material.get_all_gs_user("douyin")
     # 创建一个线程池
     valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
-    with concurrent.futures.ThreadPoolExecutor() as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        with lock:
+            start_time = time.time()
+            time.sleep(SLEEP_INTERVAL)
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            if elapsed_time < SLEEP_INTERVAL:
+                time.sleep(SLEEP_INTERVAL - elapsed_time)
         futures = {executor.submit(douyin_start, user_data): user_data for user_data in valid_data}
         # 等待所有任务执行完成
         for future in concurrent.futures.as_completed(futures):
@@ -58,7 +81,14 @@ def kuanshou_task():
     data = Material.get_all_gs_user("kuaishou")
     # 创建一个线程池
     valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
-    with concurrent.futures.ThreadPoolExecutor() as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        with lock:
+            start_time = time.time()
+            time.sleep(SLEEP_INTERVAL)
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            if elapsed_time < SLEEP_INTERVAL:
+                time.sleep(SLEEP_INTERVAL - elapsed_time)
         futures = {executor.submit(kuaishou_start, user_data): user_data for user_data in valid_data}
         # 等待所有任务执行完成
         for future in concurrent.futures.as_completed(futures):

+ 28 - 5
data_main.py

@@ -5,6 +5,16 @@ from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
 import schedule
 import time
 import concurrent.futures
+import threading
+import os
+
+# 控制读写速度的参数
+MAX_BPS = 120 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 2  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 1024 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
 
 # 定义读取表格的函数
 def douyin_start(user_data):
@@ -19,7 +29,14 @@ def kuaishou_start(user_data):
 def douyin_task():
     data = Material.get_all_user("douyin")
     # 创建一个线程池
-    with concurrent.futures.ThreadPoolExecutor() as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        with lock:
+            start_time = time.time()
+            time.sleep(SLEEP_INTERVAL)
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            if elapsed_time < SLEEP_INTERVAL:
+                time.sleep(SLEEP_INTERVAL - elapsed_time)
         futures = {executor.submit(douyin_start, user_data): user_data for user_data in data}
         # 等待所有任务执行完成
         for future in concurrent.futures.as_completed(futures):
@@ -33,7 +50,14 @@ def douyin_task():
 def kuanshou_task():
     data = Material.get_all_user("kuaishou")
     # 创建一个线程池
-    with concurrent.futures.ThreadPoolExecutor() as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        with lock:
+            start_time = time.time()
+            time.sleep(SLEEP_INTERVAL)
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            if elapsed_time < SLEEP_INTERVAL:
+                time.sleep(SLEEP_INTERVAL - elapsed_time)
         futures = {executor.submit(kuaishou_start, user_data): user_data for user_data in data}
         # 等待所有任务执行完成
         for future in concurrent.futures.as_completed(futures):
@@ -43,13 +67,12 @@ def kuanshou_task():
     print("快手数据抓取定时任务执行完成.")
 
 
-# schedule.every(2).minutes.do(douyin_task)
-#
-# schedule.every(2).minutes.do(kuanshou_task)
 schedule.every(8).hours.do(douyin_task)
 
 schedule.every(8).hours.do(kuanshou_task)
 
+kuanshou_task()
+
 # 持续运行,直到手动终止
 while True:
     schedule.run_pending()