|
|
@@ -21,7 +21,7 @@ def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[st
|
|
|
raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
|
|
|
|
|
|
# 读取已有的脚本理解输出,支持增量追加
|
|
|
- output_data = script_result
|
|
|
+ output_data = script_result if isinstance(script_result, dict) else None
|
|
|
if not output_data:
|
|
|
output_data = {
|
|
|
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
|
|
@@ -80,10 +80,8 @@ def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[st
|
|
|
output_data["results"].append(record)
|
|
|
output_data["total"] = output_data.get("total", 0) + 1
|
|
|
|
|
|
- # 处理完一条就保存一次,避免长任务中途失败导致全部丢失
|
|
|
-
|
|
|
-
|
|
|
- return json.dump(output_data, ensure_ascii=False, indent=2)
|
|
|
+ # 返回序列化后的 JSON 字符串(供数据库存储)
|
|
|
+ return json.dumps(output_data, ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -94,10 +92,10 @@ def get_script_result_by_id(task_id:str):
|
|
|
if not tasks:
|
|
|
logger.info(f"task_id = {task_id} , 任务不存在")
|
|
|
return None
|
|
|
- return tasks['script_result'], tasks['decode_result'], tasks['task_status']
|
|
|
+ return tasks['decode_result'], tasks['script_result'], tasks['task_status']
|
|
|
|
|
|
|
|
|
-def script_task_status_handler(task_id:str):
|
|
|
+def script_task_status_handler():
|
|
|
# 从数据库中获取任务,每次获取一个
|
|
|
sql = "SELECT * FROM decode_record WHERE task_status = 2 "
|
|
|
"""json"""
|
|
|
@@ -105,16 +103,39 @@ def script_task_status_handler(task_id:str):
|
|
|
# tasks = mysql.fetchall(sql,())
|
|
|
|
|
|
if not tasks:
|
|
|
- logger.info("任务列表为空")
|
|
|
+ logger.info("script任务列表为空")
|
|
|
return
|
|
|
for task in tasks:
|
|
|
task_id = task['task_id']
|
|
|
- decode_result = json.loads(task['decode_result'])
|
|
|
- script_result = json.loads(task['script_result'])
|
|
|
+ # 解析 decode_result(可能为 None 或已是对象)
|
|
|
+ decode_result_raw = task.get('decode_result')
|
|
|
+ if isinstance(decode_result_raw, (dict, list)):
|
|
|
+ decode_result = decode_result_raw
|
|
|
+ elif decode_result_raw:
|
|
|
+ try:
|
|
|
+ decode_result = json.loads(decode_result_raw)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"task_id = {task_id} , decode_result 解析失败: {e}")
|
|
|
+ decode_result = {"results": []}
|
|
|
+ else:
|
|
|
+ decode_result = {"results": []}
|
|
|
+
|
|
|
+ # 解析 script_result(可能为 None 或已是对象)
|
|
|
+ script_result_raw = task.get('script_result')
|
|
|
+ if isinstance(script_result_raw, (dict, list)):
|
|
|
+ script_result = script_result_raw
|
|
|
+ elif script_result_raw:
|
|
|
+ try:
|
|
|
+ script_result = json.loads(script_result_raw)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"task_id = {task_id} , script_result 解析失败: {e}")
|
|
|
+ script_result = None
|
|
|
+ else:
|
|
|
+ script_result = None
|
|
|
|
|
|
logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
|
|
|
#如何任务超过30分钟,则认为任务超时,更新任务状态为3
|
|
|
- task_create_timestamp = task['script_timestamp']
|
|
|
+ task_create_timestamp = task['create_timestamp']
|
|
|
current_timestamp = int(time.time() * 1000)
|
|
|
|
|
|
|
|
|
@@ -147,4 +168,3 @@ def script_task_status_handler(task_id:str):
|
|
|
mysql.execute(sql, (task_id))
|
|
|
logger.info(f"task_id = {task_id} ,任务异常")
|
|
|
continue
|
|
|
-
|