# decode.py — entry points for creating and triggering content-decode workflow tasks.
  1. from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
  2. from models.task import WorkflowTask
  3. from models.decode_task_result import WorkflowDecodeTaskResult
  4. from utils.sync_mysql_help import mysql
  5. from pymysql.cursors import DictCursor
  6. from loguru import logger
  7. from datetime import datetime
  8. from typing import Dict, Any, Optional
  9. import sys
  10. import requests
# Default daily quota ceiling used when no row exists yet in workflow_daily_quota.
MAX_QUOTA_LIMIT = 500
# Route ERROR-level records to stderr with full tracebacks for diagnosis.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
# Error-code constants
ERROR_CODE_SUCCESS = 0                 # operation succeeded
ERROR_CODE_FAILED = -1                 # generic failure
ERROR_CODE_TASK_CREATE_FAILED = 2001   # workflow task could not be created
  17. def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
  18. """构建错误响应"""
  19. return {
  20. "code": code,
  21. "task_id": None,
  22. "reason": reason
  23. }
  24. def _build_success_response(task_id: str) -> Dict[str, Any]:
  25. """构建成功响应"""
  26. return {
  27. "code": ERROR_CODE_SUCCESS,
  28. "task_id": task_id,
  29. "reason": ""
  30. }
  31. def _create_workflow_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Optional[WorkflowTask]:
  32. """创建工作流任务"""
  33. try:
  34. task = WorkflowTask.create_task(
  35. scene=scene,
  36. capability=CapabilityEnum.DECODE,
  37. content_type=content_type,
  38. root_task_id=''
  39. )
  40. logger.info(f"创建解构任务成功,task_id: {task.task_id}")
  41. return task
  42. except Exception as e:
  43. logger.error(f"创建解构任务失败: {str(e)}")
  44. return None
  45. def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
  46. """初始化任务结果"""
  47. try:
  48. result = WorkflowDecodeTaskResult.create_result(
  49. task_id=task_id,
  50. content=content
  51. )
  52. logger.info(f"初始化任务结果成功,task_id: {task_id}")
  53. return result
  54. except Exception as e:
  55. logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(e)}")
  56. return None
def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE, content_type: ContentTypeEnum = ContentTypeEnum.TEXT) -> bool:
    """Check whether today's quota allows another task (concurrency-safe).

    Uses an explicit transaction with SELECT ... FOR UPDATE to serialize
    concurrent checkers on the same (date, scene, capability, content_type)
    quota row. Returns True if a task may be created, False when the quota
    is locked/exhausted or any error occurs (fail-closed).
    """
    try:
        # Today's date formatted as YYYYMMDD — the quota-row partition key.
        quota_date = datetime.now().strftime('%Y%m%d')
        scene_value = scene.value
        capability_value = capability.value
        content_type_value = content_type.value
        # Use a transaction plus SELECT FOR UPDATE for concurrency safety.
        pool = mysql.get_pool()
        with pool.connection() as conn:
            try:
                # Begin the transaction explicitly so the row lock below is held
                # until commit/rollback.
                conn.begin()
                with conn.cursor(DictCursor) as cursor:
                    # 1. Lock the quota record (if it exists) with FOR UPDATE.
                    select_sql = """
                        SELECT quota_limit, used_count, locked
                        FROM workflow_daily_quota
                        WHERE quota_date = %s AND scene = %s AND capability = %s AND content_type = %s
                        FOR UPDATE
                    """
                    cursor.execute(select_sql, (quota_date, scene_value, capability_value, content_type_value))
                    quota_record = cursor.fetchone()
                    # 2. No row yet: insert one with quota_limit defaulting to MAX_QUOTA_LIMIT.
                    if not quota_record:
                        insert_sql = """
                            INSERT INTO workflow_daily_quota (scene, capability, content_type, quota_date, quota_limit, used_count, locked)
                            VALUES (%s, %s, %s, %s, %s, %s, %s)
                        """
                        cursor.execute(insert_sql, (scene_value, capability_value, content_type_value, quota_date, MAX_QUOTA_LIMIT, 0, 0))
                        quota_limit = MAX_QUOTA_LIMIT
                        current_used_count = 0
                        is_locked = 0
                    else:
                        quota_limit = quota_record.get('quota_limit', MAX_QUOTA_LIMIT)
                        current_used_count = quota_record.get('used_count', 0)
                        is_locked = quota_record.get('locked', 0)
                    # 3. Reject when the quota row is locked or already exhausted.
                    if is_locked == 1 or current_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
                        return False
                    # 4. Count tasks actually created today in workflow_task
                    #    (ground truth for usage, independent of used_count).
                    count_sql = """
                        SELECT COUNT(*) as task_count
                        FROM workflow_task
                        WHERE scene = %s
                        AND capability = %s
                        AND content_type = %s
                        AND DATE(created_time) = CURDATE()
                    """
                    cursor.execute(count_sql, (scene_value, capability_value, content_type_value))
                    count_result = cursor.fetchone()
                    actual_used_count = count_result.get('task_count', 0) if count_result else 0
                    # 5. Reject when actual usage has reached the limit.
                    if actual_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                        return False
                    # Commit — releases the row lock and persists any insert above.
                    conn.commit()
                    logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                    return True
            except Exception as e:
                # Roll back so the row lock is not left dangling, then re-raise
                # to the outer handler.
                conn.rollback()
                raise e
    except Exception as e:
        logger.error(f"配额检查失败: {str(e)}")
        # Fail closed: on any error, refuse to proceed.
        return False
  128. def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
  129. """选题解构方法"""
  130. try:
  131. # 前置配额检查,用于超出每天解构次数时,直接返回错误
  132. # if not _check_quota(param.scene, CapabilityEnum.DECODE, param.content_type):
  133. # return _build_error_response(
  134. # ERROR_CODE_FAILED,
  135. # "配额不足"
  136. # )
  137. # 步骤1: 创建工作流task任务
  138. task = _create_workflow_task(param.scene, param.content_type)
  139. if not task or not task.task_id:
  140. return _build_error_response(
  141. ERROR_CODE_TASK_CREATE_FAILED,
  142. "创建解构任务失败"
  143. )
  144. # 步骤2: 初始化任务结果
  145. result = _initialize_task_result(task.task_id, param.content)
  146. if not result or not result.task_id:
  147. return _build_error_response(
  148. ERROR_CODE_FAILED,
  149. "初始化任务结果失败"
  150. )
  151. # 步骤3: 触发解构工作流
  152. # trigger_result = _trigger_topic_decode_workflow(task.task_id)
  153. # if trigger_result.get("code") != ERROR_CODE_SUCCESS:
  154. # return _build_error_response(
  155. # ERROR_CODE_FAILED,
  156. # trigger_result.get("reason") or "发起解构任务失败"
  157. # )
  158. # 所有步骤成功
  159. return _build_success_response(task.task_id)
  160. except Exception as e:
  161. logger.error(f"选题解构失败: {str(e)}")
  162. return _build_error_response(
  163. ERROR_CODE_TASK_CREATE_FAILED,
  164. f"解构任务创建失败: {str(e)}"
  165. )
  166. def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
  167. """根据场景分发任务"""
  168. if param.scene == SceneEnum.TOPIC:
  169. return decode_topic(param)
  170. elif param.scene == SceneEnum.CREATION:
  171. # TODO: 实现创作场景
  172. return _build_error_response(ERROR_CODE_FAILED, "创作场景暂未实现")
  173. elif param.scene == SceneEnum.PRODUCTION:
  174. # TODO: 实现制作场景
  175. return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现")
  176. else:
  177. return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}")
  178. def _trigger_topic_decode_workflow(task_id: str) -> Dict[str, Any]:
  179. """发起解构任务(调用上游工作流服务)"""
  180. try:
  181. # url = "http://192.168.81.96:8000/workflow/topic/decode"
  182. url = "http://supply-content-deconstruction-workflow.piaoquantv.com/decode/workflow/topic/decode"
  183. params = {"taskId": task_id}
  184. resp = requests.get(url, params=params, timeout=10)
  185. # HTTP 层异常直接视为失败
  186. if resp.status_code != 200:
  187. logger.error(
  188. f"发起解构任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
  189. )
  190. return {
  191. "code": ERROR_CODE_FAILED,
  192. "task_id": None,
  193. "reason": f"错误: {resp.status_code}",
  194. }
  195. try:
  196. data = resp.json()
  197. except Exception as e:
  198. logger.error(f"发起解构任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
  199. return {
  200. "code": ERROR_CODE_FAILED,
  201. "task_id": None,
  202. "reason": "工作流接口返回非JSON格式",
  203. }
  204. code = data.get("code", ERROR_CODE_FAILED)
  205. msg = data.get("msg", "")
  206. # 按上游协议:0 成功,其他(1001/1002/-1)为失败
  207. if code == 0:
  208. return {
  209. "code": ERROR_CODE_SUCCESS,
  210. "task_id": task_id,
  211. "reason": "",
  212. }
  213. # 将上游错误信息透传到 reason 中
  214. logger.error(
  215. f"发起解构任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
  216. )
  217. return {
  218. "code": ERROR_CODE_FAILED,
  219. "task_id": None,
  220. "reason": f"工作流接口失败: code={code}, msg={msg}",
  221. }
  222. except requests.RequestException as e:
  223. # 网络类异常(超时、连接失败等)
  224. logger.error(f"发起解构任务失败,请求异常,task_id={task_id}, error={str(e)}")
  225. return {
  226. "code": ERROR_CODE_FAILED,
  227. "task_id": None,
  228. "reason": f"工作流接口请求异常: {str(e)}",
  229. }
  230. except Exception as e:
  231. logger.error(f"发起解构任务失败,task_id={task_id}, error={str(e)}")
  232. return {
  233. "code": ERROR_CODE_FAILED,
  234. "task_id": None,
  235. "reason": f"解构任务执行失败: {str(e)}",
  236. }