| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- from typing import Dict, Any, Optional, List
- from loguru import logger
- import sys
- import json
- import requests
- from utils.params import PatternContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum, ContentParam
- from models.task import WorkflowTask
- from utils.sync_mysql_help import mysql
# Register an extra loguru sink: records at ERROR level and above are written to
# stderr with extended tracebacks (backtrace) and variable values (diagnose).
# NOTE(review): loguru ships with a default stderr handler; unless it is removed
# elsewhere, ERROR records may be printed twice -- confirm.
# NOTE(review): diagnose=True includes variable values in tracebacks, which may
# expose sensitive data in production logs -- confirm this is intended.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
# Values placed in the "code" field of the response dicts built in this module.
ERROR_CODE_SUCCESS = 0  # every step completed
ERROR_CODE_FAILED = -1  # generic failure: validation, content persistence, workflow trigger
ERROR_CODE_TASK_CREATE_FAILED = 2001  # task creation failed, or an unexpected error in begin_pattern_task
def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
    """Build the failure payload handed back to callers.

    Args:
        code: Error code to report in the ``code`` field.
        reason: Human-readable description of what went wrong.

    Returns:
        A dict with the keys ``code``, ``task_id`` (always ``None`` for
        failures) and ``reason``.
    """
    response: Dict[str, Any] = dict(code=code, task_id=None, reason=reason)
    return response
def _build_success_response(task_id: str) -> Dict[str, Any]:
    """Build the success payload handed back to callers.

    Args:
        task_id: Identifier of the task that was created and triggered.

    Returns:
        A dict with ``code`` set to ``ERROR_CODE_SUCCESS``, the given
        ``task_id`` and an empty ``reason``.
    """
    response: Dict[str, Any] = dict(
        code=ERROR_CODE_SUCCESS,
        task_id=task_id,
        reason="",
    )
    return response
def _validate_pattern_param(param: PatternContentParam) -> Optional[str]:
    """Check the required fields of a pattern (clustering) request.

    The checks run in order and stop at the first problem found:
    ``pattern_name`` must be truthy, ``contents`` must be non-empty, and every
    content item needs a truthy ``channel_content_id`` and a ``weight_score``
    that is not ``None`` (a score of ``0`` is accepted).

    Args:
        param: The incoming request parameters.

    Returns:
        The error message for the first failed check, or ``None`` when the
        parameters are valid.
    """
    if not param.pattern_name:
        return "pattern_name 不能为空"
    if not param.contents:
        return "contents 不能为空"

    for position, item in enumerate(param.contents):
        # Identify which required field (if any) is missing on this item;
        # the id is checked before the score.
        missing_field = None
        if not item.channel_content_id:
            missing_field = "channel_content_id"
        elif item.weight_score is None:
            missing_field = "weight_score"

        if missing_field is not None:
            return f"contents[{position}].{missing_field} 不能为空"

    return None
def _create_pattern_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Optional[WorkflowTask]:
    """Create the workflow task record for a pattern (clustering) run.

    Args:
        scene: Scene the task belongs to.
        content_type: Type of content being clustered.

    Returns:
        The task returned by ``WorkflowTask.create_task``, or ``None`` when
        creation (or logging the new task id) raised; the error is logged and
        not propagated.
    """
    try:
        # Assemble the arguments inside the try so any failure while building
        # them is handled the same way as a failure of the call itself.
        creation_kwargs = {
            "scene": scene,
            "capability": CapabilityEnum.PATTERN,
            "content_type": content_type,
            "root_task_id": "",
        }
        created_task = WorkflowTask.create_task(**creation_kwargs)
        logger.info(f"创建聚类任务成功,task_id: {created_task.task_id}")
    except Exception as exc:
        logger.error(f"创建聚类任务失败: {str(exc)}")
        return None
    return created_task
def _save_pattern_contents(task_id: str, pattern_name: str, contents: List[ContentParam]) -> bool:
    """Insert each content item into the ``workflow_pattern_task_content`` table.

    Items are written one at a time through ``mysql.execute``. Processing
    stops at the first item that fails; that failure is logged and ``False``
    is returned.

    Args:
        task_id: Identifier of the owning workflow task.
        pattern_name: Name of the pattern the contents belong to.
        contents: Content items to persist.

    Returns:
        ``True`` when every item was written (including the case of an empty
        ``contents``), ``False`` as soon as one item fails.
    """
    insert_sql = """
        INSERT INTO workflow_pattern_task_content (
            task_id,
            pattern_name,
            channel_content_id,
            images,
            title,
            channel_account_id,
            channel_account_name,
            body_text,
            video_url,
            weight_score
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    for item in contents:
        try:
            # Lists of images are stored as a JSON array string; any other
            # value (including None) is stored as an empty string.
            if isinstance(item.images, list):
                serialized_images = json.dumps(item.images or [])
            else:
                serialized_images = ""

            row = (
                task_id,
                pattern_name,
                item.channel_content_id,
                serialized_images,
                item.title,
                item.channel_account_id,
                item.channel_account_name,
                item.body_text,
                item.video_url,
                item.weight_score,
            )
            mysql.execute(insert_sql, row)
        except Exception as exc:
            logger.error(f"写入聚类内容失败,task_id={task_id}, content_id={item.channel_content_id}, error={str(exc)}")
            return False
    return True
# Default endpoint and timeout (seconds) of the pattern (clustering) workflow service.
_PATTERN_WORKFLOW_URL = "http://localhost:8100/pattern/workflow/topic/pattern"
_PATTERN_WORKFLOW_TIMEOUT_SECONDS = 10


def _trigger_pattern_workflow(
    task_id: str,
    url: str = _PATTERN_WORKFLOW_URL,
    timeout: float = _PATTERN_WORKFLOW_TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Ask the workflow service to start the pattern (clustering) run.

    Only ``task_id`` is sent in the JSON body of the POST request. The call is
    treated as successful only when the HTTP status is 200, the body is a JSON
    object, and its ``code`` field equals ``0``.

    Args:
        task_id: Identifier of the workflow task to run.
        url: Endpoint of the workflow service; defaults to the local service.
        timeout: Request timeout in seconds passed to ``requests.post``.

    Returns:
        A dict with ``code`` (``ERROR_CODE_SUCCESS`` or ``ERROR_CODE_FAILED``)
        and ``reason`` (empty on success). This function does not raise; every
        failure is logged and reported through the returned dict.
    """
    try:
        resp = requests.post(url, json={"task_id": task_id}, timeout=timeout)

        if resp.status_code != 200:
            logger.error(
                f"发起聚类任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
            )
            return {
                "code": ERROR_CODE_FAILED,
                "reason": f"错误: {resp.status_code}",
            }

        try:
            data = resp.json()
        except ValueError as e:
            # JSON decoding errors raised by Response.json() are ValueError subclasses.
            logger.error(f"发起聚类任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
            return {
                "code": ERROR_CODE_FAILED,
                "reason": "聚类工作流接口返回非JSON格式",
            }

        # A syntactically valid JSON body may still be a list, string, number or
        # null; report that explicitly instead of failing on ``.get`` below.
        if not isinstance(data, dict):
            logger.error(
                f"发起聚类任务失败,返回格式异常,task_id={task_id}, type={type(data).__name__}"
            )
            return {
                "code": ERROR_CODE_FAILED,
                "reason": "聚类工作流接口返回格式异常",
            }

        # A missing "code" field is treated as a failure.
        code = data.get("code", ERROR_CODE_FAILED)
        msg = data.get("msg", "")
        if code == 0:
            return {
                "code": ERROR_CODE_SUCCESS,
                "reason": "",
            }

        logger.error(
            f"发起聚类任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
        )
        return {
            "code": ERROR_CODE_FAILED,
            "reason": f"工作流接口失败: code={code}, msg={msg}",
        }
    except requests.RequestException as e:
        # Connection errors, timeouts and other transport-level failures.
        logger.error(f"发起聚类任务失败,请求异常,task_id={task_id}, error={str(e)}")
        return {
            "code": ERROR_CODE_FAILED,
            "reason": f"聚类工作流接口请求异常: {str(e)}",
        }
    except Exception as e:
        # Safety net so callers always receive a result dict.
        logger.error(f"发起聚类任务失败,task_id={task_id}, error={str(e)}")
        return {
            "code": ERROR_CODE_FAILED,
            "reason": f"聚类任务执行失败: {str(e)}",
        }
def begin_pattern_task(param: PatternContentParam) -> Dict[str, Any]:
    """Create a pattern (clustering) task and start its workflow.

    Steps, each aborting with an error response on failure:
      1. validate the required request fields;
      2. create the workflow task;
      3. persist the content items for that task;
      4. trigger the workflow for the task id.

    Args:
        param: The incoming request parameters.

    Returns:
        A response dict with ``code``, ``task_id`` and ``reason``. ``task_id``
        is set only on full success. Any unexpected exception is logged and
        reported with ``ERROR_CODE_TASK_CREATE_FAILED``.
    """
    try:
        # Step 1: required-field validation.
        validation_error = _validate_pattern_param(param)
        if validation_error:
            return _build_error_response(ERROR_CODE_FAILED, validation_error)

        # Step 2: create the workflow task; a task without an id counts as a failure.
        created_task = _create_pattern_task(param.scene, param.content_type)
        if not (created_task and created_task.task_id):
            return _build_error_response(ERROR_CODE_TASK_CREATE_FAILED, "创建聚类任务失败")
        task_id = created_task.task_id

        # Step 3: persist the content rows for this task.
        contents_saved = _save_pattern_contents(task_id, param.pattern_name, param.contents)
        if not contents_saved:
            return _build_error_response(ERROR_CODE_FAILED, "写入聚类内容失败")

        # Step 4: start the actual workflow run.
        outcome = _trigger_pattern_workflow(task_id)
        if outcome.get("code") == ERROR_CODE_SUCCESS:
            return _build_success_response(task_id)

        failure_reason = outcome.get("reason") or "发起聚类任务失败"
        return _build_error_response(ERROR_CODE_FAILED, failure_reason)
    except Exception as exc:
        logger.error(f"聚类任务创建失败: {str(exc)}")
        return _build_error_response(
            ERROR_CODE_TASK_CREATE_FAILED,
            f"聚类任务创建失败: {str(exc)}",
        )
|