agentMerge.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import time
from typing import List, Tuple, Optional, Dict

import requests

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper

logger = get_logger('StoreAgent')

# Import the LLM processor
try:
    from gemini import GeminiProcessor
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False

CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
SCORE_THRESHOLD = 70

def _update_store_status(request_id: str, status: int) -> None:
    try:
        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, request_id))
        logger.info(f"Updated store status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Failed to update store status: requestId={request_id}, status={status}, error={e}")

def _update_store_content_status(request_id: str, dataset_id: int, data: str, doc_id: str, query: str) -> None:
    try:
        sql = "INSERT INTO knowledge_store_content (request_id, doc_id, query, data, dataset_id, create_timestamp) VALUES (%s, %s, %s, %s, %s, NOW())"
        MysqlHelper.update_values(sql, (request_id, doc_id, query, data, dataset_id))
        logger.info(f"Stored store_content: requestId={request_id}, doc_id={doc_id}, query={query}, data={data}, dataset_id={dataset_id}")
    except Exception as e:
        logger.error(f"Failed to store store_content: requestId={request_id or 'None'}, doc_id={doc_id or 'None'}, query={query or 'None'}, data={data or 'None'}, dataset_id={dataset_id or 'None'}, error={e}")

def _fetch_query(request_id: str) -> str:
    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
    rows = MysqlHelper.get_values(sql, (request_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0] or ""
    return ""

def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
    if not rows:
        return []
    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]

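# Illustrative example only: for a request whose extraction rows clear
# SCORE_THRESHOLD, _fetch_extraction_data returns a list shaped like this
# (the values are placeholders, not real records):
#   [{"parsing_id": "101", "data": "cleaned knowledge text ..."},
#    {"parsing_id": "102", "data": "another cleaned fragment ..."}]
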
def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
    rows = MysqlHelper.get_values(sql, (parsing_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None

def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
    rows = MysqlHelper.get_values(sql, (content_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None

def _resolve_dataset_id(request_id: str) -> int:
    """Resolve dataset_id from knowledge_query.knowledge_type."""
    try:
        sql = "SELECT knowledge_type FROM knowledge_query WHERE request_id = %s ORDER BY id DESC LIMIT 1"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if rows:
            knowledge_type = rows[0][0] or ""
            if knowledge_type == "工具知识":
                return 12
            if knowledge_type == "内容知识":
                return 11
    except Exception as e:
        logger.warning(f"Failed to resolve dataset_id, using default: requestId={request_id}, error={e}")
    # Default fallback
    return 12

def _load_system_prompt() -> str:
    """Load merge.md as the system prompt."""
    try:
        template_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "prompt",
            "merge.md"
        )
        with open(template_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        logger.error(f"Failed to read merge.md: {e}")
        # Fallback system prompt
        return """你是一名专业的知识整合专家,负责根据同一 query 下的多份互联网清洗知识,生成一份融合性强、内容准确、表达自然的综合知识文本。
请根据提供的 query 与其对应的多个知识片段,融合生成一份高质量的综合知识文本。
输出一篇完整的综合知识文本,不输出任何解释、提示语或"以下是结果"。"""

def _build_user_message(query: str, data_list: List[Dict[str, str]]) -> str:
    """Build the user message in the input format defined by merge.md."""
    # Build the list of knowledge fragments
    knowledge_parts = []
    for idx, item in enumerate(data_list, 1):
        knowledge_parts.append(f"[知识片段{idx}]\n{item.get('data', '')}")
    knowledge_text = "\n\n".join(knowledge_parts)
    # Assemble the message following the input format in merge.md
    return f"[query]\n{query}\n\n{knowledge_text}"

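# Illustrative example only: with query "Q" and two fragments, _build_user_message
# produces a message in the merge.md input format (placeholder content):
#   [query]
#   Q
#
#   [知识片段1]
#   <data of fragment 1>
#
#   [知识片段2]
#   <data of fragment 2>
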
def _merge_with_llm(query: str, data_list: List[Dict[str, str]]) -> Optional[str]:
    """Call the Gemini LLM to merge the knowledge fragments."""
    if not HAS_GEMINI:
        logger.error("Gemini processor is unavailable")
        return None
    try:
        # Load the system prompt
        system_prompt = _load_system_prompt()
        # Build the user message
        user_message = _build_user_message(query, data_list)
        # Call Gemini
        processor = GeminiProcessor()
        result = processor.process(content=user_message, system_prompt=system_prompt)
        # Handle the returned result
        if isinstance(result, dict):
            if "error" in result:
                logger.error(f"Gemini API returned an error: {result['error']}")
                return None
            text = result.get("result", "") or result.get("raw_response", "")
        else:
            text = str(result)
        logger.info(f"Gemini merge succeeded, result length: {len(text) if text else 0}")
        return text if text else None
    except Exception as e:
        logger.error(f"Gemini call failed: {e}")
        return None

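# Assumed result shapes (inferred from the handling above, not from GeminiProcessor
# documentation): processor.process() is expected to return either a dict like
#   {"result": "<merged text>", "raw_response": "..."} or {"error": "..."},
# or a plain string; an empty/falsy result is treated as a merge failure.
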
def _upload_chunk(text: str, query: str, channel: str = "", dataset_id: int = 12, request_id: Optional[str] = None, max_retries: int = 1, backoff_sec: float = 1.0) -> bool:
    # ext must be a JSON string
    payload = {
        "dataset_id": dataset_id,
        "title": "",
        "text": text,
        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
    }
    headers = {"Content-Type": "application/json; charset=utf-8"}
    for attempt in range(max_retries):
        try:
            # Send as a POST request with the JSON string as the body
            body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
            resp = requests.post(CHUNK_API_URL, headers=headers, data=body, timeout=30)
            # Parse the response once; tolerate non-JSON bodies
            resp_json = None
            try:
                resp_json = resp.json()
                logger.info(f"Chunk upload response: resp={resp_json}")
            except Exception:
                logger.info(f"Chunk upload returned non-JSON: text={resp.text[:500]}")
            if resp_json and resp_json.get("doc_id"):
                # Persist doc_id into the knowledge_store_content.doc_id column
                _update_store_content_status(request_id, dataset_id, text, resp_json.get("doc_id"), query)
                return True
            logger.warning(f"Upload failed, status code: {resp.status_code}, attempt {attempt + 1}")
        except Exception as e:
            logger.warning(f"Upload exception: {e}, attempt {attempt + 1}")
        time.sleep(backoff_sec * (2 ** attempt))
    return False

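# Illustrative example only: the body POSTed to CHUNK_API_URL looks roughly like
# the following (placeholder values; the service is assumed to echo a "doc_id"
# field in its JSON response on success, as checked above):
#   {
#     "dataset_id": 12,
#     "title": "",
#     "text": "merged knowledge text ...",
#     "ext": "{\"query\": \"...\", \"channel\": \"\"}"
#   }
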
def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """
    Run the store pipeline:
    1) Set store_status = 1
    2) Fetch the query and the qualifying data list
    3) Merge the query and the content list into one summary using merge.md
    4) Upload only the merged result
    5) On success -> store_status = 2; on failure -> store_status = 3
    Returns: (total, success)
    """
    _update_store_status(request_id, 1)
    try:
        # 1. Fetch the query and the data list
        query = _fetch_query(request_id)
        data_list = _fetch_extraction_data(request_id)
        dataset_id = _resolve_dataset_id(request_id)
        total = len(data_list)
        if total == 0:
            # Nothing to upload: mark the request as complete and record an empty store entry
            _update_store_status(request_id, 2)
            _update_store_content_status(request_id, dataset_id, '', '', query)
            logger.info(f"No data to upload: requestId={request_id}")
            return (0, 0)
        logger.info(f"Starting knowledge merge: requestId={request_id}, query={query}, fragments={total}")
        # 2. Merge the content with the LLM
        merged_content = _merge_with_llm(query, data_list)
        if not merged_content:
            # Merge failed: mark the request as failed
            _update_store_status(request_id, 3)
            logger.error(f"Knowledge merge failed: requestId={request_id}")
            return (total, 0)
        logger.info(f"Knowledge merge succeeded: requestId={request_id}, merged content={merged_content[:500]}")
        # 3. Upload the merged content
        ok = _upload_chunk(merged_content, query, '', dataset_id, request_id)
        if ok:
            _update_store_status(request_id, 2)
            logger.info(f"Store complete: requestId={request_id}, original fragments={total}, merged upload succeeded")
            return (total, 1)
        else:
            _update_store_status(request_id, 3)
            logger.error(f"Store failed: requestId={request_id}, upload of merged content failed")
            return (total, 0)
    except Exception as e:
        logger.error(f"Store pipeline exception: requestId={request_id}, error={e}")
        _update_store_status(request_id, 3)
        return (0, 0)

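# Minimal usage sketch (assumption: MySQL and the chunk API are reachable from this
# environment; the request_id below is a hypothetical placeholder, not a real record).
if __name__ == "__main__":
    demo_request_id = "demo-request-id"  # hypothetical value for illustration
    total, success = execute_store_agent(demo_request_id)
    logger.info(f"Store agent finished: total={total}, success={success}")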