# job_top_data.py
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from datetime import datetime
  6. import schedule
  7. from loguru import logger
  8. from common.aliyun_log import AliyunLogger
  9. from common.feishu_utils import Feishu
  10. from common.redis import get_top_data, in_job_video_data
  11. from top_automatic.top_data_processing import Top
  12. def get_data_task():
  13. top_tasks = [] # 使用集合去重
  14. for i in range(120):
  15. top_task = get_top_data("task:top_all_data")
  16. logger.info(f"[+] 获取第{i}个")
  17. current_time = datetime.now()
  18. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  19. if top_task:
  20. time.sleep(1)
  21. data = json.loads(top_task)
  22. channel_id = data['channel']
  23. if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
  24. if channel_id:
  25. AliyunLogger.logging(channel_id, data, "不处理","fail")
  26. else:
  27. AliyunLogger.logging("空", data, "不处理","fail")
  28. values = [
  29. [
  30. data['uid'],
  31. data['videoid'],
  32. data['return_uv'],
  33. data['type'],
  34. data['type_owner'],
  35. data['channel'],
  36. data['channel_owner'],
  37. data['title'],
  38. data['dt'],
  39. "",
  40. "",
  41. formatted_time
  42. ]
  43. ]
  44. Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "p6PfEG", "ROWS", 1, 2)
  45. time.sleep(0.5)
  46. Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "p6PfEG", "A2:Z2", values)
  47. logger.info(f"[+] 成功写入飞书表格")
  48. logger.info(f"[+] 改内容为:{channel_id},不做处理")
  49. continue
  50. else:
  51. top_tasks.append(top_task)
  52. AliyunLogger.logging(channel_id, data, "等待处理", "success")
  53. # values = [
  54. # [
  55. # data['uid'],
  56. # data['videoid'],
  57. # data['return_uv'],
  58. # data['type'],
  59. # data['type_owner'],
  60. # data['channel'],
  61. # data['channel_owner'],
  62. # data['title'],
  63. # data['dt'],
  64. # "",
  65. # "",
  66. # formatted_time
  67. # ]
  68. # ]
  69. # Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
  70. # time.sleep(0.5)
  71. # Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
  72. logger.info(f"[+] 成功写入飞书表格")
  73. else:
  74. return top_tasks
  75. return top_tasks
  76. def video_task_start():
  77. logger.info(f"[+] 任务开始获取小时级top数据")
  78. data_list = get_data_task()
  79. if data_list:
  80. logger.info(f"[+] 共获取{len(data_list)}条")
  81. else:
  82. logger.info(f"[+] 共获取0条")
  83. if not data_list:
  84. return
  85. for data in data_list:
  86. try:
  87. logger.info(f"[+] 任务处理{data}任务")
  88. top = Top()
  89. top.main(data)
  90. logger.info(f"[+] {data}处理成功")
  91. time.sleep(5)
  92. continue
  93. except Exception as e:
  94. data = json.loads(data)
  95. in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
  96. AliyunLogger.logging(data['channel'], data, "处理失败重新处理","fail")
  97. logger.error(f"[+] {data}处理失败,失败信息{e}")
  98. continue
  99. def schedule_tasks():
  100. schedule.every(20).minutes.do(video_task_start)
  101. if __name__ == '__main__':
  102. video_task_start()
  103. schedule_tasks() # 调用任务调度函数
  104. while True:
  105. schedule.run_pending()
  106. time.sleep(1) # 每秒钟检查一次