agc_data.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import os
  2. from extract_data.douyin.douyin_author import douyinAuthor
  3. from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
  4. from extract_data.zhannei.zhannei_author import ZhanNeiAuthor
  5. import schedule
  6. import concurrent.futures
  7. import time
  8. import threading
  9. from common import Material
  # Parameters that control the read/write throttle.
  MAX_BPS = 120 * 1024 * 1024  # throughput ceiling: 120 MB/s
  # os.cpu_count() returns None when the CPU count cannot be determined;
  # fall back to one CPU so the pool size is always a positive integer.
  MAX_WORKERS = (os.cpu_count() or 1) * 2  # maximum worker threads in the pool
  READ_WRITE_CHUNK_SIZE = 1024 * 1024  # size of one read/write chunk (1 MB)
  # Delay applied per read/write, in seconds (one chunk at MAX_BPS).
  SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS
  # Global lock used to synchronise read/write operations.
  lock = threading.Lock()
  def gs_start(platform, user_data):
      """Run the video-list crawl for one user record on the given platform.

      Args:
          platform: Platform key. "douyin", "kuaishou" and "zhannei" each
              trigger their crawler; any other value only prints the start
              message.
          user_data: User record passed unchanged to the platform crawler.
      """
      print(f"执行{platform}数据抓取{user_data}")
      # Each crawler call is wrapped in a lambda so that it is only looked up
      # and invoked when its platform key matches.
      crawlers = (
          ("douyin", lambda: douyinAuthor.get_videoList(user_data)),
          ("kuaishou", lambda: kuaishouAuthor.get_kuaishou_videoList(user_data)),
          ("zhannei", lambda: ZhanNeiAuthor.get_zhannei_videoList(user_data)),
      )
      for key, crawl in crawlers:
          if platform == key:
              crawl()
              break
  def gs_task(platform):
      """Crawl every user of ``platform`` that has a sheet, via a thread pool.

      User records come from ``Material.get_all_gs_user(platform)``; records
      whose ``'sheet'`` value is ``None`` are skipped. Each remaining record is
      submitted to ``gs_operation`` and its outcome is printed as it completes.

      A failure in one worker is printed together with the record that caused
      it; it neither stops the remaining results from being reported nor
      propagates to the caller.

      Args:
          platform: Platform key forwarded to ``gs_operation``.
      """
      data = Material.get_all_gs_user(platform)
      valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
      with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
          futures = {
              executor.submit(gs_operation, platform, user_data): user_data
              for user_data in valid_data
          }
          for future in concurrent.futures.as_completed(futures):
              try:
                  result = future.result()
              except Exception as exc:
                  # future.result() re-raises whatever the worker raised. Left
                  # unhandled, one bad record would abort this loop and escape
                  # into the scheduler loop that runs this job.
                  print(f"处理失败: {futures[future]} -> {exc!r}")
              else:
                  print("处理结果:", result)
      print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
  def gs_operation(platform, data):
      """Apply the global throttle, then run ``gs_start`` for one user record.

      Callers take the global ``lock`` one at a time and spend at least
      ``SLEEP_INTERVAL`` seconds inside it before the crawl is started.

      Args:
          platform: Platform key forwarded to ``gs_start``.
          data: User record forwarded to ``gs_start``.
      """
      with lock:
          # Measure with the monotonic clock: a wall-clock adjustment between
          # the two readings must not stretch the top-up sleep below while the
          # global lock is held.
          start_time = time.monotonic()
          time.sleep(SLEEP_INTERVAL)
          elapsed_time = time.monotonic() - start_time
          # Top up if the measured wait came out shorter than the interval.
          if elapsed_time < SLEEP_INTERVAL:
              time.sleep(SLEEP_INTERVAL - elapsed_time)
      # NOTE(review): the indentation of this call was lost in the copy under
      # review. It is placed outside the lock so pool workers can overlap;
      # confirm against the original file.
      gs_start(platform, data)
  def cg_start(platform, user_data):
      """Run the video-list crawl for one user record on the given platform.

      Args:
          platform: Platform key. "douyin" and "kuaishou" each trigger their
              crawler; any other value only prints the start message.
          user_data: User record passed unchanged to the platform crawler.
      """
      print(f"执行{platform}数据抓取{user_data}")
      # Each crawler call is wrapped in a lambda so that it is only looked up
      # and invoked when its platform key matches.
      crawlers = (
          ("douyin", lambda: douyinAuthor.get_videoList(user_data)),
          ("kuaishou", lambda: kuaishouAuthor.get_kuaishou_videoList(user_data)),
      )
      for key, crawl in crawlers:
          if platform == key:
              crawl()
              break
  def cg_task(platform):
      """Crawl every user of ``platform`` via a thread pool.

      User records come from ``Material.get_all_user(platform)``. Each record
      is submitted to ``cg_operation`` and its outcome is printed as it
      completes.

      A failure in one worker is printed together with the record that caused
      it; it neither stops the remaining results from being reported nor
      propagates to the caller.

      Args:
          platform: Platform key forwarded to ``cg_operation``.
      """
      data = Material.get_all_user(platform)
      with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
          futures = {
              executor.submit(cg_operation, platform, user_data): user_data
              for user_data in data
          }
          for future in concurrent.futures.as_completed(futures):
              try:
                  result = future.result()
              except Exception as exc:
                  # future.result() re-raises whatever the worker raised. Left
                  # unhandled, one bad record would abort this loop and escape
                  # into the scheduler loop that runs this job.
                  print(f"处理失败: {futures[future]} -> {exc!r}")
              else:
                  print("处理结果:", result)
      print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
  def cg_operation(platform, data):
      """Apply the global throttle, then run ``cg_start`` for one user record.

      Callers take the global ``lock`` one at a time and spend at least
      ``SLEEP_INTERVAL`` seconds inside it before the crawl is started.

      Args:
          platform: Platform key forwarded to ``cg_start``.
          data: User record forwarded to ``cg_start``.
      """
      with lock:
          # Measure with the monotonic clock: a wall-clock adjustment between
          # the two readings must not stretch the top-up sleep below while the
          # global lock is held.
          start_time = time.monotonic()
          time.sleep(SLEEP_INTERVAL)
          elapsed_time = time.monotonic() - start_time
          # Top up if the measured wait came out shorter than the interval.
          if elapsed_time < SLEEP_INTERVAL:
              time.sleep(SLEEP_INTERVAL - elapsed_time)
      # NOTE(review): the indentation of this call was lost in the copy under
      # review. It is placed outside the lock so pool workers can overlap;
      # confirm against the original file.
      cg_start(platform, data)
  # Daily gs_task jobs as (time of day, platform), registered in this order.
  _DAILY_GS_JOBS = (
      ("19:20", "kuaishou"),
      ("19:30", "douyin"),
      ("18:00", "zhannei"),
  )
  for _run_at, _gs_platform in _DAILY_GS_JOBS:
      schedule.every().day.at(_run_at).do(gs_task, _gs_platform)

  # cg_task jobs, one per platform, repeating every four hours.
  for _cg_platform in ("douyin", "kuaishou"):
      schedule.every(4).hours.do(cg_task, _cg_platform)

  # Poll the scheduler once per second, forever.
  while True:
      schedule.run_pending()
      time.sleep(1)