function_knowledge.py 22 KB


  1. '''
  2. 方法知识获取模块
  3. 1. 输入:问题 + 帖子信息 + 账号人设信息
  4. 2. 将输入的问题转化成query,调用大模型,prompt在 function_knowledge_generate_query_prompt.md 中
  5. 3. 从已有方法工具库中尝试选择合适的方法工具(调用大模型执行,prompt在 function_knowledge_select_tools_prompt.md 中),如果有,则返回选择的方法工具,否则:
  6. - 调用 multi_search_knowledge.py 获取知识
  7. - 返回新的方法工具知识
  8. - 异步从新方法知识中获取新工具(调用大模型执行,prompt在 function_knowledge_generate_new_tool_prompt.md 中),调用工具库系统,接入新的工具
  9. 4. 调用选择的方法工具执行验证,返回工具执行结果
  10. '''
  11. import os
  12. import sys
  13. import json
  14. import threading
  15. from loguru import logger
  16. # 设置路径以便导入工具类
  17. current_dir = os.path.dirname(os.path.abspath(__file__))
  18. root_dir = os.path.dirname(current_dir)
  19. sys.path.insert(0, root_dir)
  20. from utils.gemini_client import generate_text
  21. from knowledge_v2.tools_library import call_tool, save_tool_info, get_all_tool_infos, get_tool_info
  22. from knowledge_v2.multi_search_knowledge import get_knowledge as get_multi_search_knowledge
  23. from knowledge_v2.cache_manager import CacheManager
  24. class FunctionKnowledge:
  25. """方法知识获取类"""
  26. def __init__(self, use_cache: bool = True):
  27. """
  28. 初始化
  29. Args:
  30. use_cache: 是否启用缓存,默认启用
  31. """
  32. logger.info("=" * 80)
  33. logger.info("初始化 FunctionKnowledge - 方法知识获取入口")
  34. self.prompt_dir = os.path.join(current_dir, "prompt")
  35. self.use_cache = use_cache
  36. self.cache = CacheManager() if use_cache else None
  37. logger.info(f"缓存状态: {'启用' if use_cache else '禁用'}")
  38. logger.info("=" * 80)
  39. def _load_prompt(self, filename: str) -> str:
  40. """加载prompt文件内容"""
  41. prompt_path = os.path.join(self.prompt_dir, filename)
  42. if not os.path.exists(prompt_path):
  43. raise FileNotFoundError(f"Prompt文件不存在: {prompt_path}")
  44. with open(prompt_path, 'r', encoding='utf-8') as f:
  45. return f.read().strip()
  46. def generate_query(self, question: str, post_info: str, persona_info: str) -> tuple:
  47. """
  48. 生成查询语句
  49. Returns:
  50. tuple: (query, detail_info)
  51. - query: 生成的查询语句
  52. - detail_info: 详细信息dict,包含prompt和response
  53. """
  54. logger.info(f"[步骤1] 生成Query...")
  55. # 组合问题的唯一标识
  56. combined_question = f"{question}||{post_info}||{persona_info}"
  57. detail_info = {"cached": False, "prompt": None, "response": None}
  58. # 尝试从缓存读取
  59. if self.use_cache:
  60. cached_query = self.cache.get(combined_question, 'function_knowledge', 'generated_query.txt')
  61. if cached_query:
  62. logger.info(f"✓ 使用缓存的Query: {cached_query}")
  63. detail_info["cached"] = True
  64. return cached_query, detail_info
  65. try:
  66. prompt_template = self._load_prompt("function_generate_query_prompt.md")
  67. prompt = prompt_template.replace("{question}", question)
  68. detail_info["prompt"] = prompt
  69. logger.info("→ 调用Gemini生成Query...")
  70. query = generate_text(prompt=prompt)
  71. query = query.strip()
  72. detail_info["response"] = query
  73. logger.info(f"✓ 生成Query: {query}")
  74. # 写入缓存
  75. if self.use_cache:
  76. self.cache.set(combined_question, 'function_knowledge', 'generated_query.txt', query)
  77. return query, detail_info
  78. except Exception as e:
  79. logger.error(f"✗ 生成Query失败: {e}")
  80. detail_info["error"] = str(e)
  81. return question, detail_info # 降级使用原问题
  82. def select_tool(self, combined_question: str, query: str) -> tuple:
  83. """
  84. 选择合适的工具
  85. Returns:
  86. tuple: (tool_name, detail_info)
  87. """
  88. logger.info(f"[步骤2] 选择工具...")
  89. detail_info = {"cached": False, "prompt": None, "response": None, "available_tools_count": 0}
  90. # 尝试从缓存读取
  91. if self.use_cache:
  92. cached_tool = self.cache.get(combined_question, 'function_knowledge', 'selected_tool.txt')
  93. if cached_tool:
  94. logger.info(f"✓ 使用缓存的工具: {cached_tool}")
  95. detail_info["cached"] = True
  96. return cached_tool, detail_info
  97. try:
  98. all_tool_infos = self._load_prompt("all_tools_infos.md")
  99. if not all_tool_infos:
  100. logger.info(" 工具库为空,无可用工具")
  101. return "None", detail_info
  102. tool_count = len(all_tool_infos.split('--- Tool:')) - 1
  103. detail_info["available_tools_count"] = tool_count
  104. logger.info(f" 当前可用工具数: {tool_count}")
  105. prompt_template = self._load_prompt("function_knowledge_select_tools_prompt.md")
  106. prompt = prompt_template.replace("{all_tool_infos}", all_tool_infos)
  107. detail_info["prompt"] = prompt
  108. detail_info["tool_infos"] = all_tool_infos
  109. logger.info("→ 调用Gemini选择工具...")
  110. tool_name = generate_text(prompt=prompt)
  111. tool_name = tool_name.strip()
  112. detail_info["response"] = tool_name
  113. logger.info(f"✓ 选择结果: {tool_name}")
  114. # 写入缓存
  115. if self.use_cache:
  116. self.cache.set(combined_question, 'function_knowledge', 'selected_tool.txt', tool_name)
  117. return tool_name, detail_info
  118. except Exception as e:
  119. logger.error(f"✗ 选择工具失败: {e}")
  120. detail_info["error"] = str(e)
  121. return "None", detail_info
  122. def extract_tool_params(self, combined_question: str, tool_name: str, query: str) -> tuple:
  123. """
  124. 根据工具信息和查询提取调用参数
  125. Args:
  126. combined_question: 组合问题(用于缓存)
  127. tool_name: 工具名称
  128. query: 查询内容
  129. Returns:
  130. tuple: (params, detail_info)
  131. """
  132. logger.info(f"[步骤3] 提取工具参数...")
  133. # 初始化detail_info
  134. detail_info = {"cached": False, "prompt": None, "response": None, "tool_info": None}
  135. # 尝试从缓存读取
  136. if self.use_cache:
  137. cached_params = self.cache.get(combined_question, 'function_knowledge', 'tool_params.json')
  138. if cached_params:
  139. logger.info(f"✓ 使用缓存的参数: {cached_params}")
  140. detail_info["cached"] = True
  141. return cached_params, detail_info
  142. try:
  143. # 获取工具信息
  144. tool_info = get_tool_info(tool_name)
  145. if not tool_info:
  146. logger.warning(f" ⚠ 未找到工具 {tool_name} 的信息,使用默认参数")
  147. # 降级:使用query作为keyword
  148. default_params = {"keyword": query}
  149. detail_info["fallback"] = "tool_info_not_found"
  150. return default_params, detail_info
  151. detail_info["tool_info"] = tool_info
  152. logger.info(f" 工具 {tool_name} 信息长度: {len(tool_info)}")
  153. # 加载prompt
  154. prompt_template = self._load_prompt("function_knowledge_extract_tool_params_prompt.md")
  155. prompt = prompt_template.format(
  156. query=query,
  157. tool_info=tool_info
  158. )
  159. detail_info["prompt"] = prompt
  160. # 调用LLM提取参数
  161. logger.info(" → 调用Gemini提取参数...")
  162. response_text = generate_text(prompt=prompt)
  163. detail_info["response"] = response_text
  164. # 解析JSON
  165. logger.info(" → 解析参数JSON...")
  166. try:
  167. # 清理可能的markdown标记
  168. response_text = response_text.strip()
  169. if response_text.startswith("```json"):
  170. response_text = response_text[7:]
  171. if response_text.startswith("```"):
  172. response_text = response_text[3:]
  173. if response_text.endswith("```"):
  174. response_text = response_text[:-3]
  175. response_text = response_text.strip()
  176. params = json.loads(response_text)
  177. logger.info(f"✓ 提取参数成功: {params}")
  178. # 写入缓存
  179. if self.use_cache:
  180. self.cache.set(combined_question, 'function_knowledge', 'tool_params.json', params)
  181. return params, detail_info
  182. except json.JSONDecodeError as e:
  183. logger.error(f" ✗ 解析JSON失败: {e}")
  184. logger.error(f" 响应内容: {response_text}")
  185. # 降级:使用query作为keyword
  186. default_params = {"keyword": query}
  187. logger.warning(f" 使用默认参数: {default_params}")
  188. detail_info["fallback"] = "json_decode_error"
  189. return default_params, detail_info
  190. except Exception as e:
  191. logger.error(f"✗ 提取工具参数失败: {e}")
  192. # 降级:使用query作为keyword
  193. default_params = {"keyword": query}
  194. detail_info["error"] = str(e)
  195. detail_info["fallback"] = "exception"
  196. return default_params, detail_info
  197. def save_knowledge_to_file(self, knowledge: str, combined_question: str):
  198. """保存获取到的知识到文件"""
  199. try:
  200. logger.info("[保存知识] 开始保存知识到文件...")
  201. # 获取问题hash
  202. import hashlib
  203. question_hash = hashlib.md5(combined_question.encode('utf-8')).hexdigest()[:12]
  204. # 获取缓存目录(和execution_record.json同级)
  205. if self.use_cache and self.cache:
  206. cache_dir = os.path.join(self.cache.base_cache_dir, question_hash)
  207. else:
  208. cache_dir = os.path.join(os.path.dirname(__file__), '.cache', question_hash)
  209. os.makedirs(cache_dir, exist_ok=True)
  210. # 保存到knowledge.txt
  211. knowledge_file = os.path.join(cache_dir, 'knowledge.txt')
  212. with open(knowledge_file, 'w', encoding='utf-8') as f:
  213. f.write(knowledge)
  214. logger.info(f"✓ 知识已保存到: {knowledge_file}")
  215. logger.info(f" 知识长度: {len(knowledge)} 字符")
  216. except Exception as e:
  217. logger.error(f"✗ 保存知识失败: {e}")
  218. def get_knowledge(self, question: str, post_info: str, persona_info: str) -> dict:
  219. """
  220. 获取方法知识的主流程
  221. Returns:
  222. dict: 包含完整执行信息的字典
  223. {
  224. "input": {...}, # 原始输入
  225. "execution": {...}, # 执行过程信息
  226. "result": {...}, # 最终结果
  227. "metadata": {...} # 元数据
  228. }
  229. """
  230. import time
  231. start_time = time.time()
  232. timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
  233. logger.info("=" * 80)
  234. logger.info(f"Function Knowledge - 开始处理")
  235. logger.info(f"问题: {question}")
  236. logger.info(f"帖子信息: {post_info}")
  237. logger.info(f"人设信息: {persona_info}")
  238. logger.info("=" * 80)
  239. # 组合问题的唯一标识
  240. combined_question = f"{question}||{post_info}||{persona_info}"
  241. # 初始化执行记录
  242. execution_record = {
  243. "input": {
  244. "question": question,
  245. "post_info": post_info,
  246. "persona_info": persona_info,
  247. "timestamp": timestamp
  248. },
  249. "execution": {
  250. "steps": [],
  251. "tool_info": None,
  252. "knowledge_search_info": None
  253. },
  254. "result": {
  255. "type": None, # "tool" 或 "knowledge"
  256. "content": None,
  257. "raw_data": None
  258. },
  259. "metadata": {
  260. "execution_time": None,
  261. "cache_hits": [],
  262. "errors": []
  263. }
  264. }
  265. # 检查最终结果缓存
  266. if self.use_cache:
  267. cached_final = self.cache.get(combined_question, 'function_knowledge', 'final_result.json')
  268. if cached_final:
  269. logger.info(f"✓ 使用缓存的最终结果")
  270. logger.info("=" * 80 + "\n")
  271. # 如果是完整的执行记录,直接返回
  272. if isinstance(cached_final, dict) and "execution" in cached_final:
  273. return cached_final
  274. # 否则构造一个简单的返回
  275. return {
  276. "input": execution_record["input"],
  277. "execution": {"cached": True},
  278. "result": {"type": "cached", "content": cached_final},
  279. "metadata": {"cache_hit": True}
  280. }
  281. try:
  282. # 步骤1: 生成Query
  283. step1_start = time.time()
  284. query, query_detail = self.generate_query(question, post_info, persona_info)
  285. execution_record["execution"]["steps"].append({
  286. "step": 1,
  287. "name": "generate_query",
  288. "duration": time.time() - step1_start,
  289. "output": query,
  290. "detail": query_detail # 包含prompt和response
  291. })
  292. # 步骤2: 选择工具
  293. step2_start = time.time()
  294. tool_name, tool_select_detail = self.select_tool(combined_question, query)
  295. execution_record["execution"]["steps"].append({
  296. "step": 2,
  297. "name": "select_tool",
  298. "duration": time.time() - step2_start,
  299. "output": tool_name,
  300. "detail": tool_select_detail # 包含prompt、response和可用工具列表
  301. })
  302. result_content = None
  303. if tool_name and tool_name != "None":
  304. # 路径A: 使用工具
  305. execution_record["result"]["type"] = "tool"
  306. # 步骤3: 提取参数
  307. step3_start = time.time()
  308. arguments, params_detail = self.extract_tool_params(combined_question, tool_name, query)
  309. execution_record["execution"]["steps"].append({
  310. "step": 3,
  311. "name": "extract_tool_params",
  312. "duration": time.time() - step3_start,
  313. "output": arguments,
  314. "detail": params_detail # 包含prompt、response和工具信息
  315. })
  316. # 步骤4: 调用工具
  317. logger.info(f"[步骤4] 调用工具: {tool_name}")
  318. # 检查工具调用缓存
  319. if self.use_cache:
  320. cached_tool_result = self.cache.get(combined_question, 'function_knowledge', 'tool_result.json')
  321. if cached_tool_result:
  322. logger.info(f"✓ 使用缓存的工具调用结果")
  323. execution_record["metadata"]["cache_hits"].append("tool_result")
  324. tool_result = cached_tool_result
  325. else:
  326. step4_start = time.time()
  327. logger.info(f" → 调用工具,参数: {arguments}")
  328. tool_result = call_tool(tool_name, arguments)
  329. # 缓存工具调用结果
  330. self.cache.set(combined_question, 'function_knowledge', 'tool_result.json', tool_result)
  331. execution_record["execution"]["steps"].append({
  332. "step": 4,
  333. "name": "call_tool",
  334. "duration": time.time() - step4_start,
  335. "output": "success"
  336. })
  337. else:
  338. step4_start = time.time()
  339. logger.info(f" → 调用工具,参数: {arguments}")
  340. tool_result = call_tool(tool_name, arguments)
  341. execution_record["execution"]["steps"].append({
  342. "step": 4,
  343. "name": "call_tool",
  344. "duration": time.time() - step4_start,
  345. "output": "success"
  346. })
  347. # 记录工具调用信息
  348. execution_record["execution"]["tool_info"] = {
  349. "tool_name": tool_name,
  350. "parameters": arguments,
  351. "result": tool_result
  352. }
  353. result_content = f"工具 {tool_name} 执行结果: {json.dumps(tool_result, ensure_ascii=False)}"
  354. execution_record["result"]["content"] = result_content
  355. execution_record["result"]["raw_data"] = tool_result
  356. logger.info(f"✓ 工具调用完成")
  357. else:
  358. # 路径B: 知识搜索
  359. execution_record["result"]["type"] = "knowledge_search"
  360. logger.info("[步骤4] 未找到合适工具,调用 MultiSearch...")
  361. step4_start = time.time()
  362. knowledge = get_multi_search_knowledge(query, cache_key=combined_question)
  363. execution_record["execution"]["steps"].append({
  364. "step": 4,
  365. "name": "multi_search_knowledge",
  366. "duration": time.time() - step4_start,
  367. "output": f"knowledge_length: {len(knowledge)}"
  368. })
  369. # 记录知识搜索信息
  370. execution_record["execution"]["knowledge_search_info"] = {
  371. "query": query,
  372. "knowledge_length": len(knowledge),
  373. "source": "multi_search"
  374. }
  375. result_content = knowledge
  376. execution_record["result"]["content"] = knowledge
  377. execution_record["result"]["raw_data"] = {"knowledge": knowledge, "query": query}
  378. # 异步生成新工具
  379. logger.info("[后台任务] 启动新工具生成线程...")
  380. threading.Thread(target=self.save_knowledge_to_file, args=(knowledge, combined_question)).start()
  381. # 计算总执行时间
  382. execution_record["metadata"]["execution_time"] = time.time() - start_time
  383. # 保存完整的执行记录到JSON文件
  384. if self.use_cache:
  385. self.cache.set(combined_question, 'function_knowledge', 'final_result.json', execution_record)
  386. # 同时保存一个格式化的JSON文件供人类阅读
  387. from knowledge_v2.cache_manager import CacheManager
  388. cache = CacheManager()
  389. import hashlib
  390. question_hash = hashlib.md5(combined_question.encode('utf-8')).hexdigest()[:12]
  391. output_file = os.path.join(cache.base_cache_dir, question_hash, 'execution_record.json')
  392. try:
  393. with open(output_file, 'w', encoding='utf-8') as f:
  394. json.dump(execution_record, f, ensure_ascii=False, indent=2)
  395. logger.info(f"✓ 完整执行记录已保存: {output_file}")
  396. except Exception as e:
  397. logger.error(f"保存执行记录失败: {e}")
  398. logger.info("=" * 80)
  399. logger.info(f"✓ Function Knowledge 完成")
  400. logger.info(f" 类型: {execution_record['result']['type']}")
  401. logger.info(f" 结果长度: {len(result_content) if result_content else 0}")
  402. logger.info(f" 执行时间: {execution_record['metadata']['execution_time']:.2f}秒")
  403. logger.info("=" * 80 + "\n")
  404. return execution_record
  405. except Exception as e:
  406. logger.error(f"✗ 执行失败: {e}")
  407. import traceback
  408. error_trace = traceback.format_exc()
  409. execution_record["metadata"]["errors"].append({
  410. "error": str(e),
  411. "traceback": error_trace
  412. })
  413. execution_record["result"]["type"] = "error"
  414. execution_record["result"]["content"] = f"执行失败: {str(e)}"
  415. execution_record["metadata"]["execution_time"] = time.time() - start_time
  416. return execution_record
  417. def get_knowledge(question: str, post_info: str, persona_info: str) -> dict:
  418. """
  419. 便捷调用函数
  420. Returns:
  421. dict: 完整的执行记录,包含输入、执行过程、结果和元数据
  422. """
  423. agent = FunctionKnowledge()
  424. return agent.get_knowledge(question, post_info, persona_info)
  425. if __name__ == "__main__":
  426. # 测试代码
  427. question = "教资查分这个信息怎么来的"
  428. post_info = "发帖时间:2025.11.07"
  429. persona_info = ""
  430. try:
  431. agent = FunctionKnowledge()
  432. execution_result = agent.get_knowledge(question, post_info, persona_info)
  433. print("=" * 50)
  434. print("执行结果:")
  435. print("=" * 50)
  436. print(f"类型: {execution_result['result']['type']}")
  437. print(f"内容预览: {execution_result['result']['content'][:200]}...")
  438. print(f"执行时间: {execution_result['metadata']['execution_time']:.2f}秒")
  439. print(f"\n完整JSON已保存到缓存目录")
  440. except Exception as e:
  441. logger.error(f"测试失败: {e}")