decode.py

from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
from models.task import WorkflowTask
from models.decode_task_result import WorkflowDecodeTaskResult
from utils.sync_mysql_help import mysql
from pymysql.cursors import DictCursor
from loguru import logger
import sys
from datetime import datetime
from typing import Dict, Any, Optional

logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# Error code constants
ERROR_CODE_SUCCESS = 0
ERROR_CODE_FAILED = -1
ERROR_CODE_TASK_CREATE_FAILED = 2001


def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
    """Build an error response."""
    return {
        "code": code,
        "task_id": None,
        "reason": reason
    }


def _build_success_response(task_id: str) -> Dict[str, Any]:
    """Build a success response."""
    return {
        "code": ERROR_CODE_SUCCESS,
        "task_id": task_id,
        "reason": ""
    }


def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]:
    """Create a workflow task."""
    try:
        task = WorkflowTask.create_task(
            scene=scene,
            capability=CapabilityEnum.DECODE,
            root_task_id=''
        )
        logger.info(f"Decode task created successfully, task_id: {task.task_id}")
        return task
    except Exception as e:
        logger.error(f"Failed to create decode task: {str(e)}")
        return None


def _initialize_task_result(task_id: str, content: Any) -> Optional[WorkflowDecodeTaskResult]:
    """Initialize the task result."""
    try:
        result = WorkflowDecodeTaskResult.create_result(
            task_id=task_id,
            content=content
        )
        logger.info(f"Task result initialized successfully, task_id: {task_id}")
        return result
    except Exception as e:
        logger.error(f"Failed to initialize task result, task_id: {task_id}, error: {str(e)}")
        return None


def _trigger_decode_workflow(task_id: str) -> bool:
    """Trigger the decode workflow."""
    try:
        # invoke_decode_workflow reports failures through its return code rather
        # than raising, so check the code instead of relying on exceptions alone
        result = invoke_decode_workflow(task_id)
        if result.get("code") != ERROR_CODE_SUCCESS:
            logger.error(f"Failed to start decode task, task_id: {task_id}, reason: {result.get('reason')}")
            return False
        logger.info(f"Decode task started successfully, task_id: {task_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to start decode task, task_id: {task_id}, error: {str(e)}")
        return False


def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
    """Check whether quota is available (concurrency-safe version)."""
    try:
        # Today's date in YYYYMMDD format
        quota_date = datetime.now().strftime('%Y%m%d')
        scene_value = scene.value
        capability_value = capability.value
        # Use a transaction with SELECT ... FOR UPDATE for concurrency safety
        pool = mysql.get_pool()
        with pool.connection() as conn:
            try:
                # Start the transaction
                conn.begin()
                with conn.cursor(DictCursor) as cursor:
                    # 1. Lock the quota record with SELECT ... FOR UPDATE (if it exists)
                    select_sql = """
                        SELECT quota_limit, used_count, locked
                        FROM workflow_daily_quota
                        WHERE quota_date = %s AND scene = %s AND capability = %s
                        FOR UPDATE
                    """
                    cursor.execute(select_sql, (quota_date, scene_value, capability_value))
                    quota_record = cursor.fetchone()
                    # 2. If no record exists, create one with a default quota_limit of 10
                    if not quota_record:
                        insert_sql = """
                            INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
                            VALUES (%s, %s, %s, %s, %s, %s)
                        """
                        cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
                        quota_limit = 10
                        current_used_count = 0
                        is_locked = 0
                    else:
                        quota_limit = quota_record.get('quota_limit', 10)
                        current_used_count = quota_record.get('used_count', 0)
                        is_locked = quota_record.get('locked', 0)
                    # 3. Check whether the quota is locked or already exhausted
                    if is_locked == 1 or current_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"Quota locked or exhausted, scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
                        return False
                    # 4. Count the tasks created today in workflow_task (actual usage)
                    count_sql = """
                        SELECT COUNT(*) as task_count
                        FROM workflow_task
                        WHERE scene = %s
                          AND capability = %s
                          AND DATE(created_time) = CURDATE()
                    """
                    cursor.execute(count_sql, (scene_value, capability_value))
                    count_result = cursor.fetchone()
                    actual_used_count = count_result.get('task_count', 0) if count_result else 0
                    # 5. If actual usage has reached the quota limit, reject
                    if actual_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"Quota exceeded, scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                        return False
                    # 6. Quota available: increment used_count (atomic update)
                    update_sql = """
                        UPDATE workflow_daily_quota
                        SET used_count = used_count + 1
                        WHERE quota_date = %s AND scene = %s AND capability = %s
                          AND used_count < quota_limit AND locked = 0
                    """
                    cursor.execute(update_sql, (quota_date, scene_value, capability_value))
                    # Verify the update succeeded (affected row count)
                    if cursor.rowcount == 0:
                        conn.rollback()
                        logger.warning(f"Quota update failed, possibly claimed by a concurrent request, scene={scene_value}, capability={capability_value}")
                        return False
                    # Commit the transaction
                    conn.commit()
                    logger.info(f"Quota check passed, scene={scene_value}, capability={capability_value}, used={actual_used_count + 1}, limit={quota_limit}")
                    return True
            except Exception:
                conn.rollback()
                raise
    except Exception as e:
        logger.error(f"Quota check failed: {str(e)}")
        # On exception, return True so the business flow is not blocked
        return True
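

# The queries in _check_quota assume a daily-quota table roughly shaped like the
# sketch below. The column names come from the SQL above; the types, keys, and
# defaults are assumptions for illustration, not the authoritative schema.
#
#   CREATE TABLE workflow_daily_quota (
#       id          BIGINT AUTO_INCREMENT PRIMARY KEY,
#       scene       VARCHAR(64) NOT NULL,
#       capability  VARCHAR(64) NOT NULL,
#       quota_date  VARCHAR(8)  NOT NULL,              -- YYYYMMDD
#       quota_limit INT         NOT NULL DEFAULT 10,
#       used_count  INT         NOT NULL DEFAULT 0,
#       locked      TINYINT     NOT NULL DEFAULT 0,
#       UNIQUE KEY uk_date_scene_capability (quota_date, scene, capability)
#   );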


def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
    """Topic decode entry point."""
    try:
        # Pre-check the quota so requests beyond the daily decode limit fail fast
        if not _check_quota(param.scene, CapabilityEnum.DECODE):
            return _build_error_response(
                ERROR_CODE_FAILED,
                "Quota exceeded"
            )
        # Step 1: create the workflow task
        task = _create_workflow_task(param.scene)
        if not task or not task.task_id:
            return _build_error_response(
                ERROR_CODE_TASK_CREATE_FAILED,
                "Failed to create decode task"
            )
        # Step 2: initialize the task result
        result = _initialize_task_result(task.task_id, param.content)
        if not result or not result.task_id:
            return _build_error_response(
                ERROR_CODE_FAILED,
                "Failed to initialize task result"
            )
        # Step 3: trigger the decode workflow
        if not _trigger_decode_workflow(task.task_id):
            return _build_error_response(
                ERROR_CODE_FAILED,
                "Failed to start decode task"
            )
        # All steps succeeded
        return _build_success_response(task.task_id)
    except Exception as e:
        logger.error(f"Topic decode failed: {str(e)}")
        return _build_error_response(
            ERROR_CODE_TASK_CREATE_FAILED,
            f"Failed to create decode task: {str(e)}"
        )


def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
    """Dispatch the decode task by scene."""
    if param.scene == SceneEnum.TOPIC:
        return decode_topic(param)
    elif param.scene == SceneEnum.CREATION:
        # TODO: implement the creation scene
        return _build_error_response(ERROR_CODE_FAILED, "Creation scene not implemented yet")
    elif param.scene == SceneEnum.PRODUCTION:
        # TODO: implement the production scene
        return _build_error_response(ERROR_CODE_FAILED, "Production scene not implemented yet")
    else:
        return _build_error_response(ERROR_CODE_FAILED, f"Unknown scene: {param.scene}")


def invoke_decode_workflow(task_id: str) -> Dict[str, Any]:
    """Invoke the decode workflow."""
    try:
        # TODO: call the actual workflow API
        return {
            "code": ERROR_CODE_SUCCESS,
            "task_id": task_id,
            "reason": ""
        }
    except Exception as e:
        logger.error(f"Failed to invoke decode workflow: {str(e)}")
        return {
            "code": ERROR_CODE_FAILED,
            "task_id": None,
            "reason": f"Decode workflow invocation failed: {str(e)}"
        }
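

# Minimal usage sketch. DecodeContentParam's constructor signature is assumed
# here (keyword args `scene` and `content`, matching the attributes read above);
# adjust to the real definition in utils.params.
if __name__ == "__main__":
    demo_param = DecodeContentParam(  # hypothetical construction for illustration
        scene=SceneEnum.TOPIC,
        content="Sample topic content to decode",
    )
    response = begin_decode_task(demo_param)
    # Expected shape: {"code": 0, "task_id": "<id>", "reason": ""} on success,
    # or a non-zero code with a reason on failure.
    print(response)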