|
|
@@ -19,28 +19,27 @@ logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
|
|
|
|
|
|
def invoke_decode_workflow(task_params: Dict[str, Any]):
|
|
|
"""主函数"""
|
|
|
-
|
|
|
- result = DecodeWorkflow(task_params)
|
|
|
+ workflow = DecodeWorkflow()
|
|
|
+ result = workflow.invoke(task_params)
|
|
|
if result:
|
|
|
- return result
|
|
|
+ return result
|
|
|
else:
|
|
|
print(f"❌ 保存结果失败,但将继续处理")
|
|
|
return None
|
|
|
|
|
|
|
|
|
def get_decode_result_by_id(task_id:str):
|
|
|
- sql = "SELECT * FROM decode_videos WHERE task_id = %s AND task_status = 2 "
|
|
|
+ sql = "SELECT * FROM decode_videos WHERE task_id = %s"
|
|
|
tasks = mysql.fetchone(sql, (task_id,))
|
|
|
if not tasks:
|
|
|
logger.info(f"task_id = {task_id} , 任务不存在")
|
|
|
return None
|
|
|
- return tasks['result'], tasks['status']
|
|
|
+ return tasks['result'], tasks['status'],tasks['error_reason']
|
|
|
|
|
|
|
|
|
def decode_task_status_handler():
|
|
|
# 从数据库中获取任务,每次获取一个
|
|
|
- sql = "SELECT * FROM decode_record WHERE task_status = 0 "
|
|
|
- """json"""
|
|
|
+ sql = "SELECT * FROM decode_record WHERE task_status = 0 ORDER BY create_timestamp ASC LIMIT 1"
|
|
|
task = mysql.fetchone(sql)
|
|
|
|
|
|
|
|
|
@@ -49,8 +48,8 @@ def decode_task_status_handler():
|
|
|
return
|
|
|
else:
|
|
|
task_id = task['task_id']
|
|
|
- sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s"
|
|
|
- mysql.execute(sql, (task_id,))
|
|
|
+ sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s AND task_status = 0"
|
|
|
+ mysql.execute(sql, (task_id,))
|
|
|
|
|
|
# 获取任务结果
|
|
|
try:
|
|
|
@@ -66,20 +65,24 @@ def decode_task_status_handler():
|
|
|
|
|
|
if decode_result:
|
|
|
# 更新任务状态为2,任务完成
|
|
|
- sql = "UPDATE decode_record SET task_status = 2, WHERE task_id = %s"
|
|
|
- mysql.execute(sql, (task_id))
|
|
|
+ sql = "UPDATE decode_record SET task_status = 2 WHERE task_id = %s"
|
|
|
+ mysql.execute(sql, (task_id,))
|
|
|
logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
|
|
|
|
|
|
|
|
|
else:
|
|
|
if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
|
|
|
- sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
|
|
|
- mysql.execute(sql, (task_id))
|
|
|
+ sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
|
|
|
+ mysql.execute(sql, (task_id,))
|
|
|
logger.info(f"task_id = {task_id} ,任务状态异常")
|
|
|
except Exception as e:
|
|
|
logger.error(f"task_id = {task_id} , error = {e}")
|
|
|
- sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
|
|
|
- mysql.execute(sql, (task_id))
|
|
|
+ sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
|
|
|
+ mysql.execute(sql, (task_id,))
|
|
|
logger.info(f"task_id = {task_id} ,任务异常")
|
|
|
- raise {"task_id": task_id, "error": '任务异常'}
|
|
|
+ raise RuntimeError(f"task_id={task_id} 任务异常: {e}")
|
|
|
+
|
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
+ decode_task_status_handler()
|
|
|
+
|