- from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum
- from models.task import WorkflowTask
- from models.decode_task_result import WorkflowDecodeTaskResult
- from utils.sync_mysql_help import mysql
- from pymysql.cursors import DictCursor
- from loguru import logger
- from datetime import datetime
- from typing import Dict, Any, Optional
- import sys
- import requests
# Route ERROR-level logs to stderr with full tracebacks and variable diagnostics.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# Error-code constants for the response payloads built below.
ERROR_CODE_SUCCESS = 0                 # operation succeeded
ERROR_CODE_FAILED = -1                 # generic failure
ERROR_CODE_TASK_CREATE_FAILED = 2001   # workflow task creation failed
- def _build_error_response(code: int, reason: str) -> Dict[str, Any]:
- """构建错误响应"""
- return {
- "code": code,
- "task_id": None,
- "reason": reason
- }
def _build_success_response(task_id: str) -> Dict[str, Any]:
    """Assemble a success payload carrying the newly created task id."""
    return {"code": ERROR_CODE_SUCCESS, "task_id": task_id, "reason": ""}
def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]:
    """Persist a new decode workflow task for *scene*.

    Returns the created task on success, or None when creation raises.
    """
    try:
        new_task = WorkflowTask.create_task(
            scene=scene, capability=CapabilityEnum.DECODE, root_task_id=''
        )
        logger.info(f"创建解构任务成功,task_id: {new_task.task_id}")
        return new_task
    except Exception as exc:
        logger.error(f"创建解构任务失败: {str(exc)}")
        return None
def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]:
    """Create the initial decode-result row for the given task.

    Returns the created result object, or None when persistence raises.
    """
    try:
        task_result = WorkflowDecodeTaskResult.create_result(
            task_id=task_id, content=content
        )
        logger.info(f"初始化任务结果成功,task_id: {task_id}")
        return task_result
    except Exception as exc:
        logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(exc)}")
        return None
def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
    """Check whether today's quota allows creating one more task (concurrency-safe).

    Locks the per-day quota row with SELECT ... FOR UPDATE inside a transaction,
    auto-creates the row (limit 10) when missing, then compares the actual
    task count in workflow_task against the limit.

    Returns True when a task may be created; False when the quota is locked,
    exhausted, or any error occurs (fail-closed).
    """
    try:
        # Today's date in YYYYMMDD form, used as the quota row key.
        quota_date = datetime.now().strftime('%Y%m%d')
        scene_value = scene.value
        capability_value = capability.value

        # Use a transaction plus SELECT FOR UPDATE so concurrent checkers
        # serialize on the quota row instead of racing.
        pool = mysql.get_pool()
        with pool.connection() as conn:
            try:
                # Begin the transaction explicitly.
                conn.begin()

                with conn.cursor(DictCursor) as cursor:
                    # 1. Lock the quota record for this (date, scene, capability),
                    #    if it exists. NOTE(review): assumes a unique key on
                    #    (quota_date, scene, capability) — confirm against schema.
                    select_sql = """
                        SELECT quota_limit, used_count, locked
                        FROM workflow_daily_quota
                        WHERE quota_date = %s AND scene = %s AND capability = %s
                        FOR UPDATE
                    """
                    cursor.execute(select_sql, (quota_date, scene_value, capability_value))
                    quota_record = cursor.fetchone()

                    # 2. No record yet: create one with a default limit of 10.
                    if not quota_record:
                        insert_sql = """
                            INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
                            VALUES (%s, %s, %s, %s, %s, %s)
                        """
                        cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
                        quota_limit = 10
                        current_used_count = 0
                        is_locked = 0
                    else:
                        quota_limit = quota_record.get('quota_limit', 10)
                        current_used_count = quota_record.get('used_count', 0)
                        is_locked = quota_record.get('locked', 0)

                    # 3. Reject when the quota is locked or its counter is spent.
                    #    NOTE(review): used_count is read but never incremented in
                    #    this function — verify some other writer maintains it.
                    if is_locked == 1 or current_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
                        return False

                    # 4. Count today's actually created tasks in workflow_task
                    #    (the authoritative usage figure).
                    count_sql = """
                        SELECT COUNT(*) as task_count
                        FROM workflow_task
                        WHERE scene = %s
                        AND capability = %s
                        AND DATE(created_time) = CURDATE()
                    """
                    cursor.execute(count_sql, (scene_value, capability_value))
                    count_result = cursor.fetchone()
                    actual_used_count = count_result.get('task_count', 0) if count_result else 0

                    # 5. Actual usage already at or over the limit -> reject.
                    if actual_used_count >= quota_limit:
                        conn.rollback()
                        logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                        return False

                    # Commit: releases the row lock and persists the INSERT
                    # from step 2 when one was made.
                    conn.commit()
                    logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                    return True

            except Exception as e:
                # Roll back on any DB error so the lock is released,
                # then let the outer handler log it.
                conn.rollback()
                raise e

    except Exception as e:
        logger.error(f"配额检查失败: {str(e)}")
        # Fail closed: on any error, refuse to proceed.
        return False
def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
    """Topic-decode entry point (选题解构方法).

    Flow: quota pre-check -> create workflow task -> initialize result row ->
    trigger the upstream decode workflow.

    Args:
        param: decode request carrying the scene and the content to decode.

    Returns:
        A response dict with keys ``code``/``task_id``/``reason``.
    """
    try:
        # Pre-check the daily quota; bail out early when today's decode
        # allowance is exhausted.
        if not _check_quota(param.scene, CapabilityEnum.DECODE):
            return _build_error_response(
                ERROR_CODE_FAILED,
                "配额不足"
            )
        # Step 1: create the workflow task record.
        task = _create_workflow_task(param.scene)
        if not task or not task.task_id:
            return _build_error_response(
                ERROR_CODE_TASK_CREATE_FAILED,
                "创建解构任务失败"
            )

        # Step 2: initialize the task-result row.
        result = _initialize_task_result(task.task_id, param.content)
        if not result or not result.task_id:
            return _build_error_response(
                ERROR_CODE_FAILED,
                "初始化任务结果失败"
            )

        # Step 3: trigger the decode workflow.
        # BUG FIX: _trigger_topic_decode_workflow returns a dict, which is
        # always truthy, so the previous `if not _trigger_...` check could
        # never detect failure. Inspect the returned code instead, and
        # propagate the upstream reason when available.
        trigger_result = _trigger_topic_decode_workflow(task.task_id)
        if trigger_result.get("code") != ERROR_CODE_SUCCESS:
            return _build_error_response(
                ERROR_CODE_FAILED,
                trigger_result.get("reason") or "发起解构任务失败"
            )

        # All steps succeeded.
        return _build_success_response(task.task_id)

    except Exception as e:
        logger.error(f"选题解构失败: {str(e)}")
        return _build_error_response(
            ERROR_CODE_TASK_CREATE_FAILED,
            f"解构任务创建失败: {str(e)}"
        )
def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
    """Dispatch the decode request to the handler for its scene."""
    scene = param.scene
    if scene == SceneEnum.TOPIC:
        return decode_topic(param)
    # TODO: implement the creation scene
    if scene == SceneEnum.CREATION:
        return _build_error_response(ERROR_CODE_FAILED, "创作场景暂未实现")
    # TODO: implement the production scene
    if scene == SceneEnum.PRODUCTION:
        return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现")
    return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {scene}")
def _trigger_topic_decode_workflow(task_id: str) -> Dict[str, Any]:
    """Kick off the decode job by calling the upstream workflow service.

    Returns a dict with keys ``code``/``task_id``/``reason``; ``code`` is
    ERROR_CODE_SUCCESS only when the upstream reported success (code 0).
    """
    def _failure(reason: str) -> Dict[str, Any]:
        # Uniform failure payload; task_id is cleared on every failure path.
        return {
            "code": ERROR_CODE_FAILED,
            "task_id": None,
            "reason": reason,
        }

    try:
        # url = "http://192.168.81.96:8000/workflow/topic/decode"
        url = "http://supply-content-deconstruction-workflow.piaoquantv.com/workflow/topic/decode"
        resp = requests.get(url, params={"taskId": task_id}, timeout=10)

        # Any non-200 HTTP status is treated as failure outright.
        if resp.status_code != 200:
            logger.error(
                f"发起解构任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
            )
            return _failure(f"错误: {resp.status_code}")

        try:
            data = resp.json()
        except Exception as e:
            logger.error(f"发起解构任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
            return _failure("工作流接口返回非JSON格式")

        code = data.get("code", ERROR_CODE_FAILED)
        msg = data.get("msg", "")
        # Upstream protocol: 0 means success; anything else (1001/1002/-1) fails.
        if code == 0:
            return {
                "code": ERROR_CODE_SUCCESS,
                "task_id": task_id,
                "reason": "",
            }

        # Surface the upstream error details in the reason field.
        logger.error(
            f"发起解构任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
        )
        return _failure(f"工作流接口失败: code={code}, msg={msg}")
    except requests.RequestException as e:
        # Network-level problems: timeouts, connection errors, etc.
        logger.error(f"发起解构任务失败,请求异常,task_id={task_id}, error={str(e)}")
        return _failure(f"工作流接口请求异常: {str(e)}")
    except Exception as e:
        logger.error(f"发起解构任务失败,task_id={task_id}, error={str(e)}")
        return _failure(f"解构任务执行失败: {str(e)}")
|