瀏覽代碼

knowledge_parsing_content

jihuaqiang 23 小時之前
父節點
當前提交
88ae140010
共有 5 個文件被更改,包括 393 次插入20 次删除
  1. 1 1
      agent.py
  2. 0 17
      agents/expand_agent/agent.py
  3. 2 2
      tools/agent_tools.py
  4. 227 0
      tools/handle_429_error.py
  5. 163 0
      tools/update_parsing_status.py

+ 1 - 1
agent.py

@@ -420,7 +420,7 @@ def create_langgraph_workflow():
                             success_count += 1
                             continue
 
-                    # result_status == 0 表示为处理过,需要进行识别和结构化
+                    # 0 未识别  3识别失败,需要重新进行识别
                     if result_status == 0 or result_status == 3:
                         # Step 1: 识别
                         identify_result = identify_tool.run(

+ 0 - 17
agents/expand_agent/agent.py

@@ -125,21 +125,6 @@ def _run_llm(prompt: str) -> List[str]:
         logger.error(f"LLM调用失败: {e}")
         return []
 
-
-def _heuristic_expand(input_query: str) -> List[str]:
-    """启发式扩展(LLM不可用时的fallback)"""
-    base = input_query.strip()
-    if not base:
-        return []
-        
-    return [
-        base,
-        f"{base} 教程",
-        f"{base} 实战",
-        f"{base} 入门指南",
-        f"{base} 高级技巧"
-    ]
-
 def _update_expansion_status(requestId: str, status: int):
     sql = "UPDATE knowledge_request SET expansion_status = %s WHERE request_id = %s"
     MysqlHelper.update_values(sql, (status, requestId))
@@ -181,8 +166,6 @@ def execute_expand_agent_with_api(requestId: str, query: str = "") -> Dict[str,
                     continue
                 prompt = _build_prompt(sample["data"], query)
                 expanded = _run_llm(prompt)
-                if not expanded:
-                    expanded = _heuristic_expand(query)
                 try:
                     expand_querys_json = json.dumps(expanded, ensure_ascii=False)
                     MysqlHelper.insert_and_get_id(insert_sql, (requestId, expand_querys_json, query, sample["content_id"], sample["parsing_id"]))

+ 2 - 2
tools/agent_tools.py

@@ -272,7 +272,7 @@ class UpdateDataTool:
                 existing_id = exists[0][0] if isinstance(exists, list) and len(exists) > 0 else None
                 update_sql = (
                     "UPDATE knowledge_parsing_content "
-                    "SET indentify_data = %s, task_id = %s, status = %s "
+                    "SET indentify_data = %s, task_id = %s, status = %s, update_time = NOW() "
                     "WHERE request_id = %s AND content_id = %s"
                 )
                 update_params = (
@@ -337,7 +337,7 @@ class UpdateDataTool:
             # 更新数据
             sql = (
                 "UPDATE knowledge_parsing_content "
-                "SET parsing_data = %s, status = %s "
+                "SET parsing_data = %s, status = %s, update_time = NOW() "
                 "WHERE content_id = %s AND id = %s"
             )
             

+ 227 - 0
tools/handle_429_error.py

@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+"""
+Data clean-up tool for rows that recorded a 429 quota error.
+Runs SELECT * FROM `knowledge_parsing_content` WHERE `parsing_data` LIKE "%429 You exceeded your current quota%" OR `indentify_data`  LIKE "%429 You exceeded your current quota%"
+For each matching row: if the error is in indentify_data, indentify_data and parsing_data are set to NULL and status is set to 3;
+if the error is only in parsing_data, parsing_data is set to NULL and status is set to 2.
+Then, for every request_id of a matching row, parsing_status, extraction_status and expansion_status in knowledge_request are all set to 3.
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from loguru import logger
+
+
+class Handle429Error:
+    """Clean up rows whose stored output is a 429 quota-exceeded error.
+
+    Rows of knowledge_parsing_content whose parsing_data or indentify_data
+    contains ERROR_MARKER get the polluted column(s) set to NULL and their
+    status reset. The knowledge_request rows they belong to get
+    parsing_status, extraction_status and expansion_status set to 3.
+
+    NOTE(review): MysqlHelper.update_values is assumed to return the number
+    of affected rows (the log messages report it that way) - confirm against
+    utils.mysql_db.
+    """
+
+    # Substring that identifies a quota error stored in a data column.
+    ERROR_MARKER = "429 You exceeded your current quota"
+
+    def __init__(self):
+        self.logger = logger
+
+    def _run_update(self, sql, params, failure_label):
+        """Execute one UPDATE; log failure_label and return None if it raises."""
+        try:
+            return MysqlHelper.update_values(sql, params)
+        except Exception as e:
+            self.logger.error(f"{failure_label}: {e}")
+            return None
+
+    def find_429_error_records(self):
+        """Query the rows whose parsing_data or indentify_data holds the error.
+
+        Returns:
+            The result of MysqlHelper.get_values for the query (columns id,
+            content_id, request_id, task_id, parsing_data, indentify_data,
+            status), or [] when the query or the logging of its size raises.
+        """
+        sql = """
+        SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status 
+        FROM knowledge_parsing_content 
+        WHERE parsing_data LIKE %s OR indentify_data LIKE %s
+        """
+        like_pattern = f"%{self.ERROR_MARKER}%"
+        params = (like_pattern, like_pattern)
+
+        try:
+            records = MysqlHelper.get_values(sql, params)
+            self.logger.info(f"找到 {len(records)} 条包含429错误的记录")
+            return records
+        except Exception as e:
+            self.logger.error(f"查询429错误记录失败: {e}")
+            return []
+
+    def process_parsing_data_error(self, record_id, request_id):
+        """Reset one row whose parsing_data holds the error.
+
+        Sets parsing_data to NULL and status to 2, refreshes update_time
+        (matching the batch variant), then sets the request's statuses to 3.
+
+        Returns:
+            True when the row update reported success, otherwise False.
+        """
+        sql = (
+            "UPDATE knowledge_parsing_content "
+            "SET parsing_data = NULL, status = 2, update_time = NOW() WHERE id = %s"
+        )
+        if not self._run_update(sql, (record_id,), "处理parsing_data错误失败"):
+            return False
+
+        self.logger.info(f"处理parsing_data错误成功: record_id={record_id}")
+        # The owning request has to be picked up again, so flag it as well.
+        self.update_request_status(request_id, 3)
+        return True
+
+    def process_indentify_data_error(self, record_id, request_id):
+        """Reset one row whose indentify_data holds the error.
+
+        Sets indentify_data and parsing_data to NULL and status to 3,
+        refreshes update_time (matching the batch variant), then sets the
+        request's statuses to 3.
+
+        Returns:
+            True when the row update reported success, otherwise False.
+        """
+        sql = (
+            "UPDATE knowledge_parsing_content "
+            "SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() WHERE id = %s"
+        )
+        if not self._run_update(sql, (record_id,), "处理indentify_data错误失败"):
+            return False
+
+        self.logger.info(f"处理indentify_data错误成功: record_id={record_id}")
+        # The owning request has to be picked up again, so flag it as well.
+        self.update_request_status(request_id, 3)
+        return True
+
+    def update_request_status(self, request_id, status):
+        """Set parsing_status, extraction_status and expansion_status of one request.
+
+        Returns:
+            True when the update reported success, otherwise False.
+        """
+        sql = "UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, expansion_status = %s WHERE request_id = %s"
+        params = (status, status, status, request_id)
+
+        if not self._run_update(sql, params, "更新request状态失败"):
+            return False
+
+        self.logger.info(f"更新request状态成功: request_id={request_id}, parsing_status={status}, extraction_status={status}, expansion_status={status}")
+        return True
+
+    def batch_process_parsing_data_error(self, record_ids):
+        """Reset every listed row whose parsing_data holds the error.
+
+        Sets parsing_data to NULL, status to 2 and update_time to NOW() in a
+        single statement.
+
+        Args:
+            record_ids: ids of knowledge_parsing_content rows; any iterable
+                that is falsy when empty.
+
+        Returns:
+            The value reported by MysqlHelper.update_values, or 0 when there
+            is nothing to do or the update fails.
+        """
+        if not record_ids:
+            return 0
+
+        ids = list(record_ids)
+        # One placeholder per id keeps the statement fully parameterized.
+        placeholders = ','.join(['%s'] * len(ids))
+        sql = f"UPDATE knowledge_parsing_content SET parsing_data = NULL, status = 2, update_time = NOW() WHERE id IN ({placeholders})"
+
+        result = self._run_update(sql, ids, "批量处理parsing_data错误失败")
+        if not result:
+            return 0
+
+        self.logger.info(f"批量处理parsing_data错误成功: 更新了 {result} 条记录")
+        return result
+
+    def batch_process_indentify_data_error(self, record_ids):
+        """Reset every listed row whose indentify_data holds the error.
+
+        Sets indentify_data and parsing_data to NULL, status to 3 and
+        update_time to NOW() in a single statement.
+
+        Args:
+            record_ids: ids of knowledge_parsing_content rows; any iterable
+                that is falsy when empty.
+
+        Returns:
+            The value reported by MysqlHelper.update_values, or 0 when there
+            is nothing to do or the update fails.
+        """
+        if not record_ids:
+            return 0
+
+        ids = list(record_ids)
+        # One placeholder per id keeps the statement fully parameterized.
+        placeholders = ','.join(['%s'] * len(ids))
+        sql = f"UPDATE knowledge_parsing_content SET indentify_data = NULL, parsing_data = NULL, status = 3, update_time = NOW() WHERE id IN ({placeholders})"
+
+        result = self._run_update(sql, ids, "批量处理indentify_data错误失败")
+        if not result:
+            return 0
+
+        self.logger.info(f"批量处理indentify_data错误成功: 更新了 {result} 条记录")
+        return result
+
+    def batch_update_request_status(self, request_ids, status):
+        """Set the three status columns of every listed request to status.
+
+        Args:
+            request_ids: request_id values; any iterable that is falsy when
+                empty (list, tuple or set).
+            status: value written to parsing_status, extraction_status and
+                expansion_status.
+
+        Returns:
+            The value reported by MysqlHelper.update_values, or 0 when there
+            is nothing to do or the update fails.
+        """
+        if not request_ids:
+            return 0
+
+        ids = list(request_ids)
+        placeholders = ','.join(['%s'] * len(ids))
+        sql = f"UPDATE knowledge_request SET parsing_status = %s, extraction_status = %s, expansion_status = %s WHERE request_id IN ({placeholders})"
+        params = [status, status, status] + ids
+
+        result = self._run_update(sql, params, "批量更新request状态失败")
+        if not result:
+            return 0
+
+        self.logger.info(f"批量更新request状态成功: 更新了 {result} 条记录,status={status}")
+        return result
+
+    def process_records(self, records):
+        """Classify the fetched rows and reset them with batched updates.
+
+        A row whose indentify_data holds the error is queued only for the
+        indentify reset: that update already clears parsing_data and ends
+        with status 3, so queueing it for the parsing reset too would update
+        the row twice and count it twice.
+
+        Args:
+            records: rows shaped like the SELECT in find_429_error_records
+                (id, content_id, request_id, task_id, parsing_data,
+                indentify_data, status).
+
+        Returns:
+            (success_count, error_count). success_count sums what the batch
+            updates reported; error_count is the number of queued rows whose
+            batch reported 0 (the batch helpers return 0 on failure).
+        """
+        marker = self.ERROR_MARKER
+        parsing_data_records = []
+        indentify_data_records = []
+        request_ids_to_update = set()
+
+        # Classify every row by the column(s) that hold the error text.
+        for record in records:
+            record_id, content_id, request_id, _task_id, parsing_data, indentify_data, _status = record
+
+            self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")
+
+            indentify_hit = bool(indentify_data) and marker in indentify_data
+            parsing_hit = bool(parsing_data) and marker in parsing_data
+
+            if indentify_hit:
+                self.logger.info(f"记录 {record_id}: indentify_data包含429错误")
+            if parsing_hit:
+                self.logger.info(f"记录 {record_id}: parsing_data包含429错误")
+
+            if indentify_hit:
+                indentify_data_records.append(record_id)
+            elif parsing_hit:
+                parsing_data_records.append(record_id)
+            else:
+                continue
+            request_ids_to_update.add(request_id)
+
+        success_count = 0
+        error_count = 0
+
+        # Parsing resets run first, then indentify resets (original order).
+        batches = (
+            (parsing_data_records, self.batch_process_parsing_data_error),
+            (indentify_data_records, self.batch_process_indentify_data_error),
+        )
+        for ids, run_batch in batches:
+            if not ids:
+                continue
+            updated = run_batch(ids)
+            if updated:
+                success_count += updated
+            else:
+                error_count += len(ids)
+
+        # Flag every affected request so it is processed again.
+        if request_ids_to_update:
+            self.batch_update_request_status(list(request_ids_to_update), 3)
+
+        self.logger.info(f"处理完成: 成功 {success_count} 条,失败 {error_count} 条")
+        return success_count, error_count
+
+    def run(self):
+        """Run the whole clean-up: find the polluted rows, then reset them."""
+        self.logger.info("开始处理429错误...")
+
+        records = self.find_429_error_records()
+
+        if not records:
+            self.logger.info("没有找到包含429错误的记录")
+            return
+
+        success_count, error_count = self.process_records(records)
+
+        self.logger.info(f"429错误处理完成: 成功 {success_count} 条,失败 {error_count} 条")
+
+
+if __name__ == "__main__":
+    # Script entry point: build the handler and run one clean-up pass.
+    Handle429Error().run()

+ 163 - 0
tools/update_parsing_status.py

@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+"""
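+Tool for recomputing the parsing status.
+Finds the rows of knowledge_parsing_content whose status is 3 or 2 and resets status from the stored data:
+- parsing_data is non-empty: status becomes 5 (regardless of indentify_data)
+- parsing_data is empty and indentify_data is empty: status becomes 3
+- parsing_data is empty and indentify_data is non-empty: status becomes 2
+"Empty" means NULL or whitespace-only.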
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from utils.mysql_db import MysqlHelper
+from loguru import logger
+
+
+class UpdateParsingStatus:
+    """Recompute status for knowledge_parsing_content rows with status 2 or 3.
+
+    The new status is derived from the stored data only:
+    non-empty parsing_data gives 5; otherwise empty indentify_data gives 3
+    and non-empty indentify_data gives 2. "Empty" means None or
+    whitespace-only.
+
+    NOTE(review): MysqlHelper.update_values is assumed to return the number
+    of affected rows (the log messages report it that way) - confirm against
+    utils.mysql_db.
+    """
+
+    def __init__(self):
+        self.logger = logger
+
+    @staticmethod
+    def _is_blank(value):
+        """Return True when value is None or contains only whitespace."""
+        return value is None or value.strip() == ""
+
+    def find_status_records(self):
+        """Query the rows whose status is 3 or 2.
+
+        Returns:
+            The result of MysqlHelper.get_values for the query (columns id,
+            content_id, request_id, task_id, parsing_data, indentify_data,
+            status), or [] when the query or the logging of its size raises.
+        """
+        sql = """
+        SELECT id, content_id, request_id, task_id, parsing_data, indentify_data, status 
+        FROM knowledge_parsing_content 
+        WHERE status = 3 OR status = 2
+        """
+
+        try:
+            records = MysqlHelper.get_values(sql)
+            self.logger.info(f"找到 {len(records)} 条status为3或2的记录")
+            return records
+        except Exception as e:
+            self.logger.error(f"查询status为3或2的记录失败: {e}")
+            return []
+
+    def update_record_status(self, record_id, new_status):
+        """Set the status of one row and refresh its update_time.
+
+        update_time is refreshed here as well so single-row and batch
+        updates leave the row in the same shape.
+
+        Returns:
+            True when the update reported success, otherwise False.
+        """
+        sql = "UPDATE knowledge_parsing_content SET status = %s, update_time = NOW() WHERE id = %s"
+        params = (new_status, record_id)
+
+        try:
+            result = MysqlHelper.update_values(sql, params)
+        except Exception as e:
+            self.logger.error(f"更新记录状态失败: {e}")
+            return False
+
+        if not result:
+            return False
+        self.logger.info(f"更新记录状态成功: record_id={record_id}, status={new_status}")
+        return True
+
+    def batch_update_status(self, record_ids, new_status):
+        """Set the status of every listed row in a single statement.
+
+        Args:
+            record_ids: ids of knowledge_parsing_content rows; any iterable
+                that is falsy when empty.
+            new_status: status value to write.
+
+        Returns:
+            The value reported by MysqlHelper.update_values, or 0 when there
+            is nothing to do or the update fails.
+        """
+        if not record_ids:
+            return 0
+
+        ids = list(record_ids)
+        # One placeholder per id keeps the statement fully parameterized.
+        placeholders = ','.join(['%s'] * len(ids))
+        sql = f"UPDATE knowledge_parsing_content SET status = %s, update_time = NOW() WHERE id IN ({placeholders})"
+        params = [new_status] + ids
+
+        try:
+            result = MysqlHelper.update_values(sql, params)
+        except Exception as e:
+            self.logger.error(f"批量更新status失败: {e}")
+            return 0
+
+        if not result:
+            return 0
+        self.logger.info(f"批量更新status成功: 更新了 {result} 条记录,status={new_status}")
+        return result
+
+    def process_records(self, records):
+        """Classify the fetched rows and write their new status in batches.
+
+        Args:
+            records: rows shaped like the SELECT in find_status_records
+                (id, content_id, request_id, task_id, parsing_data,
+                indentify_data, status).
+
+        Returns:
+            (success_count, error_count). success_count sums what the batch
+            updates reported; error_count is the number of queued rows whose
+            batch reported 0 (batch_update_status returns 0 on failure).
+        """
+        # Target status -> ids; insertion order fixes the update order 2, 3, 5.
+        pending = {2: [], 3: [], 5: []}
+
+        for record in records:
+            record_id, content_id, request_id, _task_id, parsing_data, indentify_data, _status = record
+
+            self.logger.info(f"分析记录: id={record_id}, content_id={content_id}, request_id={request_id}")
+
+            # The three cases are mutually exclusive and cover every row.
+            if not self._is_blank(parsing_data):
+                pending[5].append(record_id)
+                self.logger.info(f"记录 {record_id}: parsing_data不为空,设置status为5")
+            elif self._is_blank(indentify_data):
+                pending[3].append(record_id)
+                self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data为空,设置status为3")
+            else:
+                pending[2].append(record_id)
+                self.logger.info(f"记录 {record_id}: parsing_data为空且indentify_data不为空,设置status为2")
+
+        success_count = 0
+        error_count = 0
+
+        for new_status, ids in pending.items():
+            if not ids:
+                continue
+            updated = self.batch_update_status(ids, new_status)
+            if updated:
+                success_count += updated
+            else:
+                error_count += len(ids)
+
+        self.logger.info(f"处理完成: 成功 {success_count} 条")
+        self.logger.info(f"状态更新统计: 更新为status=2的有 {len(pending[2])} 条,更新为status=3的有 {len(pending[3])} 条,更新为status=5的有 {len(pending[5])} 条")
+        return success_count, error_count
+
+    def run(self):
+        """Run the whole pass: fetch the status 2/3 rows, then recompute them."""
+        self.logger.info("开始更新解析状态...")
+
+        records = self.find_status_records()
+
+        if not records:
+            self.logger.info("没有找到status为3或2的记录")
+            return
+
+        success_count, error_count = self.process_records(records)
+
+        self.logger.info(f"解析状态更新完成: 成功 {success_count} 条,失败 {error_count} 条")
+
+
+if __name__ == "__main__":
+    # Script entry point: build the updater and run one status pass.
+    UpdateParsingStatus().run()