jihuaqiang преди 1 ден
родител
ревизия
2f8960eee8
променени са 1 файла, в които са добавени 83 реда и са изтрити 43 реда
  1. 83 43
      tasks/decode.py

+ 83 - 43
tasks/decode.py

@@ -2,6 +2,7 @@ from utils.params import DecodeContentParam, SceneEnum, ContentTypeEnum, Capabil
 from models.task import WorkflowTask
 from models.decode_task_result import WorkflowDecodeTaskResult
 from utils.sync_mysql_help import mysql
+from pymysql.cursors import DictCursor
 from loguru import logger
 import sys
 from datetime import datetime
@@ -74,55 +75,94 @@ def _trigger_decode_workflow(task_id: str) -> bool:
 
 
 def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
-    """检查配额是否充足"""
+    """检查配额是否充足(并发安全版本)"""
     try:
         # 获取今天的日期,格式为 YYYYMMDD
         quota_date = datetime.now().strftime('%Y%m%d')
         scene_value = scene.value
         capability_value = capability.value
         
-        # 1. 查询 workflow_daily_quota 表,查找今天的配额记录
-        sql = """
-            SELECT quota_limit, used_count, locked 
-            FROM workflow_daily_quota 
-            WHERE quota_date = %s AND scene = %s AND capability = %s
-        """
-        quota_record = mysql.fetchone(sql, (quota_date, scene_value, capability_value))
-        
-        # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10
-        if not quota_record:
-            insert_sql = """
-                INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
-                VALUES (%s, %s, %s, %s, %s, %s)
-            """
-            mysql.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
-            quota_limit = 10
-        else:
-            quota_limit = quota_record.get('quota_limit', 10)
-            # 如果配额被锁定,直接返回 False
-            if quota_record.get('locked', 0) == 1 or quota_record.get('used_count', 0) >= quota_record.get('quota_limit', 10):
-                logger.warning(f"配额已锁定,scene={scene_value}, capability={capability_value}, date={quota_date}")
-                return False
-        
-        # 3. 查询 workflow_task 表中今天已创建的任务数
-        count_sql = """
-            SELECT COUNT(*) as task_count 
-            FROM workflow_task 
-            WHERE scene = %s 
-            AND capability = %s 
-            AND DATE(created_time) = CURDATE()
-        """
-        count_result = mysql.fetchone(count_sql, (scene_value, capability_value))
-        used_count = count_result.get('task_count', 0) if count_result else 0
-        
-        # 4. 如果已创建数 < quota_limit,返回 True,否则返回 False
-        if used_count < quota_limit:
-            logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={used_count}, limit={quota_limit}")
-            return True
-        else:
-            logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={used_count}, limit={quota_limit}")
-            return False
-            
+        # 使用事务和 SELECT FOR UPDATE 确保并发安全
+        pool = mysql.get_pool()
+        with pool.connection() as conn:
+            try:
+                # 开始事务
+                conn.begin()
+                
+                with conn.cursor(DictCursor) as cursor:
+                    # 1. 使用 SELECT FOR UPDATE 锁定配额记录(如果存在)
+                    select_sql = """
+                        SELECT quota_limit, used_count, locked 
+                        FROM workflow_daily_quota 
+                        WHERE quota_date = %s AND scene = %s AND capability = %s
+                        FOR UPDATE
+                    """
+                    cursor.execute(select_sql, (quota_date, scene_value, capability_value))
+                    quota_record = cursor.fetchone()
+                    
+                    # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10
+                    if not quota_record:
+                        insert_sql = """
+                            INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
+                            VALUES (%s, %s, %s, %s, %s, %s)
+                        """
+                        cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
+                        quota_limit = 10
+                        current_used_count = 0
+                        is_locked = 0
+                    else:
+                        quota_limit = quota_record.get('quota_limit', 10)
+                        current_used_count = quota_record.get('used_count', 0)
+                        is_locked = quota_record.get('locked', 0)
+                    
+                    # 3. 检查配额是否被锁定或已用完
+                    if is_locked == 1 or current_used_count >= quota_limit:
+                        conn.rollback()
+                        logger.warning(f"配额已锁定或已用完,scene={scene_value}, capability={capability_value}, date={quota_date}, used={current_used_count}, limit={quota_limit}")
+                        return False
+                    
+                    # 4. 查询 workflow_task 表中今天已创建的任务数(实际使用量)
+                    count_sql = """
+                        SELECT COUNT(*) as task_count 
+                        FROM workflow_task 
+                        WHERE scene = %s 
+                        AND capability = %s 
+                        AND DATE(created_time) = CURDATE()
+                    """
+                    cursor.execute(count_sql, (scene_value, capability_value))
+                    count_result = cursor.fetchone()
+                    actual_used_count = count_result.get('task_count', 0) if count_result else 0
+                    
+                    # 5. 如果实际使用量 >= 配额限制,返回 False
+                    if actual_used_count >= quota_limit:
+                        conn.rollback()
+                        logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
+                        return False
+                    
+                    # 6. 配额充足,更新 used_count(原子操作)
+                    update_sql = """
+                        UPDATE workflow_daily_quota 
+                        SET used_count = used_count + 1 
+                        WHERE quota_date = %s AND scene = %s AND capability = %s
+                        AND used_count < quota_limit AND locked = 0
+                    """
+                    cursor.execute(update_sql, (quota_date, scene_value, capability_value))
+                    
+                    # 检查是否更新成功(受影响的行数)
+                    if cursor.rowcount == 0:
+                        conn.rollback()
+                        logger.warning(f"配额更新失败,可能已被其他请求占用,scene={scene_value}, capability={capability_value}")
+                        return False
+                    
+                    # 提交事务
+                    conn.commit()
+                    logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count + 1}, limit={quota_limit}")
+                    return True
+                    
+            except Exception as e:
+                conn.rollback()
+                raise e
+                
     except Exception as e:
         logger.error(f"配额检查失败: {str(e)}")
         # 发生异常时,为了不影响业务,返回 True 允许继续执行