|
@@ -30,7 +30,7 @@ try:
|
|
|
HAS_LANGGRAPH = True
|
|
|
except ImportError:
|
|
|
HAS_LANGGRAPH = False
|
|
|
- print("警告: LangGraph 未安装,将使用传统模式")
|
|
|
+ print("警告: LangGraph 未安装")
|
|
|
|
|
|
from utils.logging_config import get_logger
|
|
|
from tools.agent_tools import QueryDataTool, IdentifyTool, UpdateDataTool, StructureTool
|
|
@@ -346,96 +346,7 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
|
|
|
success=final_state.get("success", 0),
|
|
|
details=final_state.get("details", [])
|
|
|
)
|
|
|
-
|
|
|
- else:
|
|
|
- # 回退到传统模式
|
|
|
- logger.info("使用传统模式处理")
|
|
|
-
|
|
|
- # 更新状态为处理中
|
|
|
- update_request_status(request.requestId, 1)
|
|
|
-
|
|
|
- # 获取待处理数据
|
|
|
- items = QueryDataTool.fetch_crawl_data_list(request.requestId)
|
|
|
- print(f"传统模式---items: {items}")
|
|
|
- if not items:
|
|
|
- # 无数据需要处理,更新状态为完成
|
|
|
- update_request_status(request.requestId, 2)
|
|
|
- return TriggerResponse(
|
|
|
- requestId=request.requestId,
|
|
|
- processed=0,
|
|
|
- success=0,
|
|
|
- details=[]
|
|
|
- )
|
|
|
-
|
|
|
- # 处理数据
|
|
|
- success_count = 0
|
|
|
- details: List[Dict[str, Any]] = []
|
|
|
-
|
|
|
- for idx, item in enumerate(items, start=1):
|
|
|
- try:
|
|
|
- crawl_data = item.get('crawl_data') or {}
|
|
|
-
|
|
|
- # Step 1: 识别
|
|
|
- identify_result = identify_tool.run(
|
|
|
- crawl_data if isinstance(crawl_data, dict) else {}
|
|
|
- )
|
|
|
-
|
|
|
- # Step 2: 结构化并入库
|
|
|
- affected = UpdateDataTool.store_indentify_result(
|
|
|
- request.requestId,
|
|
|
- {
|
|
|
- content_id: item.get('content_id') or '',
|
|
|
- task_id: item.get('task_id') or ''
|
|
|
- },
|
|
|
- identify_result
|
|
|
- )
|
|
|
-
|
|
|
- # 使用StructureTool进行内容结构化处理
|
|
|
- structure_tool = StructureTool()
|
|
|
- structure_result = structure_tool.process_content_structure(identify_result)
|
|
|
-
|
|
|
- # 存储结构化解析结果
|
|
|
- parsing_affected = UpdateDataTool.store_parsing_result(
|
|
|
- request.requestId,
|
|
|
- {
|
|
|
- content_id: item.get('content_id') or '',
|
|
|
- task_id: item.get('task_id') or ''
|
|
|
- },
|
|
|
- structure_result
|
|
|
- )
|
|
|
-
|
|
|
- ok = affected is not None and affected > 0
|
|
|
- if ok:
|
|
|
- success_count += 1
|
|
|
-
|
|
|
- details.append({
|
|
|
- "index": idx,
|
|
|
- "dbInserted": ok,
|
|
|
- "identifyError": identify_result.get('error'),
|
|
|
- "status": 2 if ok else 3
|
|
|
- })
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"处理第 {idx} 项时出错: {e}")
|
|
|
- details.append({
|
|
|
- "index": idx,
|
|
|
- "dbInserted": False,
|
|
|
- "identifyError": str(e),
|
|
|
- "status": 3
|
|
|
- })
|
|
|
-
|
|
|
- result = TriggerResponse(
|
|
|
- requestId=request.requestId,
|
|
|
- processed=len(items),
|
|
|
- success=success_count,
|
|
|
- details=details
|
|
|
- )
|
|
|
-
|
|
|
- # 更新状态为处理完成
|
|
|
- update_request_status(request.requestId, 2)
|
|
|
-
|
|
|
- logger.info(f"处理完成: requestId={request.requestId}, processed={result.processed}, success={result.success}")
|
|
|
- return result
|
|
|
+ return result
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"处理请求失败: {e}")
|
|
@@ -517,64 +428,6 @@ async def process_request_background(request_id: str):
|
|
|
}
|
|
|
)
|
|
|
logger.info(f"LangGraph 后台处理完成: requestId={request_id}, processed={final_state.get('processed', 0)}, success={final_state.get('success', 0)}")
|
|
|
-
|
|
|
- else:
|
|
|
- # 传统模式
|
|
|
- # 更新状态为处理中
|
|
|
- update_request_status(request_id, 1)
|
|
|
-
|
|
|
- items = QueryDataTool.fetch_crawl_data_list(request_id)
|
|
|
- print(f"传统模式process_request_background---items: {items}")
|
|
|
- if not items:
|
|
|
- logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
|
|
|
- # 无数据需要处理,更新状态为完成
|
|
|
- update_request_status(request_id, 2)
|
|
|
- return
|
|
|
-
|
|
|
- success_count = 0
|
|
|
- for idx, item in enumerate(items, start=1):
|
|
|
- try:
|
|
|
- crawl_data = item.get('crawl_data') or {}
|
|
|
- content_id = item.get('content_id') or ''
|
|
|
-
|
|
|
- identify_result = identify_tool.run(
|
|
|
- crawl_data if isinstance(crawl_data, dict) else {}
|
|
|
- )
|
|
|
-
|
|
|
- affected = UpdateDataTool.store_indentify_result(
|
|
|
- request_id,
|
|
|
- {
|
|
|
- content_id: item.get('content_id') or '',
|
|
|
- task_id: item.get('task_id') or ''
|
|
|
- },
|
|
|
- identify_result
|
|
|
- )
|
|
|
-
|
|
|
- # 使用StructureTool进行内容结构化处理
|
|
|
- structure_tool = StructureTool()
|
|
|
- structure_result = structure_tool.process_content_structure(identify_result)
|
|
|
-
|
|
|
- # 存储结构化解析结果
|
|
|
- parsing_affected = UpdateDataTool.store_parsing_result(
|
|
|
- request_id,
|
|
|
- {
|
|
|
- content_id: item.get('content_id') or '',
|
|
|
- task_id: item.get('task_id') or ''
|
|
|
- },
|
|
|
- structure_result
|
|
|
- )
|
|
|
-
|
|
|
- if affected is not None and affected > 0:
|
|
|
- success_count += 1
|
|
|
-
|
|
|
- logger.info(f"后台处理进度: {idx}/{len(items)} - {'成功' if affected else '失败'} - 结构化{'成功' if parsing_affected else '失败'}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"后台处理第 {idx} 项时出错: {e}")
|
|
|
-
|
|
|
- logger.info(f"传统模式后台处理完成: requestId={request_id}, processed={len(items)}, success={success_count}")
|
|
|
- # 更新状态为处理完成
|
|
|
- update_request_status(request_id, 2)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"后台处理失败: requestId={request_id}, error={e}")
|