pattern.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from typing import Dict, Any, Optional, List
  2. from loguru import logger
  3. import sys
  4. import json
  5. import requests
  6. from utils.params import PatternContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum, ContentParam
  7. from models.task import WorkflowTask
  8. from utils.sync_mysql_help import mysql
  9. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  10. ERROR_CODE_SUCCESS = 0
  11. ERROR_CODE_FAILED = -1
  12. ERROR_CODE_TASK_CREATE_FAILED = 2001
  13. def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
  14. return {
  15. "code": code,
  16. "task_id": None,
  17. "reason": reason,
  18. }
  19. def _build_success_response(task_id: str) -> Dict[str, Any]:
  20. return {
  21. "code": ERROR_CODE_SUCCESS,
  22. "task_id": task_id,
  23. "reason": "",
  24. }
  25. def _validate_pattern_param(param: PatternContentParam) -> Optional[str]:
  26. """校验聚类入参的必填项"""
  27. if not param.pattern_name:
  28. return "pattern_name 不能为空"
  29. if not param.contents:
  30. return "contents 不能为空"
  31. for idx, content in enumerate(param.contents):
  32. if not content.channel_content_id:
  33. return f"contents[{idx}].channel_content_id 不能为空"
  34. if content.weight_score is None:
  35. return f"contents[{idx}].weight_score 不能为空"
  36. return None
  37. def _validate_decode_status(contents: List[ContentParam]) -> Optional[str]:
  38. """校验每个channel_content_id的解构状态"""
  39. STATUS_SUCCESS = 2 # 成功状态
  40. for content in contents:
  41. channel_content_id = content.channel_content_id
  42. # 查询workflow_decode_task_result表,获取最新的解构任务记录
  43. sql = """
  44. SELECT task_id
  45. FROM workflow_decode_task_result
  46. WHERE channel_content_id = %s
  47. ORDER BY created_time DESC
  48. LIMIT 1
  49. """
  50. result_record = mysql.fetchone(sql, (channel_content_id,))
  51. if not result_record:
  52. return f"channel_content_id {channel_content_id} 找不到解构结果"
  53. task_id = result_record.get("task_id")
  54. if not task_id:
  55. return f"channel_content_id {channel_content_id} 找不到解构结果"
  56. # 查询workflow_task表,获取任务状态
  57. task_sql = "SELECT status FROM workflow_task WHERE task_id = %s"
  58. task_record = mysql.fetchone(task_sql, (task_id,))
  59. if not task_record:
  60. return f"channel_content_id {channel_content_id} 找不到解构结果"
  61. status = task_record.get("status")
  62. if status != STATUS_SUCCESS:
  63. return f"channel_content_id {channel_content_id} 找不到解构结果"
  64. return None
  65. def _create_pattern_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Optional[WorkflowTask]:
  66. """创建聚类 workflow_task 任务"""
  67. try:
  68. task = WorkflowTask.create_task(
  69. scene=scene,
  70. capability=CapabilityEnum.PATTERN,
  71. content_type=content_type,
  72. root_task_id="",
  73. )
  74. logger.info(f"创建聚类任务成功,task_id: {task.task_id}")
  75. return task
  76. except Exception as e:
  77. logger.error(f"创建聚类任务失败: {str(e)}")
  78. return None
  79. def _save_pattern_contents(task_id: str, pattern_name: str, contents: List[ContentParam]) -> bool:
  80. """将聚类内容写入 workflow_pattern_task_content 表"""
  81. sql = """
  82. INSERT INTO workflow_pattern_task_content (
  83. task_id,
  84. pattern_name,
  85. channel_content_id,
  86. images,
  87. title,
  88. channel_account_id,
  89. channel_account_name,
  90. body_text,
  91. video_url,
  92. weight_score
  93. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  94. """
  95. for content in contents:
  96. try:
  97. images_str = json.dumps(content.images or []) if isinstance(content.images, list) else ""
  98. params = (
  99. task_id,
  100. pattern_name,
  101. content.channel_content_id,
  102. images_str,
  103. content.title,
  104. content.channel_account_id,
  105. content.channel_account_name,
  106. content.body_text,
  107. content.video_url,
  108. content.weight_score,
  109. )
  110. mysql.execute(sql, params)
  111. except Exception as e:
  112. logger.error(f"写入聚类内容失败,task_id={task_id}, content_id={content.channel_content_id}, error={str(e)}")
  113. return False
  114. return True
  115. def _trigger_pattern_workflow(task_id: str) -> Dict[str, Any]:
  116. """发起真正的聚类请求,只携带 task_id"""
  117. try:
  118. url = "http://supply-content-deconstruction-workflow.piaoquantv.com/pattern/workflow/topic/pattern"
  119. payload = {
  120. "task_id": task_id
  121. }
  122. resp = requests.post(url, json=payload, timeout=10)
  123. if resp.status_code != 200:
  124. logger.error(
  125. f"发起聚类任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
  126. )
  127. return {
  128. "code": ERROR_CODE_FAILED,
  129. "reason": f"错误: {resp.status_code}",
  130. }
  131. try:
  132. data = resp.json()
  133. except Exception as e:
  134. logger.error(f"发起聚类任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
  135. return {
  136. "code": ERROR_CODE_FAILED,
  137. "reason": "聚类工作流接口返回非JSON格式",
  138. }
  139. code = data.get("code", ERROR_CODE_FAILED)
  140. msg = data.get("msg", "")
  141. if code == 0:
  142. return {
  143. "code": ERROR_CODE_SUCCESS,
  144. "reason": "",
  145. }
  146. logger.error(
  147. f"发起聚类任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
  148. )
  149. return {
  150. "code": ERROR_CODE_FAILED,
  151. "reason": f"工作流接口失败: code={code}, msg={msg}",
  152. }
  153. except requests.RequestException as e:
  154. logger.error(f"发起聚类任务失败,请求异常,task_id={task_id}, error={str(e)}")
  155. return {
  156. "code": ERROR_CODE_FAILED,
  157. "reason": f"聚类工作流接口请求异常: {str(e)}",
  158. }
  159. except Exception as e:
  160. logger.error(f"发起聚类任务失败,task_id={task_id}, error={str(e)}")
  161. return {
  162. "code": ERROR_CODE_FAILED,
  163. "reason": f"聚类任务执行失败: {str(e)}",
  164. }
  165. def begin_pattern_task(param: PatternContentParam) -> Dict[str, Any]:
  166. """创建聚类任务"""
  167. try:
  168. # 1. 校验必填项
  169. error_msg = _validate_pattern_param(param)
  170. if error_msg:
  171. return _build_error_response(ERROR_CODE_FAILED, error_msg)
  172. # 1.1 校验解构状态
  173. error_msg = _validate_decode_status(param.contents)
  174. if error_msg:
  175. return _build_error_response(ERROR_CODE_FAILED, error_msg)
  176. # 2. 创建 workflow_task 任务
  177. task = _create_pattern_task(param.scene, param.content_type)
  178. if not task or not task.task_id:
  179. return _build_error_response(
  180. ERROR_CODE_TASK_CREATE_FAILED,
  181. "创建聚类任务失败",
  182. )
  183. # 3. 将内容写入 workflow_pattern_task_content 表
  184. if not _save_pattern_contents(task.task_id, param.pattern_name, param.contents):
  185. return _build_error_response(
  186. ERROR_CODE_FAILED,
  187. "写入聚类内容失败",
  188. )
  189. # 4. 发起真正的聚类请求
  190. trigger_result = _trigger_pattern_workflow(task.task_id)
  191. if trigger_result.get("code") != ERROR_CODE_SUCCESS:
  192. return _build_error_response(
  193. ERROR_CODE_FAILED,
  194. trigger_result.get("reason") or "发起聚类任务失败",
  195. )
  196. # 全部成功
  197. return _build_success_response(task.task_id)
  198. except Exception as e:
  199. logger.error(f"聚类任务创建失败: {str(e)}")
  200. return _build_error_response(
  201. ERROR_CODE_TASK_CREATE_FAILED,
  202. f"聚类任务创建失败: {str(e)}",
  203. )