123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- import json
- import os
- import time
- import uuid
- from datetime import datetime
- import schedule
- from loguru import logger
- from common.aliyun_log import AliyunLogger
- from common.feishu_utils import Feishu
- from common.redis import get_top_data, in_job_video_data
- from top_automatic.top_data_processing import Top
- def get_data_task():
- top_tasks = [] # 使用集合去重
- for i in range(120):
- top_task = get_top_data("task:top_all_data")
- logger.info(f"[+] 获取第{i}个")
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- if top_task:
- time.sleep(1)
- data = json.loads(top_task)
- channel_id = data['channel']
- if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具"]:
- if channel_id:
- AliyunLogger.logging(channel_id, data, "不处理","fail")
- else:
- AliyunLogger.logging("空", data, "不处理","fail")
- values = [
- [
- data['uid'],
- data['videoid'],
- data['return_uv'],
- data['type'],
- data['type_owner'],
- data['channel'],
- data['channel_owner'],
- data['title'],
- data['dt'],
- "",
- "",
- formatted_time
- ]
- ]
- Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "p6PfEG", "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "p6PfEG", "A2:Z2", values)
- logger.info(f"[+] 成功写入飞书表格")
- logger.info(f"[+] 改内容为:{channel_id},不做处理")
- continue
- else:
- top_tasks.append(top_task)
- AliyunLogger.logging(channel_id, data, "等待处理", "success")
- # values = [
- # [
- # data['uid'],
- # data['videoid'],
- # data['return_uv'],
- # data['type'],
- # data['type_owner'],
- # data['channel'],
- # data['channel_owner'],
- # data['title'],
- # data['dt'],
- # "",
- # "",
- # formatted_time
- # ]
- # ]
- # Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
- # time.sleep(0.5)
- # Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
- logger.info(f"[+] 成功写入飞书表格")
- else:
- return top_tasks
- return top_tasks
- def video_task_start():
- logger.info(f"[+] 任务开始获取小时级top数据")
- data_list = get_data_task()
- if data_list:
- logger.info(f"[+] 共获取{len(data_list)}条")
- else:
- logger.info(f"[+] 共获取0条")
- if not data_list:
- return
- for data in data_list:
- try:
- logger.info(f"[+] 任务处理{data}任务")
- top = Top()
- top.main(data)
- logger.info(f"[+] {data}处理成功")
- time.sleep(5)
- continue
- except Exception as e:
- data = json.loads(data)
- in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
- AliyunLogger.logging(data['channel'], data, "处理失败重新处理","fail")
- logger.error(f"[+] {data}处理失败,失败信息{e}")
- continue
- def schedule_tasks():
- schedule.every(20).minutes.do(video_task_start)
- if __name__ == '__main__':
- video_task_start()
- schedule_tasks() # 调用任务调度函数
- while True:
- schedule.run_pending()
- time.sleep(1) # 每秒钟检查一次
|