decode.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
  2. from models.task import WorkflowTask
  3. from models.decode_task_result import WorkflowDecodeTaskResult
  4. from utils.sync_mysql_help import mysql
  5. from pymysql.cursors import DictCursor
  6. from loguru import logger
  7. from datetime import datetime
  8. from typing import Dict, Any, Optional
  9. import sys
  10. import requests
  11. MAX_QUOTA_LIMIT = 500
  12. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  13. # 错误码常量
  14. ERROR_CODE_SUCCESS = 0
  15. ERROR_CODE_FAILED = -1
  16. ERROR_CODE_TASK_CREATE_FAILED = 2001
  17. def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
  18. """构建错误响应"""
  19. return {
  20. "code": code,
  21. "task_id": None,
  22. "reason": reason
  23. }
  24. def _build_success_response(task_id: str) -> Dict[str, Any]:
  25. """构建成功响应"""
  26. return {
  27. "code": ERROR_CODE_SUCCESS,
  28. "task_id": task_id,
  29. "reason": ""
  30. }
  31. def _create_workflow_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Optional[WorkflowTask]:
  32. """创建工作流任务"""
  33. try:
  34. task = WorkflowTask.create_task(
  35. scene=scene,
  36. capability=CapabilityEnum.DECODE,
  37. content_type=content_type,
  38. root_task_id=''
  39. )
  40. logger.info(f"创建解构任务成功,task_id: {task.task_id}")
  41. return task
  42. except Exception as e:
  43. logger.error(f"创建解构任务失败: {str(e)}")
  44. return None
  45. def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
  46. """初始化选题解构任务结果"""
  47. try:
  48. result = WorkflowDecodeTaskResult.create_result(
  49. task_id=task_id,
  50. content=content
  51. )
  52. logger.info(f"初始化任务结果成功,task_id: {task_id}")
  53. return result
  54. except Exception as e:
  55. logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(e)}")
  56. return None
  57. def _initialize_script_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
  58. """初始化创作解构任务结果(写入 workflow_script_task_result 表)"""
  59. try:
  60. result = WorkflowDecodeTaskResult.create_result(
  61. task_id=task_id,
  62. content=content,
  63. table_name="workflow_script_task_result",
  64. )
  65. logger.info(f"初始化创作解构任务结果成功,task_id: {task_id}")
  66. return result
  67. except Exception as e:
  68. logger.error(f"初始化创作解构任务结果失败,task_id: {task_id}, error: {str(e)}")
  69. return None
  70. def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE, content_type: ContentTypeEnum = ContentTypeEnum.TEXT) -> bool:
  71. """检查配额是否充足(并发安全版本)"""
  72. try:
  73. # 获取今天的日期,格式为 YYYYMMDD
  74. quota_date = datetime.now().strftime('%Y%m%d')
  75. scene_value = scene.value
  76. capability_value = capability.value
  77. content_type_value = content_type.value
  78. # 使用事务和 SELECT FOR UPDATE 确保并发安全
  79. pool = mysql.get_pool()
  80. with pool.connection() as conn:
  81. try:
  82. # 开始事务
  83. conn.begin()
  84. with conn.cursor(DictCursor) as cursor:
  85. # 1. 使用 SELECT FOR UPDATE 锁定配额记录(如果存在)
  86. select_sql = """
  87. SELECT quota_limit, used_count, locked
  88. FROM workflow_daily_quota
  89. WHERE quota_date = %s AND scene = %s AND capability = %s AND content_type = %s
  90. FOR UPDATE
  91. """
  92. cursor.execute(select_sql, (quota_date, scene_value, capability_value, content_type_value))
  93. quota_record = cursor.fetchone()
  94. # 2. 如果没找到,创建一条新记录,quota_limit 默认为 MAX_QUOTA_LIMIT
  95. if not quota_record:
  96. insert_sql = """
  97. INSERT INTO workflow_daily_quota (scene, capability, content_type, quota_date, quota_limit, used_count, locked)
  98. VALUES (%s, %s, %s, %s, %s, %s, %s)
  99. """
  100. cursor.execute(insert_sql, (scene_value, capability_value, content_type_value, quota_date, MAX_QUOTA_LIMIT, 0, 0))
  101. quota_limit = MAX_QUOTA_LIMIT
  102. current_used_count = 0
  103. is_locked = 0
  104. else:
  105. quota_limit = quota_record.get('quota_limit', MAX_QUOTA_LIMIT)
  106. current_used_count = quota_record.get('used_count', 0)
  107. is_locked = quota_record.get('locked', 0)
  108. # 3. 检查配额是否被锁定或已用完
  109. if is_locked == 1 or current_used_count >= quota_limit:
  110. conn.rollback()
  111. logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
  112. return False
  113. # 4. 查询 workflow_task 表中今天已创建的任务数(实际使用量)
  114. count_sql = """
  115. SELECT COUNT(*) as task_count
  116. FROM workflow_task
  117. WHERE scene = %s
  118. AND capability = %s
  119. AND content_type = %s
  120. AND DATE(created_time) = CURDATE()
  121. """
  122. cursor.execute(count_sql, (scene_value, capability_value, content_type_value))
  123. count_result = cursor.fetchone()
  124. actual_used_count = count_result.get('task_count', 0) if count_result else 0
  125. # 5. 如果实际使用量 >= 配额限制,返回 False
  126. if actual_used_count >= quota_limit:
  127. conn.rollback()
  128. logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
  129. return False
  130. # 提交事务
  131. conn.commit()
  132. logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
  133. return True
  134. except Exception as e:
  135. conn.rollback()
  136. raise e
  137. except Exception as e:
  138. logger.error(f"配额检查失败: {str(e)}")
  139. # 发生异常时,返回False,不允许继续执行
  140. return False
  141. def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
  142. """选题解构方法"""
  143. try:
  144. # 前置配额检查,用于超出每天解构次数时,直接返回错误
  145. # if not _check_quota(param.scene, CapabilityEnum.DECODE, param.content_type):
  146. # return _build_error_response(
  147. # ERROR_CODE_FAILED,
  148. # "配额不足"
  149. # )
  150. # 步骤1: 创建工作流task任务
  151. task = _create_workflow_task(param.scene, param.content_type)
  152. if not task or not task.task_id:
  153. return _build_error_response(
  154. ERROR_CODE_TASK_CREATE_FAILED,
  155. "创建解构任务失败"
  156. )
  157. # 步骤2: 初始化任务结果
  158. result = _initialize_task_result(task.task_id, param.content)
  159. if not result or not result.task_id:
  160. return _build_error_response(
  161. ERROR_CODE_FAILED,
  162. "初始化任务结果失败"
  163. )
  164. # 步骤3: 触发解构工作流
  165. # trigger_result = _trigger_topic_decode_workflow(task.task_id)
  166. # if trigger_result.get("code") != ERROR_CODE_SUCCESS:
  167. # return _build_error_response(
  168. # ERROR_CODE_FAILED,
  169. # trigger_result.get("reason") or "发起解构任务失败"
  170. # )
  171. # 所有步骤成功
  172. return _build_success_response(task.task_id)
  173. except Exception as e:
  174. logger.error(f"选题解构失败: {str(e)}")
  175. return _build_error_response(
  176. ERROR_CODE_TASK_CREATE_FAILED,
  177. f"解构任务创建失败: {str(e)}"
  178. )
  179. def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
  180. """根据场景分发任务"""
  181. if param.scene == SceneEnum.TOPIC:
  182. # 选题解构
  183. return decode_topic(param)
  184. elif param.scene == SceneEnum.CREATION:
  185. # 创作解构:流程与选题相同,只是结果表不同
  186. return decode_creation(param)
  187. elif param.scene == SceneEnum.PRODUCTION:
  188. # TODO: 实现制作场景
  189. return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现")
  190. else:
  191. return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}")
  192. def decode_creation(param: DecodeContentParam) -> Dict[str, Any]:
  193. """创作解构方法"""
  194. try:
  195. # 步骤1: 创建工作流task任务(scene 为 CREATION)
  196. task = _create_workflow_task(param.scene, param.content_type)
  197. if not task or not task.task_id:
  198. return _build_error_response(
  199. ERROR_CODE_TASK_CREATE_FAILED,
  200. "创建创作解构任务失败"
  201. )
  202. # 步骤2: 初始化创作解构任务结果到 workflow_script_task_result
  203. result = _initialize_script_task_result(task.task_id, param.content)
  204. if not result or not result.task_id:
  205. return _build_error_response(
  206. ERROR_CODE_FAILED,
  207. "初始化创作解构任务结果失败"
  208. )
  209. # 步骤3:(可选)触发创作解构工作流,目前按选题逻辑仅返回 task_id
  210. # 所有步骤成功
  211. return _build_success_response(task.task_id)
  212. except Exception as e:
  213. logger.error(f"创作解构失败: {str(e)}")
  214. return _build_error_response(
  215. ERROR_CODE_TASK_CREATE_FAILED,
  216. f"创作解构任务创建失败: {str(e)}"
  217. )
  218. def _trigger_topic_decode_workflow(task_id: str) -> Dict[str, Any]:
  219. """发起解构任务(调用上游工作流服务)"""
  220. try:
  221. # url = "http://192.168.81.96:8000/workflow/topic/decode"
  222. url = "http://supply-content-deconstruction-workflow.piaoquantv.com/decode/workflow/topic/decode"
  223. params = {"taskId": task_id}
  224. resp = requests.get(url, params=params, timeout=10)
  225. # HTTP 层异常直接视为失败
  226. if resp.status_code != 200:
  227. logger.error(
  228. f"发起解构任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
  229. )
  230. return {
  231. "code": ERROR_CODE_FAILED,
  232. "task_id": None,
  233. "reason": f"错误: {resp.status_code}",
  234. }
  235. try:
  236. data = resp.json()
  237. except Exception as e:
  238. logger.error(f"发起解构任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
  239. return {
  240. "code": ERROR_CODE_FAILED,
  241. "task_id": None,
  242. "reason": "工作流接口返回非JSON格式",
  243. }
  244. code = data.get("code", ERROR_CODE_FAILED)
  245. msg = data.get("msg", "")
  246. # 按上游协议:0 成功,其他(1001/1002/-1)为失败
  247. if code == 0:
  248. return {
  249. "code": ERROR_CODE_SUCCESS,
  250. "task_id": task_id,
  251. "reason": "",
  252. }
  253. # 将上游错误信息透传到 reason 中
  254. logger.error(
  255. f"发起解构任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
  256. )
  257. return {
  258. "code": ERROR_CODE_FAILED,
  259. "task_id": None,
  260. "reason": f"工作流接口失败: code={code}, msg={msg}",
  261. }
  262. except requests.RequestException as e:
  263. # 网络类异常(超时、连接失败等)
  264. logger.error(f"发起解构任务失败,请求异常,task_id={task_id}, error={str(e)}")
  265. return {
  266. "code": ERROR_CODE_FAILED,
  267. "task_id": None,
  268. "reason": f"工作流接口请求异常: {str(e)}",
  269. }
  270. except Exception as e:
  271. logger.error(f"发起解构任务失败,task_id={task_id}, error={str(e)}")
  272. return {
  273. "code": ERROR_CODE_FAILED,
  274. "task_id": None,
  275. "reason": f"解构任务执行失败: {str(e)}",
  276. }