scriptTask.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import json
  2. from loguru import logger
  3. import sys
  4. import time
  5. from typing import Dict, Any, List
  6. from datetime import datetime
  7. from utils.sync_mysql_help import mysql
  8. from examples.run_batch_script import build_script_input
  9. from src.workflows.script_workflow import ScriptWorkflow
  10. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  11. def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]):
  12. """主函数"""
  13. # 读取原始三点解构结果
  14. raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
  15. # 读取已有的脚本理解输出,支持增量追加
  16. output_data = script_result if isinstance(script_result, dict) else None
  17. if not output_data:
  18. output_data = {
  19. "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
  20. "total": 0,
  21. "success_count": 0,
  22. "fail_count": 0,
  23. "results": [],
  24. }
  25. existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
  26. # 用 channel_content_id + video URL 去重,避免重复处理(兼容旧字段名 video 和新字段名 video_url)
  27. processed_keys = {
  28. f"{item.get('video_data', {}).get('channel_content_id','') or item.get('video_data', {}).get('video_id','')}|"
  29. f"{item.get('video_data', {}).get('video_url','') or item.get('video_data', {}).get('video','')}"
  30. for item in existing_results
  31. }
  32. workflow = ScriptWorkflow()
  33. for item in raw_results:
  34. video_data = item.get("video_data", {}) or {}
  35. result = item.get("result", {}) or {}
  36. # 兼容旧字段名 channel_content_id/video 和新字段名 video_id/video_url
  37. video_id = video_data.get('channel_content_id','') or video_data.get('video_id','')
  38. video_url = video_data.get('video_url','') or video_data.get('video','')
  39. key = f"{video_id}|{video_url}"
  40. if key in processed_keys:
  41. logger.info(f"已处理过该视频,跳过: {key}")
  42. continue
  43. logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
  44. try:
  45. script_input = build_script_input(video_data, result)
  46. script_result = workflow.invoke(script_input)
  47. record = {
  48. "video_data": video_data,
  49. "what_deconstruction_result": result,
  50. "script_result": script_result,
  51. "success": True,
  52. "error": None,
  53. }
  54. output_data["success_count"] = output_data.get("success_count", 0) + 1
  55. except Exception as e:
  56. logger.error(f"脚本理解处理失败: {e}", exc_info=True)
  57. record = {
  58. "video_data": video_data,
  59. "what_deconstruction_result": result,
  60. "script_result": None,
  61. "success": False,
  62. "error": str(e),
  63. }
  64. output_data["fail_count"] = output_data.get("fail_count", 0) + 1
  65. output_data["results"].append(record)
  66. output_data["total"] = output_data.get("total", 0) + 1
  67. # 返回序列化后的 JSON 字符串(供数据库存储)
  68. return json.dumps(output_data, ensure_ascii=False)
  69. def get_script_result_by_id(task_id:str):
  70. sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
  71. tasks = mysql.fetchone(sql, (task_id,))
  72. if not tasks:
  73. logger.info(f"task_id = {task_id} , 任务不存在")
  74. return None
  75. return tasks['decode_result'], tasks['script_result'], tasks['task_status']
  76. def script_task_status_handler():
  77. # 从数据库中获取任务,每次获取一个
  78. sql = "SELECT * FROM decode_record WHERE task_status = 2 "
  79. """json"""
  80. tasks = mysql.fetchall(sql)
  81. # tasks = mysql.fetchall(sql,())
  82. if not tasks:
  83. logger.info("script任务列表为空")
  84. return
  85. for task in tasks:
  86. task_id = task['task_id']
  87. # 解析 decode_result(可能为 None 或已是对象)
  88. decode_result_raw = task.get('decode_result')
  89. if isinstance(decode_result_raw, (dict, list)):
  90. decode_result = decode_result_raw
  91. elif decode_result_raw:
  92. try:
  93. decode_result = json.loads(decode_result_raw)
  94. except Exception as e:
  95. logger.error(f"task_id = {task_id} , decode_result 解析失败: {e}")
  96. decode_result = {"results": []}
  97. else:
  98. decode_result = {"results": []}
  99. # 解析 script_result(可能为 None 或已是对象)
  100. script_result_raw = task.get('script_result')
  101. if isinstance(script_result_raw, (dict, list)):
  102. script_result = script_result_raw
  103. elif script_result_raw:
  104. try:
  105. script_result = json.loads(script_result_raw)
  106. except Exception as e:
  107. logger.error(f"task_id = {task_id} , script_result 解析失败: {e}")
  108. script_result = None
  109. else:
  110. script_result = None
  111. logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
  112. #如何任务超过30分钟,则认为任务超时,更新任务状态为3
  113. task_create_timestamp = task['create_timestamp']
  114. current_timestamp = int(time.time() * 1000)
  115. try:
  116. sql = "UPDATE decode_record SET task_status = 3,task_id = %s"
  117. mysql.execute(sql, (task_id))
  118. except Exception as e:
  119. logger.error(f"task_id = {task_id} , error = {e}")
  120. sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
  121. mysql.execute(sql, (task_id))
  122. logger.info(f"task_id = {task_id} ,任务异常")
  123. continue
  124. try:
  125. result = invoke_script_workflow(decode_result, script_result)
  126. if result:
  127. # 更新任务状态为2,任务完成
  128. sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
  129. mysql.execute(sql, (result, task_id))
  130. logger.info(f"task_id = {task_id} , script_result = {result}")
  131. else:
  132. if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
  133. sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
  134. mysql.execute(sql, (result, task_id))
  135. logger.info(f"task_id = {task_id} ,任务状态异常")
  136. except Exception as e:
  137. logger.error(f"task_id = {task_id} , error = {e}")
  138. sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
  139. mysql.execute(sql, (task_id))
  140. logger.info(f"task_id = {task_id} ,任务异常")
  141. continue