from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, CapabilityEnum from models.task import WorkflowTask from models.decode_task_result import WorkflowDecodeTaskResult from utils.sync_mysql_help import mysql from pymysql.cursors import DictCursor from loguru import logger import sys from datetime import datetime from typing import Dict, Any, Optional logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) # 错误码常量 ERROR_CODE_SUCCESS = 0 ERROR_CODE_FAILED = -1 ERROR_CODE_TASK_CREATE_FAILED = 2001 def _build_error_response(code: int, reason: str) -> Dict[str, Any]: """构建错误响应""" return { "code": code, "task_id": None, "reason": reason } def _build_success_response(task_id: str) -> Dict[str, Any]: """构建成功响应""" return { "code": ERROR_CODE_SUCCESS, "task_id": task_id, "reason": "" } def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]: """创建工作流任务""" try: task = WorkflowTask.create_task( scene=scene, capability=CapabilityEnum.DECODE, root_task_id='' ) logger.info(f"创建解构任务成功,task_id: {task.task_id}") return task except Exception as e: logger.error(f"创建解构任务失败: {str(e)}") return None def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTaskResult]: """初始化任务结果""" try: result = WorkflowDecodeTaskResult.create_result( task_id=task_id, content=content ) logger.info(f"初始化任务结果成功,task_id: {task_id}") return result except Exception as e: logger.error(f"初始化任务结果失败,task_id: {task_id}, error: {str(e)}") return None def _trigger_decode_workflow(task_id: str) -> bool: """触发解构工作流""" try: invoke_decode_workflow(task_id) logger.info(f"发起解构任务成功,task_id: {task_id}") return True except Exception as e: logger.error(f"发起解构任务失败,task_id: {task_id}, error: {str(e)}") return False def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool: """检查配额是否充足(并发安全版本)""" try: # 获取今天的日期,格式为 YYYYMMDD quota_date = datetime.now().strftime('%Y%m%d') scene_value = scene.value capability_value = capability.value # 使用事务和 SELECT FOR UPDATE 确保并发安全 pool = mysql.get_pool() with pool.connection() as conn: try: # 开始事务 conn.begin() with conn.cursor(DictCursor) as cursor: # 1. 使用 SELECT FOR UPDATE 锁定配额记录(如果存在) select_sql = """ SELECT quota_limit, used_count, locked FROM workflow_daily_quota WHERE quota_date = %s AND scene = %s AND capability = %s FOR UPDATE """ cursor.execute(select_sql, (quota_date, scene_value, capability_value)) quota_record = cursor.fetchone() # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10 if not quota_record: insert_sql = """ INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked) VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0)) quota_limit = 10 current_used_count = 0 is_locked = 0 else: quota_limit = quota_record.get('quota_limit', 10) current_used_count = quota_record.get('used_count', 0) is_locked = quota_record.get('locked', 0) # 3. 检查配额是否被锁定或已用完 if is_locked == 1 or current_used_count >= quota_limit: conn.rollback() logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}") return False # 4. 查询 workflow_task 表中今天已创建的任务数(实际使用量) count_sql = """ SELECT COUNT(*) as task_count FROM workflow_task WHERE scene = %s AND capability = %s AND DATE(created_time) = CURDATE() """ cursor.execute(count_sql, (scene_value, capability_value)) count_result = cursor.fetchone() actual_used_count = count_result.get('task_count', 0) if count_result else 0 # 5. 如果实际使用量 >= 配额限制,返回 False if actual_used_count >= quota_limit: conn.rollback() logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}") return False # 6. 配额充足,更新 used_count(原子操作) update_sql = """ UPDATE workflow_daily_quota SET used_count = used_count + 1 WHERE quota_date = %s AND scene = %s AND capability = %s AND used_count < quota_limit AND locked = 0 """ cursor.execute(update_sql, (quota_date, scene_value, capability_value)) # 检查是否更新成功(受影响的行数) if cursor.rowcount == 0: conn.rollback() logger.warning(f"配额更新失败,可能已被其他请求占用,scene={scene_value}, capability={capability_value}") return False # 提交事务 conn.commit() logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count + 1}, limit={quota_limit}") return True except Exception as e: conn.rollback() raise e except Exception as e: logger.error(f"配额检查失败: {str(e)}") # 发生异常时,为了不影响业务,返回 True 允许继续执行 return True def decode_topic(param: DecodeContentParam) -> Dict[str, Any]: """选题解构方法""" try: # 前置配额检查,用于超出每天解构次数时,直接返回错误 if not _check_quota(param.scene, CapabilityEnum.DECODE): return _build_error_response( ERROR_CODE_FAILED, "配额不足" ) # 步骤1: 创建工作流task任务 task = _create_workflow_task(param.scene) if not task or not task.task_id: return _build_error_response( ERROR_CODE_TASK_CREATE_FAILED, "创建解构任务失败" ) # 步骤2: 初始化任务结果 result = _initialize_task_result(task.task_id, param.content) if not result or not result.task_id: return _build_error_response( ERROR_CODE_FAILED, "初始化任务结果失败" ) # 步骤3: 触发解构工作流 if not _trigger_decode_workflow(task.task_id): return _build_error_response( ERROR_CODE_FAILED, "发起解构任务失败" ) # 所有步骤成功 return _build_success_response(task.task_id) except Exception as e: logger.error(f"选题解构失败: {str(e)}") return _build_error_response( ERROR_CODE_TASK_CREATE_FAILED, f"解构任务创建失败: {str(e)}" ) def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]: """根据场景分发任务""" if param.scene == SceneEnum.TOPIC: return decode_topic(param) elif param.scene == SceneEnum.CREATION: # TODO: 实现创作场景 return _build_error_response(ERROR_CODE_FAILED, "创作场景暂未实现") elif param.scene == SceneEnum.PRODUCTION: # TODO: 实现制作场景 return _build_error_response(ERROR_CODE_FAILED, "制作场景暂未实现") else: return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}") def invoke_decode_workflow(task_id: str) -> Dict[str, Any]: """发起解构任务""" try: # TODO: 调用实际的工作流接口 return { "code": ERROR_CODE_SUCCESS, "task_id": task_id, "reason": "" } except Exception as e: logger.error(f"发起解构任务失败: {str(e)}") return { "code": ERROR_CODE_FAILED, "task_id": None, "reason": f"解构任务执行失败: {str(e)}" }