agent.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import sys
  2. import os
  3. from typing import Any, Dict, List
  4. import json
  5. # 添加项目根目录到 Python 路径
  6. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  7. from utils.logging_config import get_logger
  8. from utils.mysql_db import MysqlHelper
  9. logger = get_logger('ExpandAgent')
  10. try:
  11. from gemini import GeminiProcessor
  12. HAS_GEMINI = True
  13. except ImportError:
  14. HAS_GEMINI = False
  15. def _fetch_parsing_data_by_request(request_id: str) -> List[str]:
  16. """从 knowledge_extraction_content 表中根据 request_id 获取 data 字段"""
  17. sql = "SELECT data, content_id FROM knowledge_extraction_content WHERE request_id = %s"
  18. rows = MysqlHelper.get_values(sql, (request_id,)) or []
  19. results = []
  20. for row in rows:
  21. data = row[0] # 获取 data 字段
  22. content_id = row[1] # 获取 content_id 字段
  23. if data:
  24. results.append({"data": data, "content_id": content_id})
  25. print(f"Final results: {len(results)} items")
  26. return results
  27. def _build_prompt(data_samples: str, input_query: str) -> str:
  28. """构建用于扩展查询的 Prompt,使用 expansion.md 模板"""
  29. # 读取 expansion.md 模板文件
  30. try:
  31. template_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "prompt", "expansion.md")
  32. with open(template_path, 'r', encoding='utf-8') as f:
  33. template = f.read()
  34. # 替换模板中的占位符
  35. prompt = template.replace("{Original_Query}", input_query or "无")
  36. prompt = prompt.replace("{Content_For_Expansion}", data_samples)
  37. return prompt
  38. except Exception as e:
  39. logger.error(f"读取 expansion.md 模板失败: {e}")
  40. # 如果模板读取失败,使用备用 prompt
  41. return f"""你是一位顶级的知识库专家,精通语义分析、信息检索和搜索优化策略。
  42. 根据以下原始查询和内容,生成扩展的查询词列表:
  43. **原始查询:** {input_query or "无"}
  44. **扩展要基于的内容:** {data_samples}
  45. 请生成一个JSON数组形式的扩展查询词列表,确保每个查询词都具体、可操作、信息量丰富。
  46. 输出格式:
  47. ```json
  48. [
  49. "查询词1",
  50. "查询词2",
  51. ...
  52. ]
  53. ```"""
  54. def _run_llm(prompt: str) -> List[str]:
  55. """调用LLM生成扩展查询"""
  56. if not HAS_GEMINI:
  57. return []
  58. try:
  59. processor = GeminiProcessor()
  60. result = processor.process(content=prompt, system_prompt="你是专业的查询扩展助手")
  61. print(f"result: {result}")
  62. # 处理返回结果
  63. if isinstance(result, dict):
  64. # 如果有错误,直接返回空列表
  65. if "error" in result:
  66. logger.error(f"Gemini API 返回错误: {result['error']}")
  67. return []
  68. # 如果结果在 result 字段中
  69. text = result.get("result", "") or result.get("raw_response", "")
  70. else:
  71. text = str(result)
  72. # 清理文本,移除 markdown 代码块标记
  73. if "```json" in text:
  74. # 提取 ```json 和 ``` 之间的内容
  75. start = text.find("```json") + 7
  76. end = text.find("```", start)
  77. if end != -1:
  78. text = text[start:end].strip()
  79. elif "```" in text:
  80. # 提取 ``` 之间的内容
  81. start = text.find("```") + 3
  82. end = text.find("```", start)
  83. if end != -1:
  84. text = text[start:end].strip()
  85. # 尝试解析 JSON
  86. try:
  87. queries = json.loads(text)
  88. if isinstance(queries, list):
  89. # 确保所有元素都是字符串
  90. return [str(q) for q in queries if q]
  91. else:
  92. logger.warning(f"Gemini 返回的不是列表格式: {type(queries)}")
  93. return []
  94. except json.JSONDecodeError as e:
  95. logger.error(f"JSON 解析失败: {e}, 原始文本: {text}")
  96. return []
  97. except Exception as e:
  98. logger.error(f"LLM调用失败: {e}")
  99. return []
  100. def _heuristic_expand(input_query: str) -> List[str]:
  101. """启发式扩展(LLM不可用时的fallback)"""
  102. base = input_query.strip()
  103. if not base:
  104. return []
  105. return [
  106. base,
  107. f"{base} 教程",
  108. f"{base} 实战",
  109. f"{base} 入门指南",
  110. f"{base} 高级技巧"
  111. ]
  112. def _update_expansion_status(requestId: str, status: int):
  113. sql = "UPDATE knowledge_request SET expansion_status = %s WHERE request_id = %s"
  114. MysqlHelper.update_values(sql, (status, requestId))
  115. def execute_expand_agent_with_api(requestId: str, query: str = "") -> Dict[str, Any]:
  116. """对外暴露的API:根据requestId查询数据,生成扩展查询"""
  117. # 获取数据(可能多条)
  118. data_samples = _fetch_parsing_data_by_request(requestId)
  119. try:
  120. total = 0
  121. success = 0
  122. if not data_samples:
  123. # 即使没有数据,也基于 query 生成一次兜底扩展
  124. # prompt = _build_prompt("", query)
  125. # expanded = _run_llm(prompt)
  126. # if not expanded:
  127. # expanded = _heuristic_expand(query)
  128. # expand_querys_json = json.dumps(expanded, ensure_ascii=False)
  129. # insert_sql = """
  130. # INSERT INTO knowledge_expand_content
  131. # (request_id, create_time, expand_querys, query)
  132. # VALUES (%s, NOW(), %s, %s)
  133. # """
  134. # MysqlHelper.insert_and_get_id(insert_sql, (requestId, expand_querys_json, query))
  135. # total = 1
  136. # success = 1 if expanded else 0
  137. logger.info(f"没有数据,不进行扩展,直接返回: requestId={requestId}, total={total}, success={success}")
  138. else:
  139. # 针对每条 parsing_data 分别生成与入库
  140. insert_sql = """
  141. INSERT INTO knowledge_expand_content
  142. (request_id, create_time, expand_querys, query, content_id)
  143. VALUES (%s, NOW(), %s, %s, %s)
  144. """
  145. for sample in data_samples:
  146. total += 1
  147. if not sample:
  148. continue
  149. prompt = _build_prompt(sample["data"], query)
  150. expanded = _run_llm(prompt)
  151. if not expanded:
  152. expanded = _heuristic_expand(query)
  153. try:
  154. expand_querys_json = json.dumps(expanded, ensure_ascii=False)
  155. MysqlHelper.insert_and_get_id(insert_sql, (requestId, expand_querys_json, query, sample["content_id"]))
  156. success += 1
  157. except Exception as ie:
  158. logger.error(f"单条扩展结果入库失败: requestId={requestId}, error={ie}")
  159. # 更新状态为处理完成
  160. _update_expansion_status(requestId, 2)
  161. logger.info(f"扩展完成: requestId={requestId}, total={total}, success={success}")
  162. except Exception as e:
  163. logger.error(f"保存扩展查询结果到数据库时出错: requestId={requestId}, error={e}")
  164. _update_expansion_status(requestId, 3)
  165. return {"status": 1, "requestId": requestId}
  166. if __name__ == "__main__":
  167. queries = execute_expand_agent_with_api("REQUEST_001")
  168. print(queries)