# -*- coding: utf-8 -*-
"""Scheduled crawler for Douyin, Kuaishou and in-site (zhannei) author video lists.

Each platform job loads its user list via ``Material.get_all_gs_user``,
filters it on the ``sheet`` field and fans the per-user crawl out over a
thread pool. All jobs run once at start-up and then daily at fixed times.
"""
import concurrent.futures
import os
import threading
import time
import traceback

import schedule

from common import Material
from extract_data.douyin.douyin_author import douyinAuthor
from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
from extract_data.zhannei.zhannei_author import ZhanNeiAuthor

# Throttling parameters.
# NOTE(review): in this file these only feed SLEEP_INTERVAL, a single pause
# taken before each job submits its work; they do not limit the crawlers'
# actual read/write throughput.
MAX_BPS = 10 * 1024 * 1024  # 10 MiB/s
# Maximum thread-pool size. os.cpu_count() may return None, so fall back to 1.
MAX_WORKERS = (os.cpu_count() or 1) * 2
READ_WRITE_CHUNK_SIZE = 1024 * 1024  # size of one read/write chunk (1 MiB)
SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # pause length in seconds (0.1 s)

# Global lock held during the pre-submission pause.
lock = threading.Lock()


def douyin_start(user_data):
    """Crawl the Douyin video list for a single user record."""
    print(f"执行抖音数据抓取{user_data}")
    douyinAuthor.get_videoList(user_data)


def kuaishou_start(user_data):
    """Crawl the Kuaishou video list for a single user record."""
    print(f"执行快手数据抓取{user_data}")
    kuaishouAuthor.get_kuaishou_videoList(user_data)


def zhannei_start(user_data):
    """Crawl the in-site (zhannei) video list for a single user record."""
    print(f"执行站内数据抓取{user_data}")
    ZhanNeiAuthor.get_zhannei_videoList(user_data)


def _throttle():
    """Pause for SLEEP_INTERVAL seconds while holding ``lock``.

    ``time.sleep`` already waits at least the requested duration, so no
    measure-and-top-up step is needed.
    """
    with lock:
        time.sleep(SLEEP_INTERVAL)


def _run_crawl(platform, start_fn, keep, done_message):
    """Run ``start_fn`` concurrently for every ``platform`` user accepted by ``keep``.

    Errors are reported (message plus traceback) and then swallowed. This way
    a single failing user, or a failed user-list lookup, cannot abort the
    rest of the job. It also cannot propagate out of
    ``schedule.run_pending()`` and stop the scheduler loop.

    Args:
        platform: key passed to ``Material.get_all_gs_user``.
        start_fn: per-user crawl function, called with one user record.
        keep: predicate choosing which user records to crawl.
        done_message: printed once all submitted crawls have finished.
    """
    try:
        data = Material.get_all_gs_user(platform)
        # Treat a missing/None user list as "no users".
        valid_data = [user_data for user_data in (data or []) if keep(user_data)]
    except Exception:
        print(f"获取用户列表失败: {platform}")
        traceback.print_exc()
        return

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        _throttle()
        # Map each future back to its user so a failure can name the user.
        futures = {executor.submit(start_fn, user_data): user_data for user_data in valid_data}
        # Wait for every crawl to finish, reporting each outcome as it completes.
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
            except Exception as exc:
                print(f"处理失败{futures[future]}: {exc!r}")
                traceback.print_exc()
                continue
            print("处理结果:", result)
    print(done_message)


def zhannei_task():
    """Scheduled job: crawl in-site (zhannei) users whose ``sheet`` is None."""
    # NOTE(review): this is the inverse of the douyin/kuaishou filter
    # (``sheet is not None``) -- confirm the inversion is intentional.
    _run_crawl(
        "zhannei",
        zhannei_start,
        lambda user_data: user_data['sheet'] is None,
        "站内数据抓取定时任务执行完成",
    )


def douyin_task():
    """Scheduled job: crawl Douyin users whose ``sheet`` is not None."""
    _run_crawl(
        "douyin",
        douyin_start,
        lambda user_data: user_data['sheet'] is not None,
        "抖音数据抓取定时任务执行完成",
    )


def kuaishou_task():
    """Scheduled job: crawl Kuaishou users whose ``sheet`` is not None."""
    _run_crawl(
        "kuaishou",
        kuaishou_start,
        lambda user_data: user_data['sheet'] is not None,
        "快手数据抓取定时任务执行完成.",
    )


# Backwards-compatible alias for the previously misspelt job name.
kuanshou_task = kuaishou_task

schedule.every().day.at("19:20").do(kuaishou_task)
schedule.every().day.at("19:30").do(douyin_task)
schedule.every().day.at("18:00").do(zhannei_task)

# Run every job once immediately, then hand over to the scheduler.
# NOTE: these calls and the loop below execute at import time; add an
# ``if __name__ == "__main__":`` guard if nothing imports this module to start it.
kuaishou_task()
douyin_task()
zhannei_task()

# Keep running until manually stopped.
while True:
    schedule.run_pending()
    time.sleep(1)