job_data.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. from datetime import datetime
  6. import schedule
  7. from loguru import logger
  8. from common.aliyun_log import AliyunLogger
  9. from common.feishu_utils import Feishu
  10. from common.redis import get_top_data, in_job_video_data
  11. from top_automatic.top_data_processing import Top
  12. def get_data_task():
  13. top_tasks = set() # 使用集合去重
  14. for i in range(100):
  15. top_task = get_top_data("task:top_all_data")
  16. if top_task:
  17. data = json.loads(top_task)
  18. channel_id = data['channel']
  19. if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
  20. current_time = datetime.now()
  21. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  22. values = [
  23. [
  24. data['uid'],
  25. data['videoid'],
  26. data['return_uv'],
  27. data['type'],
  28. data['type_owner'],
  29. data['channel'],
  30. data['channel_owner'],
  31. data['title'],
  32. data['dt'],
  33. "",
  34. "",
  35. formatted_time
  36. ]
  37. ]
  38. Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
  39. time.sleep(0.5)
  40. Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
  41. logger.info(f"[+] 成功写入飞书表格")
  42. logger.info(f"[+] 改内容为:{channel_id},不做处理")
  43. if channel_id:
  44. AliyunLogger.logging(channel_id, data, "不处理","不处理")
  45. continue
  46. top_tasks.add(top_task)
  47. else:
  48. return list(top_tasks)
  49. def video_task_start():
  50. logger.info(f"[+] 任务开始获取小时级top数据")
  51. data_list = get_data_task()
  52. logger.info(f"[+] 共获取{len(data_list)}条")
  53. if not data_list:
  54. return
  55. for data in data_list:
  56. try:
  57. logger.info(f"[+] 任务处理{data}任务")
  58. top = Top()
  59. top.main(data)
  60. logger.info(f"[+] {data}处理成功")
  61. time.sleep(5)
  62. continue
  63. except Exception as e:
  64. data = json.loads(data)
  65. in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
  66. AliyunLogger.logging(data['channel'], data, "处理失败重新处理","处理失败重新处理")
  67. logger.error(f"[+] {data}处理失败,失败信息{e}")
  68. continue
  69. def schedule_tasks():
  70. schedule.every(10).minutes.do(video_task_start)
  71. if __name__ == '__main__':
  72. # schedule_tasks() # 调用任务调度函数
  73. # while True:
  74. # schedule.run_pending()
  75. # time.sleep(1) # 每秒钟检查一次
  76. video_task_start()