|
@@ -1,87 +0,0 @@
|
|
|
-import os
|
|
|
-
|
|
|
-from extract_data.douyin.douyin_author import douyinAuthor
|
|
|
-from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
|
|
|
-from extract_data.zhannei.zhannei_author import ZhanNeiAuthor
|
|
|
-
|
|
|
-import schedule
|
|
|
-import concurrent.futures
|
|
|
-import time
|
|
|
-import threading
|
|
|
-from common import Material
|
|
|
-# 控制读写速度的参数
|
|
|
-MAX_BPS = 120 * 1024 * 1024 # 120MB/s
|
|
|
-MAX_WORKERS = os.cpu_count() * 2 # 线程池最大工作线程数量
|
|
|
-READ_WRITE_CHUNK_SIZE = 1024 * 1024 # 每次读写的块大小 (1MB)
|
|
|
-SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS # 控制每次读写的延迟时间
|
|
|
-# 全局锁,用于同步读写操作
|
|
|
-lock = threading.Lock()
|
|
|
-
|
|
|
-def gs_start(platform, user_data):
|
|
|
- print(f"执行{platform}数据抓取{user_data}")
|
|
|
- if platform == "douyin":
|
|
|
- douyinAuthor.get_videoList(user_data)
|
|
|
- elif platform == "kuaishou":
|
|
|
- kuaishouAuthor.get_kuaishou_videoList(user_data)
|
|
|
- elif platform == "zhannei":
|
|
|
- ZhanNeiAuthor.get_zhannei_videoList(user_data)
|
|
|
-
|
|
|
-
|
|
|
-def gs_task(platform):
|
|
|
- data = Material.get_all_gs_user(platform)
|
|
|
- valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
|
|
|
- with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
|
|
- futures = {executor.submit(gs_operation, platform, user_data): user_data for user_data in valid_data}
|
|
|
- for future in concurrent.futures.as_completed(futures):
|
|
|
- result = future.result()
|
|
|
- print("处理结果:", result)
|
|
|
- print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
|
|
|
-
|
|
|
-
|
|
|
-def gs_operation(platform, data):
|
|
|
- with lock:
|
|
|
- start_time = time.time()
|
|
|
- time.sleep(SLEEP_INTERVAL)
|
|
|
- end_time = time.time()
|
|
|
- elapsed_time = end_time - start_time
|
|
|
- if elapsed_time < SLEEP_INTERVAL:
|
|
|
- time.sleep(SLEEP_INTERVAL - elapsed_time)
|
|
|
- gs_start(platform, data)
|
|
|
-
|
|
|
-
|
|
|
-def cg_start(platform, user_data):
|
|
|
- print(f"执行{platform}数据抓取{user_data}")
|
|
|
- if platform == "douyin":
|
|
|
- douyinAuthor.get_videoList(user_data)
|
|
|
- elif platform == "kuaishou":
|
|
|
- kuaishouAuthor.get_kuaishou_videoList(user_data)
|
|
|
-
|
|
|
-def cg_task(platform):
|
|
|
- data = Material.get_all_user(platform)
|
|
|
- with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
|
|
- futures = {executor.submit(cg_operation, platform, user_data): user_data for user_data in data}
|
|
|
- for future in concurrent.futures.as_completed(futures):
|
|
|
- result = future.result()
|
|
|
- print("处理结果:", result)
|
|
|
- print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
|
|
|
-
|
|
|
-def cg_operation(platform, data):
|
|
|
- with lock:
|
|
|
- start_time = time.time()
|
|
|
- time.sleep(SLEEP_INTERVAL)
|
|
|
- end_time = time.time()
|
|
|
- elapsed_time = end_time - start_time
|
|
|
- if elapsed_time < SLEEP_INTERVAL:
|
|
|
- time.sleep(SLEEP_INTERVAL - elapsed_time)
|
|
|
- cg_start(platform, data)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-schedule.every().day.at("19:20").do(gs_task, "kuaishou")
|
|
|
-schedule.every().day.at("19:30").do(gs_task, "douyin")
|
|
|
-schedule.every().day.at("18:00").do(gs_task, "zhannei")
|
|
|
-schedule.every(4).hours.do(cg_task, "douyin")
|
|
|
-schedule.every(4).hours.do(cg_task, "kuaishou")
|
|
|
-while True:
|
|
|
- schedule.run_pending()
|
|
|
- time.sleep(1)
|