decode.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
  2. from models.task import WorkflowTask
  3. from models.decode_task_result import WorkflowDecodeTaskResult
  4. from utils.sync_mysql_help import mysql
  5. from loguru import logger
  6. import sys
  7. from datetime import datetime
  8. from typing import Dict, Any, Optional
  9. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  10. # 错误码常量
  11. ERROR_CODE_SUCCESS = 0
  12. ERROR_CODE_FAILED = -1
  13. ERROR_CODE_TASK_CREATE_FAILED = 2001
  14. def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
  15. """构建错误响应"""
  16. return {
  17. "code": code,
  18. "task_id": None,
  19. "reason": reason
  20. }
  21. def _build_success_response(task_id: str) -> Dict[str, Any]:
  22. """构建成功响应"""
  23. return {
  24. "code": ERROR_CODE_SUCCESS,
  25. "task_id": task_id,
  26. "reason": ""
  27. }
  28. def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]:
  29. """创建工作流任务"""
  30. try:
  31. task = WorkflowTask.create_task(
  32. scene=scene,
  33. capability=CapabilityEnum.DECODE,
  34. root_task_id=''
  35. )
  36. logger.info(f"创建解构任务成功,task_id: {task.task_id}")
  37. return task
  38. except Exception as e:
  39. logger.error(f"创建解构任务失败: {str(e)}")
  40. return None
  41. def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
  42. """初始化任务结果"""
  43. try:
  44. result = WorkflowDecodeTaskResult.create_result(
  45. task_id=task_id,
  46. content=content
  47. )
  48. logger.info(f"初始化任务结果成功,task_id: {task_id}")
  49. return result
  50. except Exception as e:
  51. logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(e)}")
  52. return None
  53. def _trigger_decode_workflow(task_id: str) -> bool:
  54. """触发解构工作流"""
  55. try:
  56. invoke_decode_workflow(task_id)
  57. logger.info(f"发起解构任务成功,task_id: {task_id}")
  58. return True
  59. except Exception as e:
  60. logger.error(f"发起解构任务失败,task_id: {task_id}, error: {str(e)}")
  61. return False
  62. def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
  63. """检查配额是否充足"""
  64. try:
  65. # 获取今天的日期,格式为 YYYYMMDD
  66. quota_date = datetime.now().strftime('%Y%m%d')
  67. scene_value = scene.value
  68. capability_value = capability.value
  69. # 1. 查询 workflow_daily_quota 表,查找今天的配额记录
  70. sql = """
  71. SELECT quota_limit, used_count, locked
  72. FROM workflow_daily_quota
  73. WHERE quota_date = %s AND scene = %s AND capability = %s
  74. """
  75. quota_record = mysql.fetchone(sql, (quota_date, scene_value, capability_value))
  76. # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10
  77. if not quota_record:
  78. insert_sql = """
  79. INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
  80. VALUES (%s, %s, %s, %s, %s, %s)
  81. """
  82. mysql.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
  83. quota_limit = 10
  84. else:
  85. quota_limit = quota_record.get('quota_limit', 10)
  86. # 如果配额被锁定,直接返回 False
  87. if quota_record.get('locked', 0) == 1 or quota_record.get('used_count', 0) >= quota_record.get('quota_limit', 10):
  88. logger.warning(f"配额已锁定,scene={scene_value}, capability={capability_value}, date={quota_date}")
  89. return False
  90. # 3. 查询 workflow_task 表中今天已创建的任务数
  91. count_sql = """
  92. SELECT COUNT(*) as task_count
  93. FROM workflow_task
  94. WHERE scene = %s
  95. AND capability = %s
  96. AND DATE(created_time) = CURDATE()
  97. """
  98. count_result = mysql.fetchone(count_sql, (scene_value, capability_value))
  99. used_count = count_result.get('task_count', 0) if count_result else 0
  100. # 4. 如果已创建数 < quota_limit,返回 True,否则返回 False
  101. if used_count < quota_limit:
  102. logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={used_count}, limit={quota_limit}")
  103. return True
  104. else:
  105. logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={used_count}, limit={quota_limit}")
  106. return False
  107. except Exception as e:
  108. logger.error(f"配额检查失败: {str(e)}")
  109. # 发生异常时,为了不影响业务,返回 True 允许继续执行
  110. return True
  111. def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
  112. """选题解构方法"""
  113. try:
  114. # 前置配额检查,用于超出每天解构次数时,直接返回错误
  115. if not _check_quota(param.scene, CapabilityEnum.DECODE):
  116. return _build_error_response(
  117. ERROR_CODE_FAILED,
  118. "配额不足"
  119. )
  120. # 步骤1: 创建工作流task任务
  121. task = _create_workflow_task(param.scene)
  122. if not task or not task.task_id:
  123. return _build_error_response(
  124. ERROR_CODE_TASK_CREATE_FAILED,
  125. "创建解构任务失败"
  126. )
  127. # 步骤2: 初始化任务结果
  128. result = _initialize_task_result(task.task_id, param.content)
  129. if not result or not result.task_id:
  130. return _build_error_response(
  131. ERROR_CODE_FAILED,
  132. "初始化任务结果失败"
  133. )
  134. # 步骤3: 触发解构工作流
  135. if not _trigger_decode_workflow(task.task_id):
  136. return _build_error_response(
  137. ERROR_CODE_FAILED,
  138. "发起解构任务失败"
  139. )
  140. # 所有步骤成功
  141. return _build_success_response(task.task_id)
  142. except Exception as e:
  143. logger.error(f"选题解构失败: {str(e)}")
  144. return _build_error_response(
  145. ERROR_CODE_TASK_CREATE_FAILED,
  146. f"解构任务创建失败: {str(e)}"
  147. )
  148. def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
  149. """根据场景分发任务"""
  150. if param.scene == SceneEnum.TOPIC:
  151. return decode_topic(param)
  152. elif param.scene == SceneEnum.CREATION:
  153. # TODO: 实现创作场景
  154. return _build_error_response(ERROR_CODE_FAILED, "创作场景暂未实现")
  155. elif param.scene == SceneEnum.PRODUCTION:
  156. # TODO: 实现制作场景
  157. return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现")
  158. else:
  159. return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}")
  160. def invoke_decode_workflow(task_id: str) -> Dict[str, Any]:
  161. """发起解构任务"""
  162. try:
  163. # TODO: 调用实际的工作流接口
  164. return {
  165. "code": ERROR_CODE_SUCCESS,
  166. "task_id": task_id,
  167. "reason": ""
  168. }
  169. except Exception as e:
  170. logger.error(f"发起解构任务失败: {str(e)}")
  171. return {
  172. "code": ERROR_CODE_FAILED,
  173. "task_id": None,
  174. "reason": f"解构任务执行失败: {str(e)}"
  175. }