scriptTask.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import json
  2. from loguru import logger
  3. import sys
  4. import time
  5. from typing import Dict, Any, List
  6. from datetime import datetime
  7. from utils.sync_mysql_help import mysql
  8. from examples.run_batch_script import build_script_input
  9. from src.workflows.script_workflow import ScriptWorkflow
# Register an additional loguru sink on stderr that emits only records at
# ERROR level or above, with extended backtraces and variable diagnostics on.
# NOTE(review): loguru pre-installs a default stderr sink; unless that sink is
# removed elsewhere, ERROR records will be written to stderr twice - confirm.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  11. def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]):
  12. """主函数"""
  13. # 读取原始三点解构结果
  14. raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
  15. # 读取已有的脚本理解输出,支持增量追加
  16. output_data = script_result if isinstance(script_result, dict) else None
  17. if not output_data:
  18. output_data = {
  19. "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
  20. "total": 0,
  21. "success_count": 0,
  22. "fail_count": 0,
  23. "results": [],
  24. }
  25. existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
  26. # 用 channel_content_id + video URL 去重,避免重复处理
  27. processed_keys = {
  28. f"{item.get('video_data', {}).get('channel_content_id','')}|"
  29. f"{item.get('video_data', {}).get('video','')}"
  30. for item in existing_results
  31. }
  32. workflow = ScriptWorkflow()
  33. for item in raw_results:
  34. video_data = item.get("video_data", {}) or {}
  35. result = item.get("result", {}) or {}
  36. key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
  37. if key in processed_keys:
  38. logger.info(f"已处理过该视频,跳过: {key}")
  39. continue
  40. logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
  41. try:
  42. script_input = build_script_input(video_data, result)
  43. script_result = workflow.invoke(script_input)
  44. record = {
  45. "video_data": video_data,
  46. "what_deconstruction_result": result,
  47. "script_result": script_result,
  48. "success": True,
  49. "error": None,
  50. }
  51. output_data["success_count"] = output_data.get("success_count", 0) + 1
  52. except Exception as e:
  53. logger.error(f"脚本理解处理失败: {e}", exc_info=True)
  54. record = {
  55. "video_data": video_data,
  56. "what_deconstruction_result": result,
  57. "script_result": None,
  58. "success": False,
  59. "error": str(e),
  60. }
  61. output_data["fail_count"] = output_data.get("fail_count", 0) + 1
  62. output_data["results"].append(record)
  63. output_data["total"] = output_data.get("total", 0) + 1
  64. # 返回序列化后的 JSON 字符串(供数据库存储)
  65. return json.dumps(output_data, ensure_ascii=False)
  66. def get_script_result_by_id(task_id:str):
  67. sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
  68. tasks = mysql.fetchone(sql, (task_id,))
  69. if not tasks:
  70. logger.info(f"task_id = {task_id} , 任务不存在")
  71. return None
  72. return tasks['decode_result'], tasks['script_result'], tasks['task_status']
  73. def script_task_status_handler():
  74. # 从数据库中获取任务,每次获取一个
  75. sql = "SELECT * FROM decode_record WHERE task_status = 2 "
  76. """json"""
  77. tasks = mysql.fetchall(sql)
  78. # tasks = mysql.fetchall(sql,())
  79. if not tasks:
  80. logger.info("script任务列表为空")
  81. return
  82. for task in tasks:
  83. task_id = task['task_id']
  84. # 解析 decode_result(可能为 None 或已是对象)
  85. decode_result_raw = task.get('decode_result')
  86. if isinstance(decode_result_raw, (dict, list)):
  87. decode_result = decode_result_raw
  88. elif decode_result_raw:
  89. try:
  90. decode_result = json.loads(decode_result_raw)
  91. except Exception as e:
  92. logger.error(f"task_id = {task_id} , decode_result 解析失败: {e}")
  93. decode_result = {"results": []}
  94. else:
  95. decode_result = {"results": []}
  96. # 解析 script_result(可能为 None 或已是对象)
  97. script_result_raw = task.get('script_result')
  98. if isinstance(script_result_raw, (dict, list)):
  99. script_result = script_result_raw
  100. elif script_result_raw:
  101. try:
  102. script_result = json.loads(script_result_raw)
  103. except Exception as e:
  104. logger.error(f"task_id = {task_id} , script_result 解析失败: {e}")
  105. script_result = None
  106. else:
  107. script_result = None
  108. logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
  109. #如何任务超过30分钟,则认为任务超时,更新任务状态为3
  110. task_create_timestamp = task['create_timestamp']
  111. current_timestamp = int(time.time() * 1000)
  112. try:
  113. sql = "UPDATE decode_record SET task_status = 3,task_id = %s"
  114. mysql.execute(sql, (task_id))
  115. except Exception as e:
  116. logger.error(f"task_id = {task_id} , error = {e}")
  117. sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
  118. mysql.execute(sql, (task_id))
  119. logger.info(f"task_id = {task_id} ,任务异常")
  120. continue
  121. try:
  122. result = invoke_script_workflow(decode_result, script_result)
  123. if result:
  124. # 更新任务状态为2,任务完成
  125. sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
  126. mysql.execute(sql, (result, task_id))
  127. logger.info(f"task_id = {task_id} , script_result = {result}")
  128. else:
  129. if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
  130. sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
  131. mysql.execute(sql, (result, task_id))
  132. logger.info(f"task_id = {task_id} ,任务状态异常")
  133. except Exception as e:
  134. logger.error(f"task_id = {task_id} , error = {e}")
  135. sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
  136. mysql.execute(sql, (task_id))
  137. logger.info(f"task_id = {task_id} ,任务异常")
  138. continue