jihuaqiang 2 недели назад
Родитель
Commit
b50fee9d7d
1 изменённый файл с 113 добавлено и 89 удалено
  1. 113 89
      agent.py

+ 113 - 89
agent.py

@@ -14,6 +14,7 @@ import asyncio
 import concurrent.futures
 import fcntl
 import errno
+import multiprocessing
 from typing import Any, Dict, List, Optional, TypedDict, Annotated
 from contextlib import asynccontextmanager
 from utils.mysql_db import MysqlHelper
@@ -387,107 +388,130 @@ def create_langgraph_workflow():
             state["status"] = "error"
             return state
     
+    def process_single_item(args):
+        """处理单个数据项的函数,用于多进程"""
+        idx, item, request_id = args
+        try:
+            crawl_data = item.get('crawl_data') or {}
+            content_id = item.get('content_id') or ''
+            task_id = item.get('task_id') or ''
+
+            # 先在库中查询是否已经处理过
+            check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
+            check_result = MysqlHelper.get_values(check_sql, (request_id, content_id))
+            result_status = 0
+            result_id = 0
+            result_indentify_data = {}
+            if check_result:
+                id, status, indentify_data = check_result[0]
+                logger.info(f"查询到待结构化处理的条目,id: {id}, status: {status}, indentify_data: {str(indentify_data)[:100]}")
+                result_status = status
+                result_id = id
+                result_indentify_data = indentify_data
+                if status == 5:
+                    return {
+                        "index": idx,
+                        "dbInserted": True,
+                        "identifyError": None,
+                        "status": 2,
+                        "success": True
+                    }
+
+            # 0 未识别  3识别失败,需要重新进行识别
+            if result_status == 0 or result_status == 3:
+                # Step 1: 识别
+                identify_result = identify_tool.run(
+                    crawl_data if isinstance(crawl_data, dict) else {}
+                )
+                
+                # Step 2: 结构化并入库
+                affected = UpdateDataTool.store_indentify_result(
+                    request_id, 
+                    {
+                        "content_id": content_id,
+                        "task_id": task_id
+                    }, 
+                    identify_result
+                )
+            else:
+                # result_indentify_data是JSON字符串,需要解析为对象
+                identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
+                affected = result_id
+            
+            # 使用StructureTool进行内容结构化处理
+            structure_tool = StructureTool()
+            structure_result = structure_tool.process_content_structure(identify_result)
+            
+            # 存储结构化解析结果
+            parsing_affected = UpdateDataTool.store_parsing_result(
+                request_id,
+                {
+                    "id": affected,
+                    "content_id": content_id,
+                    "task_id": task_id
+                },
+                structure_result
+            )
+            logger.info(f"调试信息: affected={affected}, content_id={content_id}, result_status={result_status}")
+            ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
+            if ok:
+                success = True
+            else:
+                success = True
+                logger.error(f"处理第 {idx} 项时出错: {identify_result.get('error') or structure_result.get('error')}")
+            
+            # 记录处理详情
+            detail = {
+                "index": idx,
+                "dbInserted": ok,
+                "identifyError": identify_result.get('error') or structure_result.get('error'),
+                "status": 2 if ok else 3,
+                "success": success
+            }
+            
+            logger.info(f"处理进度: {idx} - {'成功' if ok else '失败'}")
+            return detail
+            
+        except Exception as e:
+            logger.error(f"处理第 {idx} 项时出错: {e}")
+            return {
+                "index": idx,
+                "dbInserted": False,
+                "identifyError": str(e),
+                "status": 3,
+                "success": False
+            }
+
     def process_items_batch(state: AgentState) -> AgentState:
-        """批量处理所有数据项"""
+        """批量处理所有数据项 - 使用多进程并行处理"""
         try:
             items = state["items"]
             if not items:
                 state["status"] = "completed"
                 return state
             
-            success_count = 0
-            details = []
-            
-            for idx, item in enumerate(items, start=1):
-                try:
-                    crawl_data = item.get('crawl_data') or {}
-                    content_id = item.get('content_id') or ''
-                    task_id = item.get('task_id') or ''
-
-                    # 先在库中查询是否已经处理过
-                    check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
-                    check_result = MysqlHelper.get_values(check_sql, (state["request_id"], content_id))
-                    result_status = 0
-                    result_id = 0
-                    result_indentify_data = {}
-                    if check_result:
-                        id, status, indentify_data = check_result[0]
-                        logger.info(f"查询到待结构化处理的条目,id: {id}, status: {status}, indentify_data: {str(indentify_data)[:100]}")
-                        result_status = status
-                        result_id = id
-                        result_indentify_data = indentify_data
-                        if status == 5:
-                            success_count += 1
-                            continue
-
-                    # 0 未识别  3识别失败,需要重新进行识别
-                    if result_status == 0 or result_status == 3:
-                        # Step 1: 识别
-                        identify_result = identify_tool.run(
-                            crawl_data if isinstance(crawl_data, dict) else {}
-                        )
-                        
-                        # Step 2: 结构化并入库
-                        affected = UpdateDataTool.store_indentify_result(
-                            state["request_id"], 
-                            {
-                                "content_id": content_id,
-                                "task_id": task_id
-                            }, 
-                            identify_result
-                        )
-                    else:
-                        # result_indentify_data是JSON字符串,需要解析为对象
-                        identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
-                        affected = result_id
-                    
-                    # 使用StructureTool进行内容结构化处理
-                    structure_tool = StructureTool()
-                    structure_result = structure_tool.process_content_structure(identify_result)
-                    
-                    # 存储结构化解析结果
-                    parsing_affected = UpdateDataTool.store_parsing_result(
-                        state["request_id"],
-                        {
-                            "id": affected,
-                            "content_id": content_id,
-                            "task_id": task_id
-                        },
-                        structure_result
-                    )
-                    logger.info(f"调试信息: affected={affected}, content_id={content_id}, result_status={result_status}")
-                    ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
-                    if ok:
-                        success_count += 1
-                    else:
-                        success_count += 1
-                        logger.error(f"处理第 {idx} 项时出错: {identify_result.get('error') or structure_result.get('error')}")
-                    
-                    # 记录处理详情
-                    detail = {
-                        "index": idx,
-                        "dbInserted": ok,
-                        "identifyError": identify_result.get('error') or structure_result.get('error'),
-                        "status": 2 if ok else 3
-                    }
-                    details.append(detail)
-                    
-                    logger.info(f"处理进度: {idx}/{len(items)} - {'成功' if ok else '失败'}")
-                    
-                except Exception as e:
-                    logger.error(f"处理第 {idx} 项时出错: {e}")
-                    detail = {
-                        "index": idx,
-                        "dbInserted": False,
-                        "identifyError": str(e),
-                        "status": 3
-                    }
-                    details.append(detail)
+            # 准备多进程参数
+            process_args = [(idx, item, state["request_id"]) for idx, item in enumerate(items, start=1)]
+            
+            # 使用3个进程并行处理,添加多进程保护
+            if __name__ == '__main__' or multiprocessing.current_process().name == 'MainProcess':
+                with multiprocessing.Pool(processes=3) as pool:
+                    logger.info(f"开始多进程处理: 数量={len(process_args)}")
+                    results = pool.map(process_single_item, process_args)
+            else:
+                # 如果不在主进程中,回退到串行处理
+                logger.warning("不在主进程中,回退到串行处理")
+                results = [process_single_item(args) for args in process_args]
+            
+            # 统计结果
+            success_count = sum(1 for result in results if result.get("success", False))
+            details = [result for result in results]
             
             state["success"] = success_count
             state["details"] = details
             state["status"] = "completed"
             
+            logger.info(f"多进程处理完成: 成功 {success_count}/{len(items)} 项")
             return state
             
         except Exception as e: