import json
import sys
import time
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

from loguru import logger

from utils.sync_mysql_help import mysql
from examples.run_batch_script import build_script_input
from src.workflows.script_workflow import ScriptWorkflow

# Extra stderr sink for ERROR-and-above records, with backtraces and variable
# values (diagnose=True).
# NOTE(review): loguru also installs a default stderr handler, so ERROR records
# may be printed twice; confirm this is intended.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# Age (ms since create_timestamp) after which a task whose workflow produced no
# output is marked task_status = 3. 60 minutes.
# NOTE(review): an earlier comment described this as a 30-minute timeout;
# confirm which value is intended.
_SCRIPT_TASK_TIMEOUT_MS = 60 * 60 * 1000


def _video_key(video_data: Dict[str, Any]) -> str:
    """Return the dedup key ``"<channel_content_id>|<video>"`` for a video record."""
    return f"{video_data.get('channel_content_id', '')}|{video_data.get('video', '')}"


def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]) -> str:
    """Run the script-understanding workflow over every decoded video.

    Output is accumulated incrementally. When ``script_result`` is a
    non-empty dict, new records are appended to it. Videos whose
    ``channel_content_id|video`` key already appears in its ``results`` are
    skipped, as are repeats within ``decode_result`` itself.

    Args:
        decode_result: Parsed decode output. Its ``"results"`` list holds items
            carrying ``"video_data"`` and ``"result"`` dicts.
        script_result: Previously stored output to append to, or ``None`` /
            a non-dict to start fresh. A non-empty dict passed here is
            mutated in place.

    Returns:
        The accumulated output dict (``timestamp``, ``total``,
        ``success_count``, ``fail_count``, ``results``), serialized with
        ``json.dumps(..., ensure_ascii=False)`` for database storage.
    """
    # Raw per-video results of the decode step.
    raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []

    # Reuse the existing output so results are appended incrementally.
    output_data = script_result if isinstance(script_result, dict) else None
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }

    # Normalise a missing/None "results" entry so the append below cannot fail.
    existing_results: List[Dict[str, Any]] = output_data.get("results") or []
    output_data["results"] = existing_results

    # Dedup on channel_content_id + video URL so nothing is processed twice.
    processed_keys = {_video_key(item.get("video_data") or {}) for item in existing_results}

    workflow = ScriptWorkflow()
    for item in raw_results:
        video_data = item.get("video_data", {}) or {}
        result = item.get("result", {}) or {}
        key = _video_key(video_data)
        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue
        # Record the key now so a duplicate later in this same batch is also
        # skipped. Failed attempts are recorded too, matching how keys from
        # stored results (successful or not) are treated above.
        processed_keys.add(key)

        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
        try:
            script_input = build_script_input(video_data, result)
            workflow_output = workflow.invoke(script_input)
        except Exception as e:
            # logger.exception attaches the traceback. Do not pass
            # exc_info=True: loguru forwards extra kwargs to str.format, so a
            # message containing braces would make the log call itself raise.
            logger.exception(f"脚本理解处理失败: {e}")
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
        else:
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": workflow_output,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1

    # Serialized JSON string, ready to be written to the database.
    return json.dumps(output_data, ensure_ascii=False)


def get_script_result_by_id(task_id: str) -> Optional[Tuple[Any, Any, Any]]:
    """Fetch the stored results of the task ``task_id`` if it has ``task_status = 4``.

    Returns:
        ``(decode_result, script_result, task_status)`` from the matching
        ``decode_record`` row, or ``None`` when no such row exists.
    """
    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
    row = mysql.fetchone(sql, (task_id,))
    if not row:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None
    return row['decode_result'], row['script_result'], row['task_status']


def _parse_json_field(task: Dict[str, Any], field: str, task_id: Any, default: Any) -> Any:
    """Return ``task[field]`` decoded from JSON, or ``default`` if empty or unparsable.

    A value that is already a dict or list is returned unchanged (presumably
    the DB driver already decoded the column; verify).
    """
    raw = task.get(field)
    if isinstance(raw, (dict, list)):
        return raw
    if not raw:
        return default
    try:
        return json.loads(raw)
    except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
        logger.error(f"task_id = {task_id} , {field} 解析失败: {e}")
        return default


def script_task_status_handler():
    """Run the script-understanding step for every task with ``task_status = 2``.

    For each task:
      1. Parse its stored ``decode_result`` / ``script_result`` JSON.
      2. Mark its row ``task_status = 3``.
      3. Run :func:`invoke_script_workflow`.
      4. Store the output with ``task_status = 4``.

    Any failure marks the row ``task_status = 3`` with
    ``script_result = '任务异常'``.
    """
    # Fetch every task waiting for the script step (all of them, not just one).
    sql = "SELECT * FROM decode_record WHERE task_status = 2 "
    tasks = mysql.fetchall(sql)
    if not tasks:
        logger.info("script任务列表为空")
        return

    for task in tasks:
        task_id = task['task_id']
        decode_result = _parse_json_field(task, 'decode_result', task_id, {"results": []})
        script_result = _parse_json_field(task, 'script_result', task_id, None)
        logger.info(f"task_id = {task_id} , decode_result = {decode_result}")

        # Move this row out of status 2 before the (slow) workflow runs,
        # presumably so later polls do not pick it up again; verify.
        # The WHERE clause scopes the update to this task's row only.
        try:
            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
        except Exception as e:
            logger.exception(f"task_id = {task_id} , error = {e}")
            sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
            logger.info(f"task_id = {task_id} ,任务异常")
            continue

        try:
            result = invoke_script_workflow(decode_result, script_result)
            if result:
                # Status 4: task finished; store the serialized output.
                sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
                mysql.execute(sql, (result, task_id))
                logger.info(f"task_id = {task_id} , script_result = {result}")
            else:
                # NOTE(review): unreachable with the current
                # invoke_script_workflow, which always returns a non-empty
                # JSON string. Kept as a timeout guard in case that changes.
                current_timestamp = int(time.time() * 1000)
                if current_timestamp - task['create_timestamp'] > _SCRIPT_TASK_TIMEOUT_MS:
                    sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
                    mysql.execute(sql, (result, task_id))
                    logger.info(f"task_id = {task_id} ,任务状态异常")
        except Exception as e:
            logger.exception(f"task_id = {task_id} , error = {e}")
            sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
            logger.info(f"task_id = {task_id} ,任务异常")
            continue