import json
import os
import time
import uuid
from datetime import datetime

import schedule
from loguru import logger

from common.aliyun_log import AliyunLogger
from common.feishu_utils import Feishu
from common.redis import get_top_data, in_job_video_data
from top_automatic.top_data_processing import Top

# Key handed to the common.redis helpers, both for reading tasks and for
# re-submitting failed ones.
_TOP_TASK_KEY = "task:top_all_data"

# Upper bound on how many tasks a single run reads.
_MAX_TASKS_PER_RUN = 120

# Only tasks whose "channel" is one of these are processed; every other task
# is reported and dropped. Kept as a tuple (not a set) so that membership is
# decided by equality and an unhashable channel value cannot raise.
_HANDLED_CHANNELS = ("抖音关键词抓取", "快手关键词抓取", "搬运改造", "搬运工具")

# Feishu spreadsheet / sheet that receives the tasks that are not processed.
_FEISHU_SPREADSHEET = "KUIksoqZkhvZOrtqA1McPwObn7d"
_FEISHU_SKIPPED_SHEET = "p6PfEG"


def _record_skipped_task(data, channel_id, recorded_at):
    """Report a task from an unhandled channel to Aliyun and the Feishu sheet.

    Args:
        data: Decoded task dict; must contain the keys used to build the row.
        channel_id: Value of ``data['channel']`` (may be empty or ``None``).
        recorded_at: Timestamp string written into the last column of the row.

    Raises:
        KeyError: If ``data`` lacks one of the expected keys.
    """
    # An empty channel is logged under the placeholder "空".
    AliyunLogger.logging(channel_id or "空", data, "不处理", "fail")
    values = [
        [
            data['uid'],
            data['videoid'],
            data['return_uv'],
            data['type'],
            data['type_owner'],
            data['channel'],
            data['channel_owner'],
            data['title'],
            data['dt'],
            "",
            "",
            recorded_at,
        ]
    ]
    # NOTE(review): presumably this opens a fresh row 2 that the next call
    # then fills -- confirm against Feishu.insert_columns.
    Feishu.insert_columns(_FEISHU_SPREADSHEET, _FEISHU_SKIPPED_SHEET, "ROWS", 1, 2)
    time.sleep(0.5)
    Feishu.update_values(_FEISHU_SPREADSHEET, _FEISHU_SKIPPED_SHEET, "A2:Z2", values)
    logger.info("[+] 成功写入飞书表格")
    logger.info(f"[+] 改内容为:{channel_id},不做处理")


def get_data_task():
    """Read up to ``_MAX_TASKS_PER_RUN`` tasks and return those to be processed.

    Each task is a JSON string obtained from ``get_top_data``. Tasks whose
    channel is listed in ``_HANDLED_CHANNELS`` are collected; all others are
    reported via ``_record_skipped_task`` and left out. Reading stops as soon
    as ``get_top_data`` returns a falsy value.

    A record that cannot be decoded or reported is logged and skipped, so a
    single bad record neither discards the tasks already collected nor lets
    the exception escape to the scheduler loop.

    Returns:
        list: The raw JSON strings of the accepted tasks, in read order.
        No de-duplication is performed.
    """
    top_tasks = []
    for i in range(_MAX_TASKS_PER_RUN):
        top_task = get_top_data(_TOP_TASK_KEY)
        logger.info(f"[+] 获取第{i}个")
        if not top_task:
            # Nothing left to read.
            break

        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        time.sleep(1)
        try:
            data = json.loads(top_task)
            channel_id = data['channel']
            if channel_id not in _HANDLED_CHANNELS:
                _record_skipped_task(data, channel_id, formatted_time)
                continue
            # Append before logging so that a logging failure cannot lose
            # an otherwise valid task.
            top_tasks.append(top_task)
            AliyunLogger.logging(channel_id, data, "等待处理", "success")
        except Exception as e:
            logger.error(f"[+] {top_task}任务数据处理异常,已跳过,失败信息{e}")
    return top_tasks


def _requeue_failed_task(raw_task, error):
    """Re-submit a failed task and log the failure.

    The recovery steps are guarded themselves: if re-submission or logging
    raises, both errors are logged and the caller's loop continues.

    Args:
        raw_task: The task's JSON string as returned by ``get_data_task``.
        error: The exception raised while processing the task.
    """
    try:
        payload = json.loads(raw_task)
        in_job_video_data(
            _TOP_TASK_KEY, json.dumps(payload, ensure_ascii=False, indent=4)
        )
        AliyunLogger.logging(payload['channel'], payload, "处理失败重新处理", "fail")
        logger.error(f"[+] {payload}处理失败,失败信息{error}")
    except Exception as requeue_error:
        logger.error(
            f"[+] {raw_task}处理失败,失败信息{error};"
            f"重新入队失败,失败信息{requeue_error}"
        )


def video_task_start():
    """Fetch the current batch of tasks and run ``Top().main`` on each one.

    A task that raises is handed to ``_requeue_failed_task`` and processing
    continues with the next task. There is a 5-second pause after every
    successfully processed task.
    """
    logger.info("[+] 任务开始获取小时级top数据")
    data_list = get_data_task()
    logger.info(f"[+] 共获取{len(data_list)}条")
    for data in data_list:
        try:
            logger.info(f"[+] 任务处理{data}任务")
            top = Top()
            top.main(data)
            logger.info(f"[+] {data}处理成功")
            time.sleep(5)
        except Exception as e:
            _requeue_failed_task(data, e)


def schedule_tasks():
    """Register ``video_task_start`` to run every 20 minutes."""
    schedule.every(20).minutes.do(video_task_start)


if __name__ == '__main__':
    # Run once immediately, then hand over to the scheduler.
    video_task_start()
    schedule_tasks()
    while True:
        schedule.run_pending()
        time.sleep(1)  # check for due jobs once per second