job_data.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from datetime import datetime
  6. import schedule
  7. from loguru import logger
  8. from common.aliyun_log import AliyunLogger
  9. from common.feishu_utils import Feishu
  10. from common.redis import get_top_data, in_job_video_data
  11. from top_automatic.top_data_processing import Top
  12. def get_data_task():
  13. top_tasks = set() # 使用集合去重
  14. for i in range(100):
  15. top_task = get_top_data("task:top_all_data")
  16. if top_task:
  17. time.sleep(1)
  18. data = json.loads(top_task)
  19. channel_id = data['channel']
  20. if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
  21. current_time = datetime.now()
  22. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  23. values = [
  24. [
  25. data['uid'],
  26. data['videoid'],
  27. data['return_uv'],
  28. data['type'],
  29. data['type_owner'],
  30. data['channel'],
  31. data['channel_owner'],
  32. data['title'],
  33. data['dt'],
  34. "",
  35. "",
  36. formatted_time
  37. ]
  38. ]
  39. Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
  40. time.sleep(0.5)
  41. Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
  42. logger.info(f"[+] 成功写入飞书表格")
  43. logger.info(f"[+] 改内容为:{channel_id},不做处理")
  44. if channel_id:
  45. AliyunLogger.logging(channel_id, data, "不处理","fail")
  46. continue
  47. top_tasks.add(top_task)
  48. else:
  49. return list(top_tasks)
  50. def video_task_start():
  51. logger.info(f"[+] 任务开始获取小时级top数据")
  52. data_list = get_data_task()
  53. logger.info(f"[+] 共获取{len(data_list)}条")
  54. if not data_list:
  55. return
  56. for data in data_list:
  57. try:
  58. logger.info(f"[+] 任务处理{data}任务")
  59. top = Top()
  60. top.main(data)
  61. logger.info(f"[+] {data}处理成功")
  62. time.sleep(5)
  63. continue
  64. except Exception as e:
  65. data = json.loads(data)
  66. in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
  67. AliyunLogger.logging(data['channel'], data, "处理失败重新处理","fail")
  68. logger.error(f"[+] {data}处理失败,失败信息{e}")
  69. continue
  70. def schedule_tasks():
  71. schedule.every(10).minutes.do(video_task_start)
  72. if __name__ == '__main__':
  73. schedule_tasks() # 调用任务调度函数
  74. while True:
  75. schedule.run_pending()
  76. time.sleep(1) # 每秒钟检查一次
  77. # video_task_start()