import json
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from loguru import logger

from utils.sync_mysql_help import mysql
from examples.run_batch_script import build_script_input
from src.workflows.script_workflow import ScriptWorkflow

logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# A task whose workflow produced no result is marked failed (status 3) once
# more than this many milliseconds have passed since its `script_timestamp`.
# NOTE(review): an earlier comment said 30 minutes; the code has used one
# hour. Confirm the intended timeout.
TASK_TIMEOUT_MS = 1000 * 60 * 60


def _dedup_key(video_data: Dict[str, Any]) -> str:
    """Return the de-duplication key ``"<channel_content_id>|<video>"`` for a video."""
    return f"{video_data.get('channel_content_id', '')}|{video_data.get('video', '')}"


def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]) -> str:
    """Run the script-understanding workflow over every decoded video.

    Results are accumulated incrementally into ``script_result``. A video
    whose ``channel_content_id|video`` key is already present in
    ``script_result["results"]`` (or earlier in this same batch) is skipped.

    Args:
        decode_result: Parsed decode output. Its ``"results"`` list holds items
            carrying ``"video_data"`` and ``"result"`` dicts.
        script_result: Previously accumulated output (may be empty/None). When
            non-empty it is updated in place.

    Returns:
        The accumulated output (counters plus per-video records) serialized
        as a JSON string, with non-ASCII characters kept unescaped.
    """
    # Raw deconstruction results to feed into the workflow.
    raw_results: List[Dict[str, Any]] = (decode_result or {}).get("results", []) or []

    # Start from the existing output so new results are appended to it.
    output_data = script_result
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }
    # Ensure output_data["results"] is a real list (missing/null in stored data)
    # and that existing_results is that very list, so appends below land in it.
    existing_results: List[Dict[str, Any]] = output_data.get("results") or []
    output_data["results"] = existing_results

    processed_keys = {_dedup_key(item.get("video_data") or {}) for item in existing_results}

    workflow = ScriptWorkflow()
    for item in raw_results:
        video_data = item.get("video_data", {}) or {}
        what_result = item.get("result", {}) or {}
        key = _dedup_key(video_data)
        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue
        # Record the key now so duplicates later in this same batch are skipped too.
        processed_keys.add(key)

        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
        try:
            script_input = build_script_input(video_data, what_result)
            workflow_output = workflow.invoke(script_input)
            record = {
                "video_data": video_data,
                "what_deconstruction_result": what_result,
                "script_result": workflow_output,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1
        except Exception as e:
            # logger.exception attaches the traceback. loguru has no exc_info
            # kwarg; passing one makes it str.format() the message, which
            # breaks when the exception text contains braces.
            logger.exception(f"脚本理解处理失败: {e}")
            record = {
                "video_data": video_data,
                "what_deconstruction_result": what_result,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1

    # json.dumps (not json.dump, which needs a file object) returns the string.
    return json.dumps(output_data, ensure_ascii=False, indent=2)


def get_script_result_by_id(task_id: str) -> Optional[Tuple[Any, Any, Any]]:
    """Fetch a completed (task_status = 4) decode_record row by ``task_id``.

    Returns:
        ``(script_result, decode_result, task_status)`` as stored in the row,
        or ``None`` when no completed task with that id exists.
    """
    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
    task = mysql.fetchone(sql, (task_id,))
    if not task:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None
    return task['script_result'], task['decode_result'], task['task_status']


def _mark_task_failed(task_id: str) -> None:
    """Set task_status = 3 and store the '任务异常' marker for one task."""
    sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
    mysql.execute(sql, (task_id,))
    logger.info(f"task_id = {task_id} ,任务异常")


def _process_task(task: Dict[str, Any]) -> None:
    """Claim one pending decode_record row, run the workflow, and persist the outcome."""
    task_id = task['task_id']

    # Bad or NULL JSON columns fail this task only, not the whole polling loop.
    try:
        decode_result = json.loads(task['decode_result'])
        raw_script_result = task['script_result']
        script_result = json.loads(raw_script_result) if raw_script_result else {}
    except (TypeError, ValueError) as e:
        logger.error(f"task_id = {task_id} , error = {e}")
        _mark_task_failed(task_id)
        return

    logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
    current_timestamp = int(time.time() * 1000)

    # Move the task out of status 2 before the slow workflow runs; the polling
    # query only selects status 2, so it will not be picked up again. The WHERE
    # clause is essential: without it every row in the table would be updated.
    try:
        sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
        mysql.execute(sql, (task_id,))
    except Exception as e:
        logger.error(f"task_id = {task_id} , error = {e}")
        _mark_task_failed(task_id)
        return

    try:
        result = invoke_script_workflow(decode_result, script_result)
        if result:
            # Status 4: task completed; store the serialized workflow output.
            sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
            mysql.execute(sql, (result, task_id))
            logger.info(f"task_id = {task_id} , script_result = {result}")
        elif current_timestamp - task['script_timestamp'] > TASK_TIMEOUT_MS:
            # No result and the task is older than the timeout: mark it failed.
            sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
            mysql.execute(sql, (result, task_id))
            logger.info(f"task_id = {task_id} ,任务状态异常")
    except Exception as e:
        logger.exception(f"task_id = {task_id} , error = {e}")
        _mark_task_failed(task_id)


def script_task_status_handler(task_id: str):
    """Process every decode_record row awaiting script understanding (task_status = 2).

    Args:
        task_id: Currently unused. All pending tasks are processed regardless.
            Kept so existing callers keep working.
    """
    sql = "SELECT * FROM decode_record WHERE task_status = 2 "
    tasks = mysql.fetchall(sql)
    if not tasks:
        logger.info("任务列表为空")
        return

    for task in tasks:
        _process_task(task)