#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent service rebuilt on FastAPI + LangGraph.
Provides workflow management and state control.
"""
import json
import sys
import os
import time
import asyncio
from typing import Any, Dict, List, Optional, TypedDict
from contextlib import asynccontextmanager

# Make sure this project's modules can be imported
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.mysql_db import MysqlHelper
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

from agents.clean_agent.agent import execute_agent_with_api
# Only the executor is imported here: _update_expansion_status is redefined
# locally below, and importing it as well would shadow the local definition.
from agents.expand_agent.agent import execute_expand_agent_with_api

# LangGraph imports (optional dependency)
try:
    from langgraph.graph import StateGraph, END
    HAS_LANGGRAPH = True
except ImportError:
    HAS_LANGGRAPH = False
    print("Warning: LangGraph is not installed")

from utils.logging_config import get_logger
from tools.agent_tools import QueryDataTool, IdentifyTool, UpdateDataTool, StructureTool

# Create the module logger
logger = get_logger('Agent')
# Workflow state definition
class AgentState(TypedDict):
    request_id: str
    items: List[Dict[str, Any]]
    details: List[Dict[str, Any]]
    processed: int
    success: int
    error: Optional[str]
    status: str


class ExpandRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query term")


# Request model
class TriggerRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")


# Response model
class TriggerResponse(BaseModel):
    requestId: str
    processed: int
    success: int
    details: List[Dict[str, Any]]


class ExtractRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query term")


# Global tools
identify_tool = None
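# The status columns written by the helpers below share one convention:
# 1 = processing, 2 = completed, 3 = failed. For illustration (hypothetical
# values), a successful /parse call returns a TriggerResponse such as:
#   {"requestId": "req-123", "processed": 10, "success": 9,
#    "details": [{"index": 1, "dbInserted": true, "identifyError": null, "status": 2}, ...]}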
def update_request_status(request_id: str, status: int):
    """
    Update parsing_status in the knowledge_request table.

    Args:
        request_id: Request ID
        status: Status value (1: processing, 2: completed, 3: failed)
    """
    try:
        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))
        if result is not None:
            logger.info(f"Request status updated: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update request status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Error while updating request status: requestId={request_id}, status={status}, error={e}")


def _update_expansion_status(requestId: str, status: int):
    """Update expansion_status in the knowledge_request table."""
    try:
        sql = "UPDATE knowledge_request SET expansion_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, requestId))
        logger.info(f"Expansion status updated: requestId={requestId}, status={status}")
    except Exception as e:
        logger.error(f"Failed to update expansion status: requestId={requestId}, status={status}, error={e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    # Runs on startup
    logger.info("🚀 Starting Knowledge Agent service...")
    # Initialize global tools
    global identify_tool
    identify_tool = IdentifyTool()
    # Resume any processes that were interrupted before the restart
    await restore_interrupted_processes()
    yield
    # Runs on shutdown
    logger.info("🛑 Shutting down Knowledge Agent service...")


async def restore_interrupted_processes():
    """
    Resume interrupted processes after startup:
    1. Find request_ids in knowledge_request with parsing_status=1 and re-submit them to /parse/async.
    2. Find request_ids and queries with extraction_status=1 and re-submit them to /extract.
    3. Find request_ids and queries with expansion_status=1 and re-submit them to /expand.
    """
    try:
        logger.info("🔄 Resuming interrupted processes...")
        # Wait for the service to finish starting up
        await asyncio.sleep(3)
        # 1. Resume interrupted parsing processes
        await restore_parsing_processes()
        # 2. Resume interrupted extraction processes
        await restore_extraction_processes()
        # 3. Resume interrupted expansion processes
        await restore_expansion_processes()
        logger.info("✅ Process recovery finished")
    except Exception as e:
        logger.error(f"❌ Process recovery failed: {e}")
async def restore_parsing_processes():
    """Resume interrupted parsing processes."""
    try:
        # Find requests stuck in parsing_status=1
        sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted parsing processes found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted parsing processes, resuming...")
        for row in rows:
            request_id = row[0]
            try:
                # Call /parse/async with retries
                await call_parse_async_with_retry(request_id)
                logger.info(f"✅ Parsing process resumed: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume parsing process: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming parsing processes: {e}")


async def restore_extraction_processes():
    """Resume interrupted extraction processes."""
    try:
        # Find requests and queries stuck in extraction_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted extraction processes found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted extraction processes, resuming...")
        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call /extract with retries
                await call_extract_with_retry(request_id, query)
                logger.info(f"✅ Extraction process resumed: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume extraction process: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming extraction processes: {e}")


async def restore_expansion_processes():
    """Resume interrupted expansion processes."""
    try:
        # Find requests and queries stuck in expansion_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted expansion processes found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted expansion processes, resuming...")
        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call /expand with retries
                await call_expand_with_retry(request_id, query)
                logger.info(f"✅ Expansion process resumed: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume expansion process: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming expansion processes: {e}")
async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
    """Call the /parse/async endpoint with retries."""
    for attempt in range(max_retries):
        try:
            import httpx
            # Create an async HTTP client
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    "http://localhost:8080/parse/async",
                    json={"requestId": request_id}
                )
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"/parse/async call succeeded: request_id={request_id}, result={result}")
                    return
                else:
                    logger.warning(f"/parse/async call failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
        except Exception as e:
            logger.warning(f"/parse/async call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
        # Unless this was the last attempt, back off and retry
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"/parse/async call ultimately failed: request_id={request_id}, retried {max_retries} times")


async def call_extract_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the /extract endpoint with retries."""
    for attempt in range(max_retries):
        try:
            import httpx
            # Create an async HTTP client
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    "http://localhost:8080/extract",
                    json={"requestId": request_id, "query": query}
                )
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"/extract call succeeded: request_id={request_id}, result={result}")
                    return
                else:
                    logger.warning(f"/extract call failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
        except Exception as e:
            logger.warning(f"/extract call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
        # Unless this was the last attempt, back off and retry
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"/extract call ultimately failed: request_id={request_id}, retried {max_retries} times")


async def call_expand_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the /expand endpoint with retries."""
    for attempt in range(max_retries):
        try:
            import httpx
            # Create an async HTTP client
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    "http://localhost:8080/expand",
                    json={"requestId": request_id, "query": query}
                )
                if response.status_code == 200:
                    result = response.json()
                    logger.info(f"/expand call succeeded: request_id={request_id}, result={result}")
                    return
                else:
                    logger.warning(f"/expand call failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
        except Exception as e:
            logger.warning(f"/expand call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
        # Unless this was the last attempt, back off and retry
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"/expand call ultimately failed: request_id={request_id}, retried {max_retries} times")
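# The three helpers above differ only in path and payload. A minimal
# consolidation sketch (hypothetical; nothing in this file calls it yet):
async def _post_with_retry(path: str, payload: dict, max_retries: int = 3) -> bool:
    """POST payload to the local service at `path`, retrying with exponential backoff."""
    import httpx
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(f"http://localhost:8080{path}", json=payload)
                if response.status_code == 200:
                    return True
                logger.warning(f"{path} call failed: status_code={response.status_code}, attempt={attempt+1}")
        except Exception as e:
            logger.warning(f"{path} call raised: error={e}, attempt={attempt+1}")
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # exponential backoff
    return False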
# Non-retrying variants of the calls above.
async def call_parse_async(request_id: str):
    """Call the /parse/async endpoint."""
    try:
        import httpx
        # Create an async HTTP client
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                "http://localhost:8080/parse/async",
                json={"requestId": request_id}
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"/parse/async call succeeded: request_id={request_id}, result={result}")
            else:
                logger.error(f"/parse/async call failed: request_id={request_id}, status_code={response.status_code}")
    except Exception as e:
        logger.error(f"/parse/async call raised: request_id={request_id}, error={e}")


async def call_extract(request_id: str, query: str):
    """Call the /extract endpoint."""
    try:
        import httpx
        # Create an async HTTP client
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                "http://localhost:8080/extract",
                json={"requestId": request_id, "query": query}
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"/extract call succeeded: request_id={request_id}, result={result}")
            else:
                logger.error(f"/extract call failed: request_id={request_id}, status_code={response.status_code}")
    except Exception as e:
        logger.error(f"/extract call raised: request_id={request_id}, error={e}")


async def call_expand(request_id: str, query: str):
    """Call the /expand endpoint."""
    try:
        import httpx
        # Create an async HTTP client
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                "http://localhost:8080/expand",
                json={"requestId": request_id, "query": query}
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"/expand call succeeded: request_id={request_id}, result={result}")
            else:
                logger.error(f"/expand call failed: request_id={request_id}, status_code={response.status_code}")
    except Exception as e:
        logger.error(f"/expand call raised: request_id={request_id}, error={e}")
# Create the FastAPI application
app = FastAPI(
    title="Knowledge Agent API",
    description="LangGraph-based intelligent content identification and structuring service",
    version="2.0.0",
    lifespan=lifespan
)

# Concurrency control: track requestIds currently being processed to prevent
# duplicate concurrent submissions
RUNNING_REQUESTS: set = set()
RUNNING_LOCK = asyncio.Lock()
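# Note: this in-memory set only deduplicates within a single process. Running
# several uvicorn workers would need a shared store (e.g. Redis) for the same
# guarantee.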
# =========================
# LangGraph workflow definition
# =========================
def create_langgraph_workflow():
    """Create the LangGraph workflow."""
    if not HAS_LANGGRAPH:
        return None

    # Workflow node definitions
    def fetch_data(state: AgentState) -> AgentState:
        """Fetch the data waiting to be processed."""
        try:
            request_id = state["request_id"]
            logger.info(f"Fetching data: requestId={request_id}")
            # Mark the request as processing
            update_request_status(request_id, 1)
            items = QueryDataTool.fetch_crawl_data_list(request_id)
            state["items"] = items
            state["processed"] = len(items)
            state["status"] = "data_fetched"
            logger.info(f"Data fetched: requestId={request_id}, count={len(items)}")
            return state
        except Exception as e:
            logger.error(f"Failed to fetch data: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def process_items_batch(state: AgentState) -> AgentState:
        """Process all data items in a batch."""
        try:
            items = state["items"]
            if not items:
                state["status"] = "completed"
                return state
            success_count = 0
            details = []
            for idx, item in enumerate(items, start=1):
                try:
                    crawl_data = item.get('crawl_data') or {}
                    content_id = item.get('content_id') or ''
                    task_id = item.get('task_id') or ''
                    # Check whether this item has already been processed
                    check_sql = "SELECT id, status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
                    check_result = MysqlHelper.get_values(check_sql, (state["request_id"], content_id))
                    if check_result:
                        _, status = check_result[0]
                        if status == 5:
                            success_count += 1
                            continue
                    # Step 1: identify
                    identify_result = identify_tool.run(
                        crawl_data if isinstance(crawl_data, dict) else {}
                    )
                    # Step 2: structure and persist
                    affected = UpdateDataTool.store_indentify_result(
                        state["request_id"],
                        {
                            "content_id": content_id,
                            "task_id": task_id
                        },
                        identify_result
                    )
                    # Structure the content with StructureTool
                    structure_tool = StructureTool()
                    structure_result = structure_tool.process_content_structure(identify_result)
                    # Persist the structured parsing result
                    parsing_affected = UpdateDataTool.store_parsing_result(
                        state["request_id"],
                        {
                            "id": affected,
                            "content_id": content_id,
                            "task_id": task_id
                        },
                        structure_result
                    )
                    ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
                    if ok:
                        success_count += 1
                    else:
                        logger.error(f"Error while processing item {idx}: {identify_result.get('error') or structure_result.get('error')}")
                    # Record the processing detail
                    detail = {
                        "index": idx,
                        "dbInserted": ok,
                        "identifyError": identify_result.get('error') or structure_result.get('error'),
                        "status": 2 if ok else 3
                    }
                    details.append(detail)
                    logger.info(f"Progress: {idx}/{len(items)} - {'success' if ok else 'failure'}")
                except Exception as e:
                    logger.error(f"Error while processing item {idx}: {e}")
                    detail = {
                        "index": idx,
                        "dbInserted": False,
                        "identifyError": str(e),
                        "status": 3
                    }
                    details.append(detail)
            state["success"] = success_count
            state["details"] = details
            state["status"] = "completed"
            return state
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def should_continue(state: AgentState) -> str:
        """Decide how to finish the run and persist the final status."""
        if state.get("error"):
            # Processing failed: mark the request as failed (3)
            update_request_status(state["request_id"], 3)
            return "end"
        # All items processed: mark the request as completed (2)
        update_request_status(state["request_id"], 2)
        return "end"

    # Build the workflow graph
    workflow = StateGraph(AgentState)
    # Add nodes
    workflow.add_node("fetch_data", fetch_data)
    workflow.add_node("process_items_batch", process_items_batch)
    # Set the entry point
    workflow.set_entry_point("fetch_data")
    # Add edges; route through should_continue so the final status is written
    workflow.add_edge("fetch_data", "process_items_batch")
    workflow.add_conditional_edges("process_items_batch", should_continue, {"end": END})
    # Compile the workflow
    return workflow.compile()


# Global workflow instance
WORKFLOW = create_langgraph_workflow() if HAS_LANGGRAPH else None
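# Minimal direct-invocation sketch (assuming LangGraph is installed; the
# request ID is hypothetical):
#   state = WORKFLOW.invoke(AgentState(request_id="req-123", items=[],
#                                      details=[], processed=0, success=0,
#                                      error=None, status="started"))
#   print(state["processed"], state["success"])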
# =========================
# FastAPI endpoint definitions
# =========================
@app.get("/")
async def root():
    """Root path; returns service information."""
    return {
        "service": "Knowledge Agent API",
        "version": "2.0.0",
        "status": "running",
        "langgraph_enabled": HAS_LANGGRAPH,
        "endpoints": {
            "parse": "/parse",
            "parse/async": "/parse/async",
            "health": "/health",
            "docs": "/docs"
        }
    }


@app.get("/health")
async def health_check():
    """Health-check endpoint."""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "langgraph_enabled": HAS_LANGGRAPH
    }
@app.post("/parse", response_model=TriggerResponse)
async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    Parse content synchronously.
    - **requestId**: request ID identifying the processing task
    """
    try:
        logger.info(f"Parse request received: requestId={request.requestId}")
        if WORKFLOW and HAS_LANGGRAPH:
            # Run the LangGraph workflow
            logger.info("Processing with the LangGraph workflow")
            # Initialize the state
            initial_state = AgentState(
                request_id=request.requestId,
                items=[],
                details=[],
                processed=0,
                success=0,
                error=None,
                status="started"
            )
            # Execute the workflow
            final_state = WORKFLOW.invoke(
                initial_state,
                config={
                    "configurable": {"thread_id": f"thread_{request.requestId}"},
                    "recursion_limit": 100  # raise the recursion limit
                }
            )
            # Build the response
            result = TriggerResponse(
                requestId=request.requestId,
                processed=final_state.get("processed", 0),
                success=final_state.get("success", 0),
                details=final_state.get("details", [])
            )
            return result
        # Without LangGraph there is no workflow to run
        raise HTTPException(status_code=503, detail="LangGraph workflow is unavailable")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Request processing failed: {e}")
        # Processing failed: mark the request as failed (3)
        update_request_status(request.requestId, 3)
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
@app.post("/parse/async", status_code=200)
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    Parse content asynchronously (background task).
    - **requestId**: request ID identifying the processing task

    Behavior: returns 200 immediately and keeps processing in the background.
    If a task for the same requestId is already running, fails immediately (status=3).
    """
    try:
        logger.info(f"Async parse request received: requestId={request.requestId}")
        # Concurrency guard: only one running task per requestId
        async with RUNNING_LOCK:
            if request.requestId in RUNNING_REQUESTS:
                return {
                    "requestId": request.requestId,
                    "status": 3,
                    "message": "A task is already running for this request; try again later",
                    "langgraph_enabled": HAS_LANGGRAPH
                }
            RUNNING_REQUESTS.add(request.requestId)

        async def _background_wrapper(rid: str):
            try:
                await process_request_background(rid)
            finally:
                async with RUNNING_LOCK:
                    RUNNING_REQUESTS.discard(rid)

        # Spawn the background task directly with asyncio (does not block this response)
        asyncio.create_task(_background_wrapper(request.requestId))
        # Return immediately (non-blocking)
        return {
            "requestId": request.requestId,
            "status": 1,
            "message": "Task queued and processing in the background",
            "langgraph_enabled": HAS_LANGGRAPH
        }
    except Exception as e:
        logger.error(f"Failed to submit async task: {e}")
        raise HTTPException(status_code=500, detail=f"Task submission failed: {str(e)}")
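# Example submission (hypothetical request ID), assuming the service runs on
# the default localhost:8080:
#   curl -X POST http://localhost:8080/parse/async \
#        -H 'Content-Type: application/json' \
#        -d '{"requestId": "req-123"}'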
async def process_request_background(request_id: str):
    """Process a request in the background."""
    try:
        logger.info(f"Background processing started: requestId={request_id}")
        if WORKFLOW and HAS_LANGGRAPH:
            # Run the LangGraph workflow
            # Mark the request as processing
            update_request_status(request_id, 1)
            initial_state = AgentState(
                request_id=request_id,
                items=[],
                details=[],
                processed=0,
                success=0,
                error=None,
                status="started"
            )
            # NOTE: WORKFLOW.invoke is synchronous and occupies the event loop
            # while it runs
            final_state = WORKFLOW.invoke(
                initial_state,
                config={
                    "configurable": {"thread_id": f"thread_{request_id}"},
                    "recursion_limit": 100  # raise the recursion limit
                }
            )
            # All items processed: mark the request as completed (2)
            update_request_status(request_id, 2)
            logger.info(f"LangGraph background processing finished: requestId={request_id}, processed={final_state.get('processed', 0)}, success={final_state.get('success', 0)}")
        else:
            logger.warning(f"LangGraph is unavailable; nothing to process: requestId={request_id}")
    except Exception as e:
        logger.error(f"Background processing failed: requestId={request_id}, error={e}")
        # Processing failed: mark the request as failed (3)
        update_request_status(request_id, 3)


# requestIds with an extraction currently in flight (guarded by RUNNING_LOCK)
extraction_requests: set = set()
@app.post("/extract")
async def extract(request: ExtractRequest):
    try:
        requestId = request.requestId
        query = request.query
        # Check whether this request is already being processed
        async with RUNNING_LOCK:
            if requestId in extraction_requests:
                return {"status": 1, "request_id": requestId, "message": "Request is already being processed"}
            extraction_requests.add(requestId)
        try:
            # Mark the request as processing
            update_extract_status(requestId, 1)
            # Run the agent (synchronous call; blocks until it finishes)
            result = execute_agent_with_api(json.dumps({"query_word": query, "request_id": requestId}))
            update_extract_status(requestId, 2)
        finally:
            # Success or failure, remove the request from the running set
            async with RUNNING_LOCK:
                extraction_requests.discard(requestId)
        return {"status": "success", "result": result}
    except Exception as e:
        # On error, mark the request as failed
        update_extract_status(requestId, 3)
        raise HTTPException(status_code=500, detail=f"Error while running the agent: {str(e)}")
@app.post("/expand")
async def expand(request: ExpandRequest, background_tasks: BackgroundTasks):
    """
    Run expansion-query processing.

    Args:
        request: request body containing the request ID
        background_tasks: FastAPI background tasks

    Returns:
        dict: execution status
    """
    try:
        # Immediately mark the request as processing
        _update_expansion_status(request.requestId, 1)
        # Queue the background task
        background_tasks.add_task(execute_expand_agent_with_api, request.requestId, request.query)
        # Return the status immediately
        return {"status": 1, "requestId": request.requestId, "message": "Expansion-query processing started"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while running the agent: {str(e)}")
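# Example submission (hypothetical values), mirroring the /parse/async call:
#   curl -X POST http://localhost:8080/expand \
#        -H 'Content-Type: application/json' \
#        -d '{"requestId": "req-123", "query": "example query"}'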
def update_extract_status(request_id: str, status: int):
    """Update extraction_status in the knowledge_request table."""
    try:
        sql = "UPDATE knowledge_request SET extraction_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))
        if result is not None:
            logger.info(f"Request status updated: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update request status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Error while updating request status: requestId={request_id}, status={status}, error={e}")
if __name__ == "__main__":
    # Read configuration from environment variables
    reload_enabled = os.getenv("RELOAD_ENABLED", "false").lower() == "true"
    log_level = os.getenv("LOG_LEVEL", "info")
    # Start the service
    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=8080,
        reload=reload_enabled,  # controlled via environment variable
        log_level=log_level
    )