Jelajahi Sumber

报错处理

jihuaqiang 2 minggu lalu
induk
melakukan
355604d114
1 mengubah file dengan 112 tambahan dan 93 penghapusan
  1. 112 93
      agent.py

+ 112 - 93
agent.py

@@ -17,6 +17,9 @@ import errno
 import multiprocessing
 from typing import Any, Dict, List, Optional, TypedDict, Annotated
 from contextlib import asynccontextmanager
+
+# 设置环境变量以抑制 gRPC fork 警告
+os.environ.setdefault('GRPC_POLL_STRATEGY', 'poll')
 from utils.mysql_db import MysqlHelper
 from fastapi import FastAPI, HTTPException, BackgroundTasks
 from fastapi.responses import JSONResponse
@@ -358,6 +361,101 @@ RUNNING_LOCK = asyncio.Lock()
 # LangGraph 工作流定义
 # =========================
 
+def process_single_item(args):
+    """处理单个数据项的函数,用于多进程 (模块级,便于pickle)"""
+    idx, item, request_id = args
+    try:
+        crawl_data = item.get('crawl_data') or {}
+        content_id = item.get('content_id') or ''
+        task_id = item.get('task_id') or ''
+
+        # 先在库中查询是否已经处理过
+        check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
+        check_result = MysqlHelper.get_values(check_sql, (request_id, content_id))
+        result_status = 0
+        result_id = 0
+        result_indentify_data = {}
+        if check_result:
+            id, status, indentify_data = check_result[0]
+            logger.info(f"查询到待结构化处理的条目,id: {id}, status: {status}, indentify_data: {str(indentify_data)[:100]}")
+            result_status = status
+            result_id = id
+            result_indentify_data = indentify_data
+            if status == 5:
+                return {
+                    "index": idx,
+                    "dbInserted": True,
+                    "identifyError": None,
+                    "status": 2,
+                    "success": True
+                }
+
+        # 0 未识别  3识别失败,需要重新进行识别
+        if result_status == 0 or result_status == 3:
+            # Step 1: 识别
+            identify_result = identify_tool.run(
+                crawl_data if isinstance(crawl_data, dict) else {}
+            )
+            
+            # Step 2: 结构化并入库
+            affected = UpdateDataTool.store_indentify_result(
+                request_id, 
+                {
+                    "content_id": content_id,
+                    "task_id": task_id
+                }, 
+                identify_result
+            )
+        else:
+            # result_indentify_data是JSON字符串,需要解析为对象
+            identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
+            affected = result_id
+        
+        # 使用StructureTool进行内容结构化处理
+        structure_tool = StructureTool()
+        structure_result = structure_tool.process_content_structure(identify_result)
+        
+        # 存储结构化解析结果
+        parsing_affected = UpdateDataTool.store_parsing_result(
+            request_id,
+            {
+                "id": affected,
+                "content_id": content_id,
+                "task_id": task_id
+            },
+            structure_result
+        )
+        logger.info(f"调试信息: affected={affected}, content_id={content_id}, result_status={result_status}")
+        ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
+        if ok:
+            success = True
+        else:
+            success = True
+            logger.error(f"处理第 {idx} 项时出错: {identify_result.get('error') or structure_result.get('error')}")
+        
+        # 记录处理详情
+        detail = {
+            "index": idx,
+            "dbInserted": ok,
+            "identifyError": identify_result.get('error') or structure_result.get('error'),
+            "status": 2 if ok else 3,
+            "success": success
+        }
+        
+        logger.info(f"处理进度: {idx} - {'成功' if ok else '失败'}")
+        return detail
+        
+    except Exception as e:
+        logger.error(f"处理第 {idx} 项时出错: {e}")
+        return {
+            "index": idx,
+            "dbInserted": False,
+            "identifyError": str(e),
+            "status": 3,
+            "success": False
+        }
+
+
 def create_langgraph_workflow():
     """创建 LangGraph 工作流"""
     if not HAS_LANGGRAPH:
@@ -388,99 +486,7 @@ def create_langgraph_workflow():
             state["status"] = "error"
             return state
     
-    def process_single_item(args):
-        """处理单个数据项的函数,用于多进程"""
-        idx, item, request_id = args
-        try:
-            crawl_data = item.get('crawl_data') or {}
-            content_id = item.get('content_id') or ''
-            task_id = item.get('task_id') or ''
-
-            # 先在库中查询是否已经处理过
-            check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
-            check_result = MysqlHelper.get_values(check_sql, (request_id, content_id))
-            result_status = 0
-            result_id = 0
-            result_indentify_data = {}
-            if check_result:
-                id, status, indentify_data = check_result[0]
-                logger.info(f"查询到待结构化处理的条目,id: {id}, status: {status}, indentify_data: {str(indentify_data)[:100]}")
-                result_status = status
-                result_id = id
-                result_indentify_data = indentify_data
-                if status == 5:
-                    return {
-                        "index": idx,
-                        "dbInserted": True,
-                        "identifyError": None,
-                        "status": 2,
-                        "success": True
-                    }
-
-            # 0 未识别  3识别失败,需要重新进行识别
-            if result_status == 0 or result_status == 3:
-                # Step 1: 识别
-                identify_result = identify_tool.run(
-                    crawl_data if isinstance(crawl_data, dict) else {}
-                )
-                
-                # Step 2: 结构化并入库
-                affected = UpdateDataTool.store_indentify_result(
-                    request_id, 
-                    {
-                        "content_id": content_id,
-                        "task_id": task_id
-                    }, 
-                    identify_result
-                )
-            else:
-                # result_indentify_data是JSON字符串,需要解析为对象
-                identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
-                affected = result_id
-            
-            # 使用StructureTool进行内容结构化处理
-            structure_tool = StructureTool()
-            structure_result = structure_tool.process_content_structure(identify_result)
-            
-            # 存储结构化解析结果
-            parsing_affected = UpdateDataTool.store_parsing_result(
-                request_id,
-                {
-                    "id": affected,
-                    "content_id": content_id,
-                    "task_id": task_id
-                },
-                structure_result
-            )
-            logger.info(f"调试信息: affected={affected}, content_id={content_id}, result_status={result_status}")
-            ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
-            if ok:
-                success = True
-            else:
-                success = True
-                logger.error(f"处理第 {idx} 项时出错: {identify_result.get('error') or structure_result.get('error')}")
-            
-            # 记录处理详情
-            detail = {
-                "index": idx,
-                "dbInserted": ok,
-                "identifyError": identify_result.get('error') or structure_result.get('error'),
-                "status": 2 if ok else 3,
-                "success": success
-            }
-            
-            logger.info(f"处理进度: {idx} - {'成功' if ok else '失败'}")
-            return detail
-            
-        except Exception as e:
-            logger.error(f"处理第 {idx} 项时出错: {e}")
-            return {
-                "index": idx,
-                "dbInserted": False,
-                "identifyError": str(e),
-                "status": 3,
-                "success": False
-            }
+    
 
     def process_items_batch(state: AgentState) -> AgentState:
         """批量处理所有数据项 - 使用多进程并行处理"""
@@ -495,9 +501,22 @@ def create_langgraph_workflow():
             
             # 使用3个进程并行处理,添加多进程保护
             if __name__ == '__main__' or multiprocessing.current_process().name == 'MainProcess':
+                # 设置多进程启动方法为 'spawn' 以避免 gRPC fork 问题
+                original_start_method = multiprocessing.get_start_method()
+                try:
+                    multiprocessing.set_start_method('spawn', force=True)
+                except RuntimeError:
+                    pass  # 如果已经设置过,忽略错误
+                
                 with multiprocessing.Pool(processes=3) as pool:
                     logger.info(f"开始多进程处理: 数量={len(process_args)}")
                     results = pool.map(process_single_item, process_args)
+                
+                # 恢复原始启动方法
+                try:
+                    multiprocessing.set_start_method(original_start_method, force=True)
+                except RuntimeError:
+                    pass
             else:
                 # 如果不在主进程中,回退到串行处理
                 logger.warning("不在主进程中,回退到串行处理")