job_top_data.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from datetime import datetime
  6. import schedule
  7. from loguru import logger
  8. from common.aliyun_log import AliyunLogger
  9. from common.feishu_utils import Feishu
  10. from common.redis import get_top_data, in_job_video_data
  11. from top_automatic.top_data_processing import Top
  12. def get_data_task():
  13. top_tasks = [] # 使用集合去重
  14. for i in range(100):
  15. top_task = get_top_data("task:top_all_data")
  16. if top_task:
  17. time.sleep(1)
  18. data = json.loads(top_task)
  19. channel_id = data['channel']
  20. if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
  21. if channel_id:
  22. AliyunLogger.logging(channel_id, data, "不处理","fail")
  23. else:
  24. AliyunLogger.logging("空", data, "不处理","fail")
  25. current_time = datetime.now()
  26. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  27. values = [
  28. [
  29. data['uid'],
  30. data['videoid'],
  31. data['return_uv'],
  32. data['type'],
  33. data['type_owner'],
  34. data['channel'],
  35. data['channel_owner'],
  36. data['title'],
  37. data['dt'],
  38. "",
  39. "",
  40. formatted_time
  41. ]
  42. ]
  43. Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
  44. time.sleep(0.5)
  45. Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
  46. logger.info(f"[+] 成功写入飞书表格")
  47. logger.info(f"[+] 改内容为:{channel_id},不做处理")
  48. continue
  49. else:
  50. AliyunLogger.logging(channel_id, data, "等待处理", "success")
  51. top_tasks.append(top_task)
  52. return top_tasks
  53. def video_task_start():
  54. logger.info(f"[+] 任务开始获取小时级top数据")
  55. data_list = get_data_task()
  56. if data_list:
  57. logger.info(f"[+] 共获取{len(data_list)}条")
  58. else:
  59. logger.info(f"[+] 共获取0条")
  60. if not data_list:
  61. return
  62. for data in data_list:
  63. try:
  64. logger.info(f"[+] 任务处理{data}任务")
  65. top = Top()
  66. top.main(data)
  67. logger.info(f"[+] {data}处理成功")
  68. time.sleep(5)
  69. continue
  70. except Exception as e:
  71. data = json.loads(data)
  72. in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
  73. AliyunLogger.logging(data['channel'], data, "处理失败重新处理","fail")
  74. logger.error(f"[+] {data}处理失败,失败信息{e}")
  75. continue
  76. def schedule_tasks():
  77. schedule.every(20).minutes.do(video_task_start)
  78. if __name__ == '__main__':
  79. video_task_start()
  80. schedule_tasks() # 调用任务调度函数
  81. while True:
  82. schedule.run_pending()
  83. time.sleep(1) # 每秒钟检查一次