function_knowledge.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. '''
  2. 方法知识获取模块
  3. 1. 输入:问题 + 帖子信息 + 账号人设信息
  4. 2. 将输入的问题转化成query,调用大模型,prompt在 function_knowledge_generate_query_prompt.md 中
  5. 3. 从已有方法工具库中尝试选择合适的方法工具(调用大模型执行,prompt在 function_knowledge_select_tools_prompt.md 中),如果有,则返回选择的方法工具,否则:
  6. - 调用 multi_search_knowledge.py 获取知识
  7. - 返回新的方法工具知识
  8. - 异步从新方法知识中获取新工具(调用大模型执行,prompt在 function_knowledge_generate_new_tool_prompt.md 中),调用工具库系统,接入新的工具
  9. 4. 调用选择的方法工具执行验证,返回工具执行结果
  10. '''
  11. import os
  12. import sys
  13. import json
  14. import threading
  15. from loguru import logger
  16. import re
  17. # 设置路径以便导入工具类
  18. current_dir = os.path.dirname(os.path.abspath(__file__))
  19. root_dir = os.path.dirname(current_dir)
  20. sys.path.insert(0, root_dir)
  21. from utils.gemini_client import generate_text
  22. from knowledge_v2.tools_library import call_tool, save_tool_info, get_all_tool_infos, get_tool_info, get_tool_params
  23. from knowledge_v2.multi_search_knowledge import get_knowledge as get_multi_search_knowledge
  24. from knowledge_v2.cache_manager import CacheManager
  25. class FunctionKnowledge:
  26. """方法知识获取类"""
  27. def __init__(self, use_cache: bool = True):
  28. """
  29. 初始化
  30. Args:
  31. use_cache: 是否启用缓存,默认启用
  32. """
  33. logger.info("=" * 80)
  34. logger.info("初始化 FunctionKnowledge - 方法知识获取入口")
  35. self.prompt_dir = os.path.join(current_dir, "prompt")
  36. self.use_cache = use_cache
  37. self.cache = CacheManager() if use_cache else None
  38. logger.info(f"缓存状态: {'启用' if use_cache else '禁用'}")
  39. logger.info("=" * 80)
  40. def _load_prompt(self, filename: str) -> str:
  41. """加载prompt文件内容"""
  42. prompt_path = os.path.join(self.prompt_dir, filename)
  43. if not os.path.exists(prompt_path):
  44. raise FileNotFoundError(f"Prompt文件不存在: {prompt_path}")
  45. with open(prompt_path, 'r', encoding='utf-8') as f:
  46. return f.read().strip()
  47. def generate_query(self, question: str, post_info: str, persona_info: str) -> str:
  48. """
  49. 生成查询语句
  50. Returns:
  51. str: 生成的查询语句
  52. """
  53. logger.info(f"[步骤1] 生成Query...")
  54. # 组合问题的唯一标识
  55. combined_question = f"{question}||{post_info}||{persona_info}"
  56. try:
  57. prompt_template = self._load_prompt("function_generate_query_prompt.md")
  58. prompt = prompt_template.format(
  59. question=question,
  60. post_info=post_info,
  61. persona_info=persona_info
  62. )
  63. # 尝试从缓存读取
  64. if self.use_cache:
  65. cached_data = self.cache.get(combined_question, 'function_knowledge', 'generated_query.json')
  66. if cached_data:
  67. query = cached_data.get('query', cached_data.get('response', ''))
  68. logger.info(f"✓ 使用缓存的Query: {query}")
  69. return query
  70. logger.info("→ 调用Gemini生成Query...")
  71. query = generate_text(prompt=prompt)
  72. query = query.strip()
  73. logger.info(f"✓ 生成Query: {query}")
  74. # 保存到缓存(包含完整的prompt和response)
  75. if self.use_cache:
  76. query_data = {
  77. "prompt": prompt,
  78. "response": query,
  79. "query": query
  80. }
  81. self.cache.set(combined_question, 'function_knowledge', 'generated_query.json', query_data)
  82. return query
  83. except Exception as e:
  84. logger.error(f"✗ 生成Query失败: {e}")
  85. return question # 降级使用原问题
  86. def select_tool(self, combined_question: str, query: str) -> str:
  87. """
  88. 选择合适的工具
  89. Returns:
  90. str: 工具名称,如果没有合适的工具则返回"None"
  91. """
  92. logger.info(f"[步骤2] 选择工具...")
  93. try:
  94. all_tool_infos = self._load_prompt("all_tools_infos.md")
  95. if not all_tool_infos:
  96. logger.info(" 工具库为空,无可用工具")
  97. return "None"
  98. prompt_template = self._load_prompt("function_knowledge_select_tools_prompt.md")
  99. prompt = prompt_template.replace("{all_tool_infos}", all_tool_infos).replace("query", query)
  100. # 尝试从缓存读取
  101. if self.use_cache:
  102. cached_data = self.cache.get(combined_question, 'function_knowledge', 'selected_tool.json')
  103. if cached_data:
  104. result_json = cached_data.get('response', {})
  105. logger.info(f"✓ 使用缓存的工具: {result_json}")
  106. return result_json
  107. logger.info("→ 调用Gemini选择工具...")
  108. result = generate_text(prompt=prompt)
  109. result = self.extract_and_validate_json(result)
  110. if not result:
  111. logger.error("✗ 选择工具失败: 无法提取有效JSON")
  112. return "None"
  113. result_json = json.loads(result)
  114. logger.info(f"✓ 选择结果: {result_json.get('工具名', 'None')}")
  115. # 保存到缓存(包含完整的prompt和response)
  116. if self.use_cache:
  117. tool_data = {
  118. "prompt": prompt,
  119. "response": result_json
  120. }
  121. self.cache.set(combined_question, 'function_knowledge', 'selected_tool.json', tool_data)
  122. return result_json
  123. except Exception as e:
  124. logger.error(f"✗ 选择工具失败: {e}")
  125. return "None"
  126. def extract_and_validate_json(self, text: str):
  127. """
  128. 从字符串中提取 JSON 部分,并返回标准的 JSON 字符串。
  129. 如果无法提取或解析失败,返回 None (或者你可以改为抛出异常)。
  130. """
  131. # 1. 使用正则表达式寻找最大的 JSON 块
  132. # r"(\{[\s\S]*\}|\[[\s\S]*\])" 的含义:
  133. # - \{[\s\S]*\} : 匹配以 { 开头,} 结尾的最长字符串([\s\S] 包含换行符)
  134. # - | : 或者
  135. # - \[[\s\S]*\] : 匹配以 [ 开头,] 结尾的最长字符串(处理 JSON 数组)
  136. match = re.search(r"(\{[\s\S]*\}|\[[\s\S]*\])", text)
  137. if match:
  138. json_str = match.group(0)
  139. try:
  140. # 2. 尝试解析提取出的字符串,验证是否为合法 JSON
  141. parsed_json = json.loads(json_str)
  142. # 3. 重新转储为标准字符串 (去除原本可能存在的缩进、多余空格等)
  143. # ensure_ascii=False 保证中文不会变成 \uXXXX
  144. return json.dumps(parsed_json, ensure_ascii=False)
  145. except json.JSONDecodeError as e:
  146. print(f"提取到了类似JSON的片段,但解析失败: {e}")
  147. return None
  148. else:
  149. print("未在文本中发现 JSON 结构")
  150. return None
  151. def extract_tool_params(self, combined_question: str, query: str, tool_id: str, tool_instructions: str) -> dict:
  152. """
  153. 根据工具信息和查询提取调用参数
  154. Args:
  155. combined_question: 组合问题(用于缓存)
  156. tool_name: 工具名称
  157. query: 查询内容
  158. Returns:
  159. dict: 提取的参数字典
  160. """
  161. logger.info(f"[步骤3] 提取工具参数...")
  162. try:
  163. # 获取工具信息
  164. tool_params = get_tool_params(tool_id)
  165. if not tool_params:
  166. logger.warning(f" ⚠ 未找到工具 {tool_id} 的信息,使用默认参数")
  167. return {"keyword": query}
  168. # 加载prompt
  169. prompt_template = self._load_prompt("function_knowledge_extract_tool_params_prompt.md")
  170. prompt = prompt_template.format(
  171. tool_mcp_name=tool_id,
  172. tool_instructions=tool_instructions,
  173. all_tool_params=tool_params
  174. )
  175. # 尝试从缓存读取
  176. if self.use_cache:
  177. cached_data = self.cache.get(combined_question, 'function_knowledge', 'extracted_params.json')
  178. if cached_data:
  179. params = cached_data.get('params', {})
  180. logger.info(f"✓ 使用缓存的参数: {params}")
  181. return params
  182. # 调用LLM提取参数
  183. logger.info(" → 调用Gemini提取参数...")
  184. response_text = generate_text(prompt=prompt)
  185. # 解析JSON
  186. logger.info(" → 解析参数JSON...")
  187. try:
  188. # 清理可能的markdown标记
  189. response_text = response_text.strip()
  190. if response_text.startswith("```json"):
  191. response_text = response_text[7:]
  192. if response_text.startswith("```"):
  193. response_text = response_text[3:]
  194. if response_text.endswith("```"):
  195. response_text = response_text[:-3]
  196. response_text = response_text.strip()
  197. params = json.loads(response_text)
  198. logger.info(f"✓ 提取参数成功: {params}")
  199. # 保存到缓存(包含完整的prompt和response)
  200. if self.use_cache:
  201. params_data = {
  202. "prompt": prompt,
  203. "response": response_text,
  204. "params": params
  205. }
  206. self.cache.set(combined_question, 'function_knowledge', 'extracted_params.json', params_data)
  207. return params
  208. except json.JSONDecodeError as e:
  209. logger.error(f" ✗ 解析JSON失败: {e}")
  210. logger.error(f" 响应内容: {response_text}")
  211. # 降级:使用query作为keyword
  212. default_params = {"keyword": query}
  213. logger.warning(f" 使用默认参数: {default_params}")
  214. return default_params
  215. except Exception as e:
  216. logger.error(f"✗ 提取工具参数失败: {e}")
  217. # 降级:使用query作为keyword
  218. return {"keyword": query}
  219. def save_knowledge_to_file(self, knowledge: str, combined_question: str):
  220. """保存获取到的知识到文件"""
  221. try:
  222. logger.info("[保存知识] 开始保存知识到文件...")
  223. # 获取问题hash
  224. import hashlib
  225. question_hash = hashlib.md5(combined_question.encode('utf-8')).hexdigest()[:12]
  226. # 获取缓存目录(和execution_record.json同级)
  227. if self.use_cache and self.cache:
  228. cache_dir = os.path.join(self.cache.base_cache_dir, question_hash)
  229. else:
  230. cache_dir = os.path.join(os.path.dirname(__file__), '.cache', question_hash)
  231. os.makedirs(cache_dir, exist_ok=True)
  232. # 保存到knowledge.txt
  233. knowledge_file = os.path.join(cache_dir, 'knowledge.txt')
  234. with open(knowledge_file, 'w', encoding='utf-8') as f:
  235. f.write(knowledge)
  236. logger.info(f"✓ 知识已保存到: {knowledge_file}")
  237. logger.info(f" 知识长度: {len(knowledge)} 字符")
  238. except Exception as e:
  239. logger.error(f"✗ 保存知识失败: {e}")
  240. def organize_tool_result(self, tool_result: dict) -> dict:
  241. """
  242. 组织工具调用结果,确保包含必要字段
  243. Args:
  244. tool_result: 原始工具调用结果
  245. Returns:
  246. dict: 组织后的工具调用结果
  247. """
  248. prompt_template = self._load_prompt("tool_result_prettify_prompt.md")
  249. prompt = prompt_template.format(
  250. input=tool_result,
  251. )
  252. organized_result = generate_text(prompt=prompt)
  253. organized_result = organized_result.strip()
  254. return organized_result
  255. def get_knowledge(self, question: str, post_info: str, persona_info: str) -> dict:
  256. """
  257. 获取方法知识的主流程(重构后)
  258. Returns:
  259. dict: 完整的执行记录
  260. """
  261. import time
  262. timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
  263. start_time = time.time()
  264. logger.info("=" * 80)
  265. logger.info(f"Function Knowledge - 开始处理")
  266. logger.info(f"问题: {question}")
  267. logger.info(f"帖子信息: {post_info}")
  268. logger.info(f"人设信息: {persona_info}")
  269. logger.info("=" * 80)
  270. # 组合问题的唯一标识
  271. combined_question = f"{question}||{post_info}||{persona_info}"
  272. try:
  273. # 步骤1: 生成Query
  274. query = self.generate_query(question, post_info, persona_info)
  275. # 步骤2: 选择工具
  276. tool_info = self.select_tool(combined_question, query)
  277. # tool_name = tool_info.get("工具名")
  278. tool_id = tool_info.get("工具调用ID")
  279. tool_instructions = tool_info.get("使用方法")
  280. if tool_id and tool_instructions:
  281. # 路径A: 使用工具
  282. # 步骤3: 提取参数
  283. arguments = self.extract_tool_params(combined_question, query, tool_id, tool_instructions)
  284. # 步骤4: 调用工具
  285. logger.info(f"[步骤4] 调用工具: {tool_id}")
  286. # 检查工具调用缓存
  287. if self.use_cache:
  288. cached_tool_call = self.cache.get(combined_question, 'function_knowledge', 'tool_call.json')
  289. if cached_tool_call:
  290. logger.info(f"✓ 使用缓存的工具调用结果")
  291. response = cached_tool_call.get('response', {})
  292. tool_result = self.organize_tool_result(response)
  293. # 保存工具调用信息(包含工具名、入参、结果)
  294. tool_call_data = {
  295. "tool_name": tool_id,
  296. "arguments": arguments,
  297. "result": tool_result,
  298. "response": response
  299. }
  300. self.cache.set(combined_question, 'function_knowledge', 'tool_call.json', tool_call_data)
  301. else:
  302. logger.info(f" → 调用工具,参数: {arguments}")
  303. rs = call_tool(tool_id, arguments)
  304. tool_result = self.organize_tool_result(rs)
  305. # 保存工具调用信息(包含工具名、入参、结果)
  306. tool_call_data = {
  307. "tool_name": tool_id,
  308. "arguments": arguments,
  309. "result": tool_result,
  310. "response": rs
  311. }
  312. self.cache.set(combined_question, 'function_knowledge', 'tool_call.json', tool_call_data)
  313. else:
  314. logger.info(f" → 调用工具,参数: {arguments}")
  315. tool_result = call_tool(tool_id, arguments)
  316. logger.info(f"✓ 工具调用完成")
  317. else:
  318. # 路径B: 知识搜索
  319. logger.info("[步骤4] 未找到合适工具,调用 MultiSearch...")
  320. knowledge = get_multi_search_knowledge(query, cache_key=combined_question)
  321. # 异步保存知识到文件
  322. logger.info("[后台任务] 保存知识到文件...")
  323. threading.Thread(target=self.save_knowledge_to_file, args=(knowledge, combined_question)).start()
  324. # 计算执行时间
  325. execution_time = time.time() - start_time
  326. # 收集所有执行记录
  327. logger.info("=" * 80)
  328. logger.info("收集执行记录...")
  329. logger.info("=" * 80)
  330. from knowledge_v2.execution_collector import collect_and_save_execution_record
  331. execution_record = collect_and_save_execution_record(
  332. combined_question,
  333. {
  334. "question": question,
  335. "post_info": post_info,
  336. "persona_info": persona_info,
  337. "timestamp": timestamp
  338. }
  339. )
  340. logger.info("=" * 80)
  341. logger.info(f"✓ Function Knowledge 完成")
  342. logger.info(f" 执行时间: {execution_record.get('metadata', {}).get('execution_time', 0):.2f}秒")
  343. logger.info("=" * 80 + "\n")
  344. return execution_record
  345. except Exception as e:
  346. logger.error(f"✗ 执行失败: {e}")
  347. import traceback
  348. logger.error(traceback.format_exc())
  349. # 即使失败也尝试收集记录
  350. try:
  351. execution_time = time.time() - start_time
  352. from knowledge_v2.execution_collector import collect_and_save_execution_record
  353. execution_record = collect_and_save_execution_record(
  354. combined_question,
  355. {
  356. "question": question,
  357. "post_info": post_info,
  358. "persona_info": persona_info,
  359. "timestamp": timestamp
  360. }
  361. )
  362. return execution_record
  363. except Exception as collect_error:
  364. logger.error(f"收集执行记录也失败: {collect_error}")
  365. # 返回基本错误信息
  366. return {
  367. "input": {
  368. "question": question,
  369. "post_info": post_info,
  370. "persona_info": persona_info,
  371. "timestamp": timestamp
  372. },
  373. "result": {
  374. "type": "error",
  375. "content": f"执行失败: {str(e)}"
  376. },
  377. "metadata": {
  378. "errors": [str(e)]
  379. }
  380. }
  381. if __name__ == "__main__":
  382. # 测试代码
  383. question = "花艺造型蛋糕这个选题点怎么来的"
  384. post_info = "发帖时间:2025-09-16"
  385. persona_info = ""
  386. try:
  387. agent = FunctionKnowledge()
  388. execution_result = agent.get_knowledge(question, post_info, persona_info)
  389. print("=" * 50)
  390. print("执行结果:")
  391. print("=" * 50)
  392. print(json.dumps(execution_result, ensure_ascii=False, indent=2))
  393. print(f"\n完整JSON已保存到缓存目录")
  394. except Exception as e:
  395. logger.error(f"测试失败: {e}")