#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent service rebuilt on FastAPI + LangGraph.
Provides workflow management and state control.
"""
import json
import sys
import os
import time
import threading
import asyncio
import concurrent.futures
import fcntl
import errno
import multiprocessing
import signal
from typing import Any, Dict, List, Optional, TypedDict, Annotated
from contextlib import asynccontextmanager

# Set the env var early to suppress gRPC fork warnings
os.environ.setdefault('GRPC_POLL_STRATEGY', 'poll')

from utils.mysql_db import MysqlHelper
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
from agents.clean_agent.agent import execute_agent_with_api, execute
from agents.expand_agent.agent import execute_expand_agent_with_api, _update_expansion_status

# LangGraph imports (optional dependency)
try:
    from langgraph.graph import StateGraph, END
    HAS_LANGGRAPH = True
except ImportError:
    HAS_LANGGRAPH = False
    print("Warning: LangGraph is not installed")

from utils.logging_config import get_logger
from tools.agent_tools import QueryDataTool, IdentifyTool, UpdateDataTool, StructureTool

# Make sure modules from this project are importable
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Create logger
logger = get_logger('Agent')
# State definition
class AgentState(TypedDict):
    request_id: str
    items: List[Dict[str, Any]]
    details: List[Dict[str, Any]]
    processed: int
    success: int
    error: Optional[str]
    status: str

class ExpandRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query word")

# Request model
class TriggerRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")

# Response model
class TriggerResponse(BaseModel):
    requestId: str
    processed: int
    success: int
    details: List[Dict[str, Any]]

class ExtractRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query word")
# Globals
identify_tool = None
# Global thread pool
THREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=20)
# Active process pools
ACTIVE_POOLS = []
POOLS_LOCK = threading.Lock()

def cleanup_all_pools():
    """Terminate and remove all active process pools"""
    global ACTIVE_POOLS, POOLS_LOCK
    with POOLS_LOCK:
        logger.info(f"Cleaning up {len(ACTIVE_POOLS)} active process pool(s)...")
        for pool in ACTIVE_POOLS:
            try:
                logger.info("Terminating process pool...")
                pool.terminate()
                # multiprocessing.Pool.join() takes no timeout argument, and
                # Pool has no kill(); terminate() already stops the workers.
                pool.join()
            except Exception as e:
                logger.error(f"Error while cleaning up process pool: {e}")
        ACTIVE_POOLS.clear()
        logger.info("All process pools cleaned up")

def signal_handler(signum, frame):
    """Signal handler"""
    logger.info(f"Received signal {signum}, starting cleanup...")
    cleanup_all_pools()
    # Shut down the thread pool
    THREAD_POOL.shutdown(wait=False)
    logger.info("Cleanup finished, exiting")
    sys.exit(0)

def register_signal_handlers():
    """Register signal handlers"""
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

def get_identify_tool():
    """Lazily initialize IdentifyTool so it is usable in child processes"""
    global identify_tool
    if identify_tool is None:
        identify_tool = IdentifyTool()
    return identify_tool
def update_request_status(request_id: str, status: int):
    """
    Update parsing_status in the knowledge_request table
    Args:
        request_id: request ID
        status: status value (1: processing, 2: done, 3: failed)
    """
    try:
        from utils.mysql_db import MysqlHelper
        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))
        if result is not None:
            logger.info(f"Updated request status: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update request status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Exception while updating request status: requestId={request_id}, status={status}, error={e}")

# Note: this local definition shadows the _update_expansion_status imported
# from agents.expand_agent.agent above.
def _update_expansion_status(requestId: str, status: int):
    """Update the expansion-query status"""
    try:
        from utils.mysql_db import MysqlHelper
        sql = "UPDATE knowledge_request SET expansion_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, requestId))
        logger.info(f"Updated expansion status: requestId={requestId}, status={status}")
    except Exception as e:
        logger.error(f"Failed to update expansion status: requestId={requestId}, status={status}, error={e}")
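# The three status columns on knowledge_request (parsing_status,
# extraction_status, expansion_status) share one convention: 1 = processing,
# 2 = done, 3 = failed. A generic helper could look like the sketch below
# (illustrative only; the explicit per-column functions above and below are
# what the service actually uses):
#
#   def _update_status(column: str, request_id: str, status: int) -> None:
#       sql = f"UPDATE knowledge_request SET {column} = %s WHERE request_id = %s"
#       MysqlHelper.update_values(sql, (status, request_id))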
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    # On startup
    logger.info("🚀 Starting Knowledge Agent service...")
    # Register signal handlers
    register_signal_handlers()
    # Initialize global tools
    global identify_tool
    identify_tool = IdentifyTool()
    # Resume interrupted flows after startup.
    # Run in a thread so startup is not blocked.
    thread = threading.Thread(target=restore_interrupted_processes)
    thread.daemon = True
    thread.start()
    app.state.restore_thread = thread
    yield
    # On shutdown
    logger.info("🛑 Shutting down Knowledge Agent service...")
    # Clean up all process pools
    cleanup_all_pools()
    # Shut down the thread pool
    THREAD_POOL.shutdown(wait=False)
    logger.info("✅ Thread pool shut down")
def restore_interrupted_processes():
    """
    Resume interrupted flows after startup:
    1. Find request_ids in knowledge_request with parsing_status=1 and re-run /parse/async
    2. Find request_id + query with extraction_status=1 and re-run /extract
    3. Find request_id + query with expansion_status=1 and re-run /expand
    A file lock ensures only one process performs the recovery.
    """
    # Lock file path
    lock_file_path = "/tmp/knowledge_agent_restore.lock"
    try:
        # Create or open the lock file
        lock_file = open(lock_file_path, 'w')
        try:
            # Try to acquire the file lock (non-blocking)
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            logger.info("🔄 Acquired recovery lock, resuming interrupted flows...")
            # Wait until the service is fully up
            time.sleep(3)
            # 1. Resume interrupted parsing flows
            restore_parsing_processes()
            # 2. Resume interrupted extraction flows
            restore_extraction_processes()
            # 3. Resume interrupted expansion flows
            restore_expansion_processes()
            logger.info("✅ Flow recovery finished")
            # Release the lock
            fcntl.flock(lock_file, fcntl.LOCK_UN)
        except IOError as e:
            # EAGAIN means the lock is held: another process is already recovering
            if e.errno == errno.EAGAIN:
                logger.info("⏩ Another process is performing recovery, skipping")
            else:
                logger.error(f"❌ Error while acquiring recovery lock: {e}")
        finally:
            # Close the lock file
            lock_file.close()
    except Exception as e:
        logger.error(f"❌ Flow recovery failed: {e}")
        # Try to clean up the lock file
        try:
            if os.path.exists(lock_file_path):
                os.remove(lock_file_path)
        except Exception:
            pass
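# Sketch of the non-blocking flock pattern used above (illustrative): if a
# second process already holds the lock, flock() raises IOError/OSError with
# errno == EAGAIN instead of blocking.
#
#   with open("/tmp/knowledge_agent_restore.lock", "w") as f:
#       fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # raises if already held
#       ...                                            # critical section
#       fcntl.flock(f, fcntl.LOCK_UN)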
def restore_parsing_processes():
    """Resume interrupted parsing flows"""
    try:
        # Query requests with parsing_status=1
        sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted parsing flows found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted parsing flow(s), resuming...")
        for row in rows:
            request_id = row[0]
            try:
                # Run the /parse/async logic with retries
                call_parse_async_with_retry(request_id)
                logger.info(f"✅ Resumed parsing flow: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume parsing flow: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming parsing flows: {e}")

def restore_extraction_processes():
    """Resume interrupted extraction flows"""
    try:
        # Query request_id and query with extraction_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted extraction flows found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted extraction flow(s), resuming...")
        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call the extraction function directly with retries
                # (status updates are handled inside the function)
                call_extract_with_retry(request_id, query)
                logger.info(f"✅ Resumed extraction flow: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume extraction flow: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming extraction flows: {e}")

def restore_expansion_processes():
    """Resume interrupted expansion flows"""
    try:
        # Query request_id and query with expansion_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
        rows = MysqlHelper.get_values(sql)
        if not rows:
            logger.info("📋 No interrupted expansion flows found")
            return
        logger.info(f"🔄 Found {len(rows)} interrupted expansion flow(s), resuming...")
        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call the expansion function directly with retries
                # (status updates are handled inside the function)
                call_expand_with_retry(request_id, query)
                logger.info(f"✅ Resumed expansion flow: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume expansion flow: request_id={request_id}, error={e}")
    except Exception as e:
        logger.error(f"❌ Error while resuming expansion flows: {e}")
def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
    """Call the parsing function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Run the background handler on the thread pool and wait for it
            future = THREAD_POOL.submit(process_request_background_sync, request_id)
            result = future.result()
            logger.info(f"Direct parse call succeeded: request_id={request_id}")
            return
        except Exception as e:
            logger.warning(f"Direct parse call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
            # Wait before retrying unless this was the last attempt
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"Direct parse call failed after {max_retries} attempts: request_id={request_id}")

def call_extract_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the extraction function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Mark as processing
            update_extract_status(request_id, 1)
            # Run the (synchronous) extraction function on the global thread pool
            future = THREAD_POOL.submit(
                execute_agent_with_api,
                json.dumps({"query_word": query, "request_id": request_id})
            )
            result = future.result()
            # Mark as done
            update_extract_status(request_id, 2)
            logger.info(f"Direct extract call succeeded: request_id={request_id}, result={result}")
            return
        except Exception as e:
            logger.warning(f"Direct extract call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
            # Mark as failed
            update_extract_status(request_id, 3)
            # Wait before retrying unless this was the last attempt
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"Direct extract call failed after {max_retries} attempts: request_id={request_id}")

def call_expand_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the expansion function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Run the (synchronous) expansion function on the global thread pool
            future = THREAD_POOL.submit(execute_expand_agent_with_api, request_id, query)
            result = future.result()
            logger.info(f"Direct expand call succeeded: request_id={request_id}")
            return
        except Exception as e:
            logger.warning(f"Direct expand call raised: request_id={request_id}, error={e}, attempt={attempt+1}")
            # Wait before retrying unless this was the last attempt
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
    logger.error(f"Direct expand call failed after {max_retries} attempts: request_id={request_id}")
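# The retry helpers above sleep 2 ** attempt seconds between tries: with the
# default max_retries=3 that is 1s after the first failure and 2s after the
# second, with no wait after the final attempt.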
# The HTTP-calling helpers that used to live here were removed; the functions
# are now invoked directly instead of via HTTP requests.

# Create the FastAPI app
app = FastAPI(
    title="Knowledge Agent API",
    description="LangGraph-based intelligent content identification and structuring service",
    version="2.0.0",
    lifespan=lifespan
)

# Concurrency control: track requestIds currently being processed to prevent
# duplicate concurrent submissions. A threading.Lock (not asyncio.Lock) is
# used because the set is also mutated from worker threads.
RUNNING_REQUESTS: set = set()
RUNNING_LOCK = threading.Lock()

# =========================
# LangGraph workflow definition
# =========================
def process_single_item(args):
    """Process a single item; module-level so it can be pickled for multiprocessing"""
    idx, item, request_id, api_key = args
    try:
        # Temporarily point GEMINI_API_KEY at the key assigned to this task
        # (save the same variable that the finally block restores)
        original_api_key = os.getenv('GEMINI_API_KEY')
        os.environ['GEMINI_API_KEY'] = api_key
        crawl_data = item.get('crawl_data') or {}
        content_id = item.get('content_id') or ''
        task_id = item.get('task_id') or ''
        # Check whether this item has already been processed
        check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
        check_result = MysqlHelper.get_values(check_sql, (request_id, content_id))
        result_status = 0
        result_id = 0
        result_indentify_data = {}
        if check_result:
            id, status, indentify_data = check_result[0]
            logger.info(f"Found item pending structuring, id: {id}, status: {status}, indentify_data: {str(indentify_data)[:100]}")
            result_status = status
            result_id = id
            result_indentify_data = indentify_data
            if status == 5:
                return {
                    "index": idx,
                    "dbInserted": True,
                    "identifyError": None,
                    "status": 2,
                    "success": True
                }
        # 0 = not identified, 3 = identification failed; both need (re-)identification
        if result_status == 0 or result_status == 3:
            # Step 1: identification
            identify_result = get_identify_tool().run(
                crawl_data if isinstance(crawl_data, dict) else {}
            )
            # Step 2: structure and store
            affected = UpdateDataTool.store_indentify_result(
                request_id,
                {
                    "content_id": content_id,
                    "task_id": task_id
                },
                identify_result
            )
        else:
            # result_indentify_data is a JSON string and must be parsed
            identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
            affected = result_id
        # Structure the content with StructureTool
        structure_tool = StructureTool()
        structure_result = structure_tool.process_content_structure(identify_result)
        # Store the structured parsing result
        parsing_affected = UpdateDataTool.store_parsing_result(
            request_id,
            {
                "id": affected,
                "content_id": content_id,
                "task_id": task_id
            },
            structure_result
        )
        logger.info(f"Debug info: affected={affected}, content_id={content_id}, result_status={result_status}")
        ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
        if ok:
            success = True
        else:
            success = False
            logger.error(f"Error processing item {idx}: {identify_result.get('error') or structure_result.get('error')}")
        # Record processing details
        detail = {
            "index": idx,
            "dbInserted": ok,
            "identifyError": identify_result.get('error') or structure_result.get('error'),
            "status": 2 if ok else 3,
            "success": success
        }
        logger.info(f"Progress: {request_id} - {idx} - {'ok' if ok else 'failed'}")
        return detail
    except Exception as e:
        logger.error(f"Error processing item {idx}: {e}")
        return {
            "index": idx,
            "dbInserted": False,
            "identifyError": str(e),
            "status": 3,
            "success": False
        }
    finally:
        # Restore the original API key
        if 'original_api_key' in locals():
            if original_api_key is not None:
                os.environ['GEMINI_API_KEY'] = original_api_key
            else:
                os.environ.pop('GEMINI_API_KEY', None)
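# Serial smoke test of process_single_item (illustrative; the item dict shape
# mirrors what QueryDataTool.fetch_crawl_data_list() is assumed to return, and
# "req-123"/"c1"/"t1" are made-up values):
#
#   detail = process_single_item((
#       1,
#       {"crawl_data": {}, "content_id": "c1", "task_id": "t1"},
#       "req-123",
#       os.getenv("GEMINI_API_KEY_1", ""),
#   ))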
def create_langgraph_workflow():
    """Create the LangGraph workflow"""
    if not HAS_LANGGRAPH:
        return None

    # Workflow node definitions
    def fetch_data(state: AgentState) -> AgentState:
        """Fetch the data to process"""
        try:
            request_id = state["request_id"]
            logger.info(f"Fetching data: requestId={request_id}")
            # Mark as processing
            update_request_status(request_id, 1)
            items = QueryDataTool.fetch_crawl_data_list(request_id)
            state["items"] = items
            state["processed"] = len(items)
            state["status"] = "data_fetched"
            logger.info(f"Data fetched: requestId={request_id}, count={len(items)}")
            return state
        except Exception as e:
            logger.error(f"Failed to fetch data: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def process_items_batch(state: AgentState) -> AgentState:
        """Process all items in parallel across multiple processes"""
        try:
            items = state["items"]
            if not items:
                state["status"] = "completed"
                return state
            # Collect up to 7 distinct GEMINI API keys
            api_keys = []
            default_key = os.getenv('GEMINI_API_KEY') or os.getenv('GEMINI_API_KEY_1')
            for i in range(1, 8):  # GEMINI_API_KEY_1 through GEMINI_API_KEY_7
                api_key = os.getenv(f'GEMINI_API_KEY_{i}')
                if api_key:
                    api_keys.append(api_key)
                else:
                    logger.warning(f"GEMINI_API_KEY_{i} not found, falling back to the default key")
                    api_keys.append(default_key)
            # Build the multiprocessing arguments, assigning an API key per task
            process_args = []
            for idx, item in enumerate(items, start=1):
                # Rotate through the 7 API keys
                api_key = api_keys[(idx - 1) % 7]
                process_args.append((idx, item, state["request_id"], api_key))
            # Process with 7 parallel processes, guarded for multiprocessing safety
            if __name__ == '__main__' or multiprocessing.current_process().name == 'MainProcess':
                # Use the 'spawn' start method to avoid gRPC fork issues
                original_start_method = multiprocessing.get_start_method()
                try:
                    multiprocessing.set_start_method('spawn', force=True)
                except RuntimeError:
                    pass  # already set; ignore
                pool = None
                try:
                    pool = multiprocessing.Pool(processes=7)
                    with POOLS_LOCK:
                        ACTIVE_POOLS.append(pool)
                    logger.info(f"Starting multiprocessing: count={len(process_args)}, 7 processes")
                    results = pool.map(process_single_item, process_args)
                except Exception as e:
                    logger.error(f"Multiprocessing error: {e}")
                    results = []
                finally:
                    if pool is not None:
                        logger.info("Closing the process pool...")
                        pool.close()
                        pool.join()
                        with POOLS_LOCK:
                            if pool in ACTIVE_POOLS:
                                ACTIVE_POOLS.remove(pool)
                        logger.info("Process pool closed")
                    # Restore the original start method
                    try:
                        multiprocessing.set_start_method(original_start_method, force=True)
                    except RuntimeError:
                        pass
            else:
                # Not in the main process: fall back to serial processing
                logger.warning("Not in the main process, falling back to serial processing")
                results = [process_single_item(args) for args in process_args]
            # Aggregate results
            success_count = sum(1 for result in results if result.get("success", False))
            details = [result for result in results]
            state["success"] = success_count
            state["details"] = details
            state["status"] = "completed"
            logger.info(f"Multiprocessing finished: {success_count}/{len(items)} items succeeded")
            return state
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def should_continue(state: AgentState) -> str:
        """Decide whether to continue (currently not wired into the graph)"""
        if state.get("error"):
            # Processing failed: set status to 3
            update_request_status(state["request_id"], 3)
            return "end"
        # All items processed: set status to 2
        update_request_status(state["request_id"], 2)
        return "end"

    # Build the workflow graph
    workflow = StateGraph(AgentState)
    # Add nodes
    workflow.add_node("fetch_data", fetch_data)
    workflow.add_node("process_items_batch", process_items_batch)
    # Set the entry point
    workflow.set_entry_point("fetch_data")
    # Add edges
    workflow.add_edge("fetch_data", "process_items_batch")
    workflow.add_edge("process_items_batch", END)
    # Compile the workflow
    return workflow.compile()

# Global workflow instance
WORKFLOW = create_langgraph_workflow() if HAS_LANGGRAPH else None
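# Resulting graph topology (linear): fetch_data -> process_items_batch -> END.
# Sketch of a direct invocation outside the API layer (illustrative; "req-123"
# is a made-up request ID; process_request_background_sync below is the real
# caller):
#
#   state = AgentState(request_id="req-123", items=[], details=[],
#                      processed=0, success=0, error=None, status="started")
#   final_state = WORKFLOW.invoke(state) if WORKFLOW else None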
# =========================
# FastAPI endpoint definitions
# =========================
@app.get("/")
async def root():
    """Root path, returns service info"""
    return {
        "service": "Knowledge Agent API",
        "version": "2.0.0",
        "status": "running",
        "langgraph_enabled": HAS_LANGGRAPH,
        "endpoints": {
            "parse": "/parse",
            "parse/async": "/parse/async",
            "health": "/health",
            "docs": "/docs"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "langgraph_enabled": HAS_LANGGRAPH
    }
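# Example (illustrative; host/port taken from the uvicorn.run() call at the
# bottom of this file):
#   curl http://localhost:8080/health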
@app.post("/parse/async", status_code=200)
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    Asynchronous parse processing (background task)
    - **requestId**: request ID identifying the processing task

    Behavior: returns 200 immediately and continues processing in the background.
    If a task for the same requestId is already running, fails immediately (status=3).
    """
    try:
        logger.info(f"Received async parse request: requestId={request.requestId}")
        # Concurrency guard: only one run per requestId
        with RUNNING_LOCK:
            if request.requestId in RUNNING_REQUESTS:
                return {
                    "requestId": request.requestId,
                    "status": 3,
                    "message": "A task is already running for this requestId, try again later",
                    "langgraph_enabled": HAS_LANGGRAPH
                }
            RUNNING_REQUESTS.add(request.requestId)

        def _background_wrapper_sync(rid: str):
            try:
                process_request_background_sync(rid)
            finally:
                # Remove the request ID under the shared lock
                with RUNNING_LOCK:
                    RUNNING_REQUESTS.discard(rid)

        # Submit the background task to the global thread pool
        THREAD_POOL.submit(_background_wrapper_sync, request.requestId)
        # Return immediately (non-blocking)
        return {
            "requestId": request.requestId,
            "status": 1,
            "message": "Task queued and processing in the background",
            "langgraph_enabled": HAS_LANGGRAPH
        }
    except Exception as e:
        logger.error(f"Failed to submit async task: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to submit task: {str(e)}")
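# Example (illustrative; "req-123" is a made-up request ID):
#   curl -X POST http://localhost:8080/parse/async \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123"}'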
def process_request_background_sync(request_id: str):
    """Background request processing (synchronous version)"""
    try:
        logger.info(f"Starting background processing: requestId={request_id}")
        if WORKFLOW and HAS_LANGGRAPH:
            # Use the LangGraph workflow
            # Mark as processing
            update_request_status(request_id, 1)
            initial_state = AgentState(
                request_id=request_id,
                items=[],
                details=[],
                processed=0,
                success=0,
                error=None,
                status="started"
            )
            final_state = WORKFLOW.invoke(
                initial_state,
                config={
                    "configurable": {"thread_id": f"thread_{request_id}"},
                    "recursion_limit": 100  # raise the recursion limit
                }
            )
            # All items processed: mark as done
            update_request_status(request_id, 2)
            logger.info(f"LangGraph background processing finished: requestId={request_id}, processed={final_state.get('processed', 0)}, success={final_state.get('success', 0)}")
    except Exception as e:
        logger.error(f"Background processing failed: requestId={request_id}, error={e}")
        # Processing failed: mark as failed
        update_request_status(request_id, 3)
extraction_requests: set = set()

@app.post("/extract")
async def extract(request: ExtractRequest):
    """
    Run extraction processing (asynchronously)
    Args:
        request: request body containing the request ID and query word
    Returns:
        dict: execution status
    """
    # Bind these before the try block so the except handler can use them
    requestId = request.requestId
    query = request.query
    try:
        logger.info(f"Received extraction request: requestId={requestId}, query={query}")
        # Concurrency guard: only one run per requestId
        if requestId in extraction_requests:
            return {"status": 1, "requestId": requestId, "message": "Request is already being processed"}
        extraction_requests.add(requestId)
        # Mark as processing
        update_extract_status(requestId, 1)

        # Run the extraction agent on the thread pool
        def _execute_extract_sync():
            try:
                # result = execute_agent_with_api(json.dumps({"query_word": query, "request_id": requestId}))
                result = execute(query, requestId)
                # Mark as done
                update_extract_status(requestId, 2)
                logger.info(f"Async extraction task finished: requestId={requestId}")
                return result
            except Exception as e:
                logger.error(f"Async extraction task failed: requestId={requestId}, error={e}")
                # Mark as failed
                update_extract_status(requestId, 3)
            finally:
                # Remove the request ID
                extraction_requests.discard(requestId)

        # Submit to the global thread pool
        THREAD_POOL.submit(_execute_extract_sync)
        # Return the status immediately
        return {"status": 1, "requestId": requestId, "message": "Extraction task started; processing in background"}
    except Exception as e:
        logger.error(f"Failed to start extraction task: requestId={requestId}, error={e}")
        # Mark as failed
        update_extract_status(requestId, 3)
        # Remove from the running set
        extraction_requests.discard(requestId)
        raise HTTPException(status_code=500, detail=f"Failed to start extraction task: {str(e)}")
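# Example (illustrative; values are made up):
#   curl -X POST http://localhost:8080/extract \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123", "query": "some query"}'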
# Shared lock guarding app.state.expansion_requests from both the event loop
# and worker threads (a fresh Lock() per call would guard nothing)
EXPANSION_LOCK = threading.Lock()

@app.post("/expand")
async def expand(request: ExpandRequest):
    """
    Run expansion-query processing (asynchronously)
    Args:
        request: request body containing the request ID and query word
    Returns:
        dict: execution status
    """
    # Bind these before the try block so the except handler can use them
    requestId = request.requestId
    query = request.query
    try:
        logger.info(f"Received expansion request: requestId={requestId}, query={query}")
        # Concurrency guard: only one run per requestId
        with EXPANSION_LOCK:
            # Create the set on first use
            if not hasattr(app.state, 'expansion_requests'):
                app.state.expansion_requests = set()
            if requestId in app.state.expansion_requests:
                return {"status": 1, "requestId": requestId, "message": "Expansion query is already being processed"}
            app.state.expansion_requests.add(requestId)
        # Mark as processing immediately
        _update_expansion_status(requestId, 1)

        # Run the expansion agent on the thread pool
        def _execute_expand_sync():
            try:
                # Call the synchronous function directly
                result = execute_expand_agent_with_api(requestId, query)
                # Mark as done
                _update_expansion_status(requestId, 2)
                logger.info(f"Async expansion task finished: requestId={requestId}")
                return result
            except Exception as e:
                logger.error(f"Async expansion task failed: requestId={requestId}, error={e}")
                # Mark as failed
                _update_expansion_status(requestId, 3)
            finally:
                # Always remove the request ID from the running set
                with EXPANSION_LOCK:
                    if hasattr(app.state, 'expansion_requests'):
                        app.state.expansion_requests.discard(requestId)

        # Submit to the global thread pool
        THREAD_POOL.submit(_execute_expand_sync)
        # Return the status immediately
        return {"status": 1, "requestId": requestId, "message": "Expansion task started; processing in background"}
    except Exception as e:
        logger.error(f"Failed to start expansion task: requestId={requestId}, error={e}")
        # Mark as failed
        _update_expansion_status(requestId, 3)
        # Remove from the running set
        with EXPANSION_LOCK:
            if hasattr(app.state, 'expansion_requests'):
                app.state.expansion_requests.discard(requestId)
        raise HTTPException(status_code=500, detail=f"Failed to start expansion task: {str(e)}")
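# Example (illustrative; values are made up):
#   curl -X POST http://localhost:8080/expand \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123", "query": "some query"}'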
def update_extract_status(request_id: str, status: int):
    """Update extraction_status in the knowledge_request table"""
    try:
        from utils.mysql_db import MysqlHelper
        sql = "UPDATE knowledge_request SET extraction_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))
        if result is not None:
            logger.info(f"Updated extraction status: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update extraction status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Exception while updating extraction status: requestId={request_id}, status={status}, error={e}")
if __name__ == "__main__":
    # Read configuration from environment variables (os is imported at the top)
    reload_enabled = os.getenv("RELOAD_ENABLED", "false").lower() == "true"
    log_level = os.getenv("LOG_LEVEL", "info")
    # Start the service
    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=8080,
        reload=reload_enabled,  # controlled via environment variable
        log_level=log_level
    )
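# Example (illustrative): enable auto-reload and debug logging during development:
#   RELOAD_ENABLED=true LOG_LEVEL=debug python agent.py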