decode.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
  2. from models.task import WorkflowTask
  3. from models.decode_task_result import WorkflowDecodeTaskResult
  4. from utils.sync_mysql_help import mysql
  5. from pymysql.cursors import DictCursor
  6. from loguru import logger
  7. from datetime import datetime
  8. from typing import Dict, Any, Optional
  9. import sys
  10. import requests
  11. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  12. # 错误码常量
  13. ERROR_CODE_SUCCESS = 0
  14. ERROR_CODE_FAILED = -1
  15. ERROR_CODE_TASK_CREATE_FAILED = 2001
  16. def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
  17. """构建错误响应"""
  18. return {
  19. "code": code,
  20. "task_id": None,
  21. "reason": reason
  22. }
  23. def _build_success_response(task_id: str) -> Dict[str, Any]:
  24. """构建成功响应"""
  25. return {
  26. "code": ERROR_CODE_SUCCESS,
  27. "task_id": task_id,
  28. "reason": ""
  29. }
  30. def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]:
  31. """创建工作流任务"""
  32. try:
  33. task = WorkflowTask.create_task(
  34. scene=scene,
  35. capability=CapabilityEnum.DECODE,
  36. root_task_id=''
  37. )
  38. logger.info(f"创建解构任务成功,task_id: {task.task_id}")
  39. return task
  40. except Exception as e:
  41. logger.error(f"创建解构任务失败: {str(e)}")
  42. return None
  43. def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
  44. """初始化任务结果"""
  45. try:
  46. result = WorkflowDecodeTaskResult.create_result(
  47. task_id=task_id,
  48. content=content
  49. )
  50. logger.info(f"初始化任务结果成功,task_id: {task_id}")
  51. return result
  52. except Exception as e:
  53. logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(e)}")
  54. return None
  55. def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
  56. """检查配额是否充足(并发安全版本)"""
  57. try:
  58. # 获取今天的日期,格式为 YYYYMMDD
  59. quota_date = datetime.now().strftime('%Y%m%d')
  60. scene_value = scene.value
  61. capability_value = capability.value
  62. # 使用事务和 SELECT FOR UPDATE 确保并发安全
  63. pool = mysql.get_pool()
  64. with pool.connection() as conn:
  65. try:
  66. # 开始事务
  67. conn.begin()
  68. with conn.cursor(DictCursor) as cursor:
  69. # 1. 使用 SELECT FOR UPDATE 锁定配额记录(如果存在)
  70. select_sql = """
  71. SELECT quota_limit, used_count, locked
  72. FROM workflow_daily_quota
  73. WHERE quota_date = %s AND scene = %s AND capability = %s
  74. FOR UPDATE
  75. """
  76. cursor.execute(select_sql, (quota_date, scene_value, capability_value))
  77. quota_record = cursor.fetchone()
  78. # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10
  79. if not quota_record:
  80. insert_sql = """
  81. INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
  82. VALUES (%s, %s, %s, %s, %s, %s)
  83. """
  84. cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
  85. quota_limit = 10
  86. current_used_count = 0
  87. is_locked = 0
  88. else:
  89. quota_limit = quota_record.get('quota_limit', 10)
  90. current_used_count = quota_record.get('used_count', 0)
  91. is_locked = quota_record.get('locked', 0)
  92. # 3. 检查配额是否被锁定或已用完
  93. if is_locked == 1 or current_used_count >= quota_limit:
  94. conn.rollback()
  95. logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
  96. return False
  97. # 4. 查询 workflow_task 表中今天已创建的任务数(实际使用量)
  98. count_sql = """
  99. SELECT COUNT(*) as task_count
  100. FROM workflow_task
  101. WHERE scene = %s
  102. AND capability = %s
  103. AND DATE(created_time) = CURDATE()
  104. """
  105. cursor.execute(count_sql, (scene_value, capability_value))
  106. count_result = cursor.fetchone()
  107. actual_used_count = count_result.get('task_count', 0) if count_result else 0
  108. # 5. 如果实际使用量 >= 配额限制,返回 False
  109. if actual_used_count >= quota_limit:
  110. conn.rollback()
  111. logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
  112. return False
  113. # 提交事务
  114. conn.commit()
  115. logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
  116. return True
  117. except Exception as e:
  118. conn.rollback()
  119. raise e
  120. except Exception as e:
  121. logger.error(f"配额检查失败: {str(e)}")
  122. # 发生异常时,返回False,不允许继续执行
  123. return False
  124. def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
  125. """选题解构方法"""
  126. try:
  127. # 前置配额检查,用于超出每天解构次数时,直接返回错误
  128. if not _check_quota(param.scene, CapabilityEnum.DECODE):
  129. return _build_error_response(
  130. ERROR_CODE_FAILED,
  131. "配额不足"
  132. )
  133. # 步骤1: 创建工作流task任务
  134. task = _create_workflow_task(param.scene)
  135. if not task or not task.task_id:
  136. return _build_error_response(
  137. ERROR_CODE_TASK_CREATE_FAILED,
  138. "创建解构任务失败"
  139. )
  140. # 步骤2: 初始化任务结果
  141. result = _initialize_task_result(task.task_id, param.content)
  142. if not result or not result.task_id:
  143. return _build_error_response(
  144. ERROR_CODE_FAILED,
  145. "初始化任务结果失败"
  146. )
  147. # 步骤3: 触发解构工作流
  148. if not _trigger_topic_decode_workflow(task.task_id):
  149. return _build_error_response(
  150. ERROR_CODE_FAILED,
  151. "发起解构任务失败"
  152. )
  153. # 所有步骤成功
  154. return _build_success_response(task.task_id)
  155. except Exception as e:
  156. logger.error(f"选题解构失败: {str(e)}")
  157. return _build_error_response(
  158. ERROR_CODE_TASK_CREATE_FAILED,
  159. f"解构任务创建失败: {str(e)}"
  160. )
  161. def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
  162. """根据场景分发任务"""
  163. if param.scene == SceneEnum.TOPIC:
  164. return decode_topic(param)
  165. elif param.scene == SceneEnum.CREATION:
  166. # TODO: 实现创作场景
  167. return _build_error_response(ERROR_CODE_FAILED, "创作场景暂未实现")
  168. elif param.scene == SceneEnum.PRODUCTION:
  169. # TODO: 实现制作场景
  170. return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现")
  171. else:
  172. return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}")
  173. def _trigger_topic_decode_workflow(task_id: str) -> Dict[str, Any]:
  174. """发起解构任务(调用上游工作流服务)"""
  175. try:
  176. url = "http://192.168.81.96:8000/workflow/topic/decode"
  177. # url = "http://supply-content-deconstruction-workflow.piaoquantv.com/workflow/topic/decode"
  178. params = {"taskId": task_id}
  179. resp = requests.get(url, params=params, timeout=10)
  180. # HTTP 层异常直接视为失败
  181. if resp.status_code != 200:
  182. logger.error(
  183. f"发起解构任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
  184. )
  185. return {
  186. "code": ERROR_CODE_FAILED,
  187. "task_id": None,
  188. "reason": f"错误: {resp.status_code}",
  189. }
  190. try:
  191. data = resp.json()
  192. except Exception as e:
  193. logger.error(f"发起解构任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
  194. return {
  195. "code": ERROR_CODE_FAILED,
  196. "task_id": None,
  197. "reason": "工作流接口返回非JSON格式",
  198. }
  199. code = data.get("code", ERROR_CODE_FAILED)
  200. msg = data.get("msg", "")
  201. # 按上游协议:0 成功,其他(1001/1002/-1)为失败
  202. if code == 0:
  203. return {
  204. "code": ERROR_CODE_SUCCESS,
  205. "task_id": task_id,
  206. "reason": "",
  207. }
  208. # 将上游错误信息透传到 reason 中
  209. logger.error(
  210. f"发起解构任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
  211. )
  212. return {
  213. "code": ERROR_CODE_FAILED,
  214. "task_id": None,
  215. "reason": f"工作流接口失败: code={code}, msg={msg}",
  216. }
  217. except requests.RequestException as e:
  218. # 网络类异常(超时、连接失败等)
  219. logger.error(f"发起解构任务失败,请求异常,task_id={task_id}, error={str(e)}")
  220. return {
  221. "code": ERROR_CODE_FAILED,
  222. "task_id": None,
  223. "reason": f"工作流接口请求异常: {str(e)}",
  224. }
  225. except Exception as e:
  226. logger.error(f"发起解构任务失败,task_id={task_id}, error={str(e)}")
  227. return {
  228. "code": ERROR_CODE_FAILED,
  229. "task_id": None,
  230. "reason": f"解构任务执行失败: {str(e)}",
  231. }