| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
import json
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

from loguru import logger

from examples.run_batch_script import build_script_input
from src.workflows.script_workflow import ScriptWorkflow
from utils.sync_mysql_help import mysql

logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
def _video_key(video_data: Dict[str, Any]) -> str:
    """Build the de-duplication key ``"<id>|<url>"`` for one video record.

    Two spellings are accepted for each part: ``channel_content_id`` is
    preferred over ``video_id``, and ``video_url`` over ``video``. A missing
    part contributes an empty string.
    """
    video_id = video_data.get("channel_content_id", "") or video_data.get("video_id", "")
    video_url = video_data.get("video_url", "") or video_data.get("video", "")
    return f"{video_id}|{video_url}"


def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]) -> str:
    """Run the script workflow on every not-yet-processed video of a task.

    Args:
        decode_result: Deconstruction output; its ``"results"`` list holds
            items with ``"video_data"`` and ``"result"`` entries.
        script_result: Previously stored script output to append to. Anything
            that is not a non-empty dict starts a fresh output record.

    Returns:
        The accumulated output (``timestamp``/``total``/``success_count``/
        ``fail_count``/``results``) serialized as a JSON string for storage.

    A failure while processing one video is recorded in that video's entry
    (``success=False`` plus the error text) and does not stop the batch.
    Failures while constructing ``ScriptWorkflow`` still propagate.
    """
    raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []

    # Work on a copy so the caller's dict is never mutated.
    if isinstance(script_result, dict) and script_result:
        output_data: Dict[str, Any] = dict(script_result)
    else:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }

    # A stored record may carry ``results`` as None or lack the key entirely;
    # normalise to a list so appending below cannot fail.
    stored_results = output_data.get("results")
    output_data["results"] = list(stored_results) if isinstance(stored_results, list) else []

    # Keys of videos already present in the stored output; those are skipped.
    processed_keys = {
        _video_key(item.get("video_data") or {})
        for item in output_data["results"]
        if isinstance(item, dict)
    }

    workflow = None
    for item in raw_results:
        video_data = item.get("video_data", {}) or {}
        result = item.get("result", {}) or {}
        key = _video_key(video_data)
        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue

        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")

        # Build the workflow only once there is real work to do. This stays
        # outside the per-item ``try`` so a construction failure still fails
        # the whole call, as before.
        if workflow is None:
            workflow = ScriptWorkflow()

        try:
            script_input = build_script_input(video_data, result)
            workflow_output = workflow.invoke(script_input)
        except Exception as e:
            # ``logger.exception`` attaches the traceback. The error text is
            # passed as a format argument so braces inside it are never
            # interpreted as format fields.
            logger.exception("脚本理解处理失败: {}", e)
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
        else:
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": workflow_output,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1

    # Serialized JSON string, ready to be stored in the database.
    return json.dumps(output_data, ensure_ascii=False)
-
def get_script_result_by_id(task_id:str):
    """Fetch the stored results of a task whose ``task_status`` is 4.

    Returns ``None`` (after logging) when no matching row exists; otherwise a
    ``(decode_result, script_result, task_status)`` tuple taken from the row.
    """
    query = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
    row = mysql.fetchone(query, (task_id,))
    if row:
        return row['decode_result'], row['script_result'], row['task_status']
    logger.info(f"task_id = {task_id} , 任务不存在")
    return None
# Age (in milliseconds) after which a task that produced no result is failed.
_SCRIPT_TASK_TIMEOUT_MS = 1000 * 60 * 60


def _parse_json_column(
    raw: Any,
    *,
    task_id: Any,
    column: str,
    fallback: Optional[Dict[str, Any]] = None,
) -> Any:
    """Decode a JSON column value that may be empty or already decoded.

    Dicts and lists are returned unchanged. Empty values, and text that fails
    to parse (which is logged), yield ``fallback``.
    """
    if isinstance(raw, (dict, list)):
        return raw
    if not raw:
        return fallback
    try:
        return json.loads(raw)
    except (TypeError, ValueError) as e:
        logger.error(f"task_id = {task_id} , {column} 解析失败: {e}")
        return fallback


def _mark_task_failed(task_id: Any) -> None:
    """Set one task to status 3 with the failure marker as its script_result.

    A database error here is logged rather than raised, so one broken row
    cannot abort the processing of the remaining tasks.
    """
    sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
    try:
        mysql.execute(sql, (task_id,))
    except Exception as e:
        logger.error(f"task_id = {task_id} , error = {e}")
        return
    logger.info(f"task_id = {task_id} ,任务异常")


def script_task_status_handler() -> None:
    """Process every ``decode_record`` row whose ``task_status`` is 2.

    For each task: decode its stored JSON columns, set the row to status 3,
    run ``invoke_script_workflow`` and, on a non-empty result, store the
    result with status 4. Any exception leaves the task at status 3 with the
    failure marker as ``script_result``.
    """
    # Fetch all tasks currently waiting for script processing.
    sql = "SELECT * FROM decode_record WHERE task_status = 2 "
    tasks = mysql.fetchall(sql)
    if not tasks:
        logger.info("script任务列表为空")
        return

    for task in tasks:
        task_id = task['task_id']

        # Both columns may be NULL, JSON text, or already-decoded objects.
        decode_result = _parse_json_column(
            task.get('decode_result'),
            task_id=task_id,
            column="decode_result",
            fallback={"results": []},
        )
        script_result = _parse_json_column(
            task.get('script_result'),
            task_id=task_id,
            column="script_result",
        )
        logger.info(f"task_id = {task_id} , decode_result = {decode_result}")

        task_create_timestamp = task['create_timestamp']
        current_timestamp = int(time.time() * 1000)

        # Set this task (and only this task) to status 3 before running the
        # workflow. The statement used to have no WHERE clause, which rewrote
        # task_status and task_id on every row of the table.
        # NOTE(review): the original comment here described a 30-minute
        # timeout, but no such check was implemented at this point; confirm
        # whether stale tasks should be skipped instead of processed.
        try:
            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
        except Exception as e:
            logger.error(f"task_id = {task_id} , error = {e}")
            _mark_task_failed(task_id)
            continue

        try:
            result = invoke_script_workflow(decode_result, script_result)
            if result:
                # Workflow finished: store the output and set status 4.
                sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
                mysql.execute(sql, (result, task_id))
                logger.info(f"task_id = {task_id} , script_result = {result}")
            elif current_timestamp - task_create_timestamp > _SCRIPT_TASK_TIMEOUT_MS:
                # No result and the task is older than the timeout.
                sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
                mysql.execute(sql, (result, task_id))
                logger.info(f"task_id = {task_id} ,任务状态异常")
        except Exception as e:
            logger.error(f"task_id = {task_id} , error = {e}")
            _mark_task_failed(task_id)
            continue
|