"""
Function ("method") knowledge acquisition module.

Pipeline implemented by ``FunctionKnowledge.get_knowledge``:

1. Input: a question, post info and account persona info.
2. Rewrite the input into a search query with the LLM
   (prompt file: ``prompt/function_generate_query_prompt.md``).
3. Ask the LLM to pick a suitable tool from the existing tools library
   (prompt file: ``prompt/function_knowledge_select_tools_prompt.md``).
   - If a tool is selected: extract its call parameters with the LLM
     (prompt file: ``prompt/function_knowledge_extract_tool_params_prompt.md``),
     call the tool and return its result.
   - Otherwise: fetch knowledge via ``multi_search_knowledge`` and return it.
     The knowledge is also written to ``knowledge.txt`` in a background thread.
     Generating a *new* tool from that knowledge (the planned
     ``function_knowledge_generate_new_tool_prompt.md`` step) is not done by
     this module at present.

When caching is enabled, every intermediate result is stored through
``CacheManager`` keyed by ``question||post_info||persona_info``.
"""
import hashlib
import json
import os
import sys
import threading
import time
import traceback

from loguru import logger

# Make the project root importable so the project-local imports below resolve.
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
sys.path.insert(0, root_dir)

from utils.gemini_client import generate_text
# NOTE(review): save_tool_info is not used in this module yet; presumably kept
# for the planned "register a new tool" step.
from knowledge_v2.tools_library import call_tool, save_tool_info, get_all_tool_infos, get_tool_info
from knowledge_v2.multi_search_knowledge import get_knowledge as get_multi_search_knowledge
from knowledge_v2.cache_manager import CacheManager

# Cache namespace for every artifact this module stores via CacheManager.
_CACHE_NAMESPACE = 'function_knowledge'

# Sentinel string meaning "no suitable tool" (returned by select_tool and
# presumably by the tool-selection prompt).
_NO_TOOL = "None"


class FunctionKnowledge:
    """Acquire "method" knowledge for a question, via a tool or a knowledge search."""

    def __init__(self, use_cache: bool = True):
        """
        Initialise the pipeline.

        Args:
            use_cache: Whether to read/write intermediate results through
                ``CacheManager``. Enabled by default.
        """
        logger.info("=" * 80)
        logger.info("初始化 FunctionKnowledge - 方法知识获取入口")

        self.prompt_dir = os.path.join(current_dir, "prompt")
        self.use_cache = use_cache
        self.cache = CacheManager() if use_cache else None

        logger.info(f"缓存状态: {'启用' if use_cache else '禁用'}")
        logger.info("=" * 80)

    # ------------------------------------------------------------------ #
    # Small helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _combine_question(question: str, post_info: str, persona_info: str) -> str:
        """Build the cache key shared by every step for one input triple."""
        return f"{question}||{post_info}||{persona_info}"

    def _question_cache_dir(self, combined_question: str) -> str:
        """
        Return the per-question directory for ``knowledge.txt`` / ``execution_record.json``.

        The directory name is the first 12 hex characters of the MD5 of
        ``combined_question``, placed under ``self.cache.base_cache_dir`` when
        caching is enabled, otherwise under ``<this module's dir>/.cache``.
        NOTE(review): assumed to match the per-question layout CacheManager
        uses internally — confirm against cache_manager.
        """
        question_hash = hashlib.md5(combined_question.encode('utf-8')).hexdigest()[:12]
        if self.use_cache and self.cache is not None:
            return os.path.join(self.cache.base_cache_dir, question_hash)
        return os.path.join(current_dir, '.cache', question_hash)

    def _load_prompt(self, filename: str) -> str:
        """
        Read a prompt template from ``self.prompt_dir``.

        Returns:
            The file content with surrounding whitespace stripped.

        Raises:
            FileNotFoundError: if the prompt file does not exist.
        """
        prompt_path = os.path.join(self.prompt_dir, filename)
        if not os.path.exists(prompt_path):
            raise FileNotFoundError(f"Prompt文件不存在: {prompt_path}")

        with open(prompt_path, 'r', encoding='utf-8') as f:
            return f.read().strip()

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown fence (```json ... ``` or ``` ... ```) from an LLM reply."""
        text = text.strip()
        if text.startswith("```json"):
            text = text[7:]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    # ------------------------------------------------------------------ #
    # Pipeline steps
    # ------------------------------------------------------------------ #

    def generate_query(self, question: str, post_info: str, persona_info: str) -> tuple:
        """
        Step 1: turn the input into a search query with the LLM.

        Returns:
            tuple: ``(query, detail_info)``
                - query: the generated query, or the original ``question`` if
                  generation failed (deliberate degradation).
                - detail_info: dict with ``cached``, ``prompt``, ``response``
                  and, on failure, ``error``.
        """
        logger.info("[步骤1] 生成Query...")

        combined_question = self._combine_question(question, post_info, persona_info)
        detail_info = {"cached": False, "prompt": None, "response": None}

        if self.use_cache:
            cached_query = self.cache.get(combined_question, _CACHE_NAMESPACE, 'generated_query.txt')
            if cached_query:
                logger.info(f"✓ 使用缓存的Query: {cached_query}")
                detail_info["cached"] = True
                return cached_query, detail_info

        try:
            prompt_template = self._load_prompt("function_generate_query_prompt.md")
            prompt = prompt_template.format(
                question=question,
                post_info=post_info,
                persona_info=persona_info,
            )
            detail_info["prompt"] = prompt

            logger.info("→ 调用Gemini生成Query...")
            query = generate_text(prompt=prompt).strip()
            detail_info["response"] = query
            logger.info(f"✓ 生成Query: {query}")

            if self.use_cache:
                self.cache.set(combined_question, _CACHE_NAMESPACE, 'generated_query.txt', query)

            return query, detail_info
        except Exception as e:
            logger.error(f"✗ 生成Query失败: {e}")
            detail_info["error"] = str(e)
            # Degrade gracefully: search with the raw question instead.
            return question, detail_info

    def select_tool(self, combined_question: str, query: str) -> tuple:
        """
        Step 2: ask the LLM to pick a tool from the tools library for ``query``.

        Returns:
            tuple: ``(tool_name, detail_info)`` where ``tool_name`` is the
            stripped LLM reply, or the string ``"None"`` when the library is
            empty or an error occurred.
        """
        logger.info("[步骤2] 选择工具...")

        detail_info = {"cached": False, "prompt": None, "response": None, "available_tools_count": 0}

        if self.use_cache:
            cached_tool = self.cache.get(combined_question, _CACHE_NAMESPACE, 'selected_tool.txt')
            if cached_tool:
                logger.info(f"✓ 使用缓存的工具: {cached_tool}")
                detail_info["cached"] = True
                return cached_tool, detail_info

        try:
            all_tool_infos = get_all_tool_infos()
            if not all_tool_infos:
                logger.info(" 工具库为空,无可用工具")
                return _NO_TOOL, detail_info

            # Each tool description is introduced by a '--- Tool:' header.
            tool_count = all_tool_infos.count('--- Tool:')
            detail_info["available_tools_count"] = tool_count
            logger.info(f" 当前可用工具数: {tool_count}")

            prompt_template = self._load_prompt("function_knowledge_select_tools_prompt.md")
            prompt = prompt_template.format(
                query=query,
                tool_infos=all_tool_infos,
            )
            detail_info["prompt"] = prompt
            detail_info["tool_infos"] = all_tool_infos

            logger.info("→ 调用Gemini选择工具...")
            tool_name = generate_text(prompt=prompt).strip()
            detail_info["response"] = tool_name
            logger.info(f"✓ 选择结果: {tool_name}")

            if self.use_cache:
                self.cache.set(combined_question, _CACHE_NAMESPACE, 'selected_tool.txt', tool_name)

            return tool_name, detail_info
        except Exception as e:
            logger.error(f"✗ 选择工具失败: {e}")
            detail_info["error"] = str(e)
            return _NO_TOOL, detail_info

    def extract_tool_params(self, combined_question: str, tool_name: str, query: str) -> tuple:
        """
        Step 3: derive the call arguments for ``tool_name`` from ``query`` with the LLM.

        Args:
            combined_question: Combined input string (cache key).
            tool_name: Name of the selected tool.
            query: Query produced by step 1.

        Returns:
            tuple: ``(params, detail_info)``. On any failure (tool info missing,
            unparsable or non-object JSON, other exception) ``params`` falls
            back to ``{"keyword": query}`` and ``detail_info["fallback"]``
            records why.
        """
        logger.info("[步骤3] 提取工具参数...")

        detail_info = {"cached": False, "prompt": None, "response": None, "tool_info": None}

        if self.use_cache:
            cached_params = self.cache.get(combined_question, _CACHE_NAMESPACE, 'tool_params.json')
            if cached_params:
                logger.info(f"✓ 使用缓存的参数: {cached_params}")
                detail_info["cached"] = True
                return cached_params, detail_info

        try:
            tool_info = get_tool_info(tool_name)
            if not tool_info:
                logger.warning(f" ⚠ 未找到工具 {tool_name} 的信息,使用默认参数")
                detail_info["fallback"] = "tool_info_not_found"
                return {"keyword": query}, detail_info

            detail_info["tool_info"] = tool_info
            logger.info(f" 工具 {tool_name} 信息长度: {len(tool_info)}")

            prompt_template = self._load_prompt("function_knowledge_extract_tool_params_prompt.md")
            prompt = prompt_template.format(
                query=query,
                tool_info=tool_info,
            )
            detail_info["prompt"] = prompt

            logger.info(" → 调用Gemini提取参数...")
            response_text = generate_text(prompt=prompt)
            detail_info["response"] = response_text

            logger.info(" → 解析参数JSON...")
            try:
                # The LLM often wraps its JSON in a Markdown code fence.
                response_text = self._strip_code_fence(response_text)
                params = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f" ✗ 解析JSON失败: {e}")
                logger.error(f" 响应内容: {response_text}")
                default_params = {"keyword": query}
                logger.warning(f" 使用默认参数: {default_params}")
                detail_info["fallback"] = "json_decode_error"
                return default_params, detail_info

            # Tool arguments must be a JSON object; a list/string/number
            # would otherwise be handed to call_tool unchecked.
            if not isinstance(params, dict):
                default_params = {"keyword": query}
                logger.warning(f" ⚠ 参数不是JSON对象: {params},使用默认参数: {default_params}")
                detail_info["fallback"] = "invalid_params_type"
                return default_params, detail_info

            logger.info(f"✓ 提取参数成功: {params}")
            if self.use_cache:
                self.cache.set(combined_question, _CACHE_NAMESPACE, 'tool_params.json', params)
            return params, detail_info

        except Exception as e:
            logger.error(f"✗ 提取工具参数失败: {e}")
            detail_info["error"] = str(e)
            detail_info["fallback"] = "exception"
            return {"keyword": query}, detail_info

    def save_knowledge_to_file(self, knowledge: str, combined_question: str):
        """
        Best-effort write of ``knowledge`` to ``knowledge.txt`` in the per-question directory.

        Errors are logged and swallowed, because this runs as a background task.
        """
        try:
            logger.info("[保存知识] 开始保存知识到文件...")

            # Same directory that holds execution_record.json.
            cache_dir = self._question_cache_dir(combined_question)
            os.makedirs(cache_dir, exist_ok=True)

            knowledge_file = os.path.join(cache_dir, 'knowledge.txt')
            with open(knowledge_file, 'w', encoding='utf-8') as f:
                f.write(knowledge)

            logger.info(f"✓ 知识已保存到: {knowledge_file}")
            logger.info(f" 知识长度: {len(knowledge)} 字符")
        except Exception as e:
            logger.error(f"✗ 保存知识失败: {e}")

    # ------------------------------------------------------------------ #
    # Main flow
    # ------------------------------------------------------------------ #

    def _run_tool_path(self, combined_question: str, tool_name: str, query: str,
                       execution_record: dict) -> str:
        """Path A: extract parameters, call ``tool_name`` and record the result in ``execution_record``."""
        steps = execution_record["execution"]["steps"]
        execution_record["result"]["type"] = "tool"

        # Step 3: extract parameters
        step3_start = time.time()
        arguments, params_detail = self.extract_tool_params(combined_question, tool_name, query)
        steps.append({
            "step": 3,
            "name": "extract_tool_params",
            "duration": time.time() - step3_start,
            "output": arguments,
            "detail": params_detail,  # prompt, response and tool info
        })

        # Step 4: call the tool (reusing a cached result when available)
        logger.info(f"[步骤4] 调用工具: {tool_name}")
        cached_tool_result = None
        if self.use_cache:
            cached_tool_result = self.cache.get(combined_question, _CACHE_NAMESPACE, 'tool_result.json')

        if cached_tool_result:
            logger.info("✓ 使用缓存的工具调用结果")
            execution_record["metadata"]["cache_hits"].append("tool_result")
            tool_result = cached_tool_result
        else:
            step4_start = time.time()
            logger.info(f" → 调用工具,参数: {arguments}")
            tool_result = call_tool(tool_name, arguments)
            if self.use_cache:
                self.cache.set(combined_question, _CACHE_NAMESPACE, 'tool_result.json', tool_result)
            steps.append({
                "step": 4,
                "name": "call_tool",
                "duration": time.time() - step4_start,
                "output": "success",
            })

        execution_record["execution"]["tool_info"] = {
            "tool_name": tool_name,
            "parameters": arguments,
            "result": tool_result,
        }

        result_content = f"工具 {tool_name} 执行结果: {json.dumps(tool_result, ensure_ascii=False)}"
        execution_record["result"]["content"] = result_content
        execution_record["result"]["raw_data"] = tool_result

        logger.info("✓ 工具调用完成")
        return result_content

    def _run_knowledge_search_path(self, combined_question: str, query: str,
                                   execution_record: dict) -> str:
        """Path B: no tool fits, so fetch knowledge via multi-search and record it in ``execution_record``."""
        execution_record["result"]["type"] = "knowledge_search"
        logger.info("[步骤4] 未找到合适工具,调用 MultiSearch...")

        step4_start = time.time()
        knowledge = get_multi_search_knowledge(query, cache_key=combined_question)
        execution_record["execution"]["steps"].append({
            "step": 4,
            "name": "multi_search_knowledge",
            "duration": time.time() - step4_start,
            "output": f"knowledge_length: {len(knowledge)}",
        })

        execution_record["execution"]["knowledge_search_info"] = {
            "query": query,
            "knowledge_length": len(knowledge),
            "source": "multi_search",
        }

        execution_record["result"]["content"] = knowledge
        execution_record["result"]["raw_data"] = {"knowledge": knowledge, "query": query}

        # Background task. NOTE: despite the log text, this thread currently
        # only persists the knowledge to knowledge.txt; new-tool generation
        # is not implemented here. The thread is non-daemon so the write can
        # finish even if the caller exits right away.
        logger.info("[后台任务] 启动新工具生成线程...")
        threading.Thread(target=self.save_knowledge_to_file, args=(knowledge, combined_question)).start()

        return knowledge

    def _write_execution_record(self, combined_question: str, execution_record: dict):
        """Best-effort dump of ``execution_record`` as indented, human-readable JSON."""
        output_dir = self._question_cache_dir(combined_question)
        output_file = os.path.join(output_dir, 'execution_record.json')
        try:
            os.makedirs(output_dir, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(execution_record, f, ensure_ascii=False, indent=2)
            logger.info(f"✓ 完整执行记录已保存: {output_file}")
        except Exception as e:
            logger.error(f"保存执行记录失败: {e}")

    def get_knowledge(self, question: str, post_info: str, persona_info: str) -> dict:
        """
        Run the full pipeline for one input triple.

        Returns:
            dict: the execution record
            {
                "input": {...},      # original input plus timestamp
                "execution": {...},  # per-step info, tool / knowledge-search info
                "result": {...},     # type ("tool" / "knowledge_search" / "error" / "cached"), content, raw_data
                "metadata": {...}    # execution_time, cache_hits, errors
            }
            Failures inside the pipeline are reported as ``result.type == "error"``
            rather than raised.
        """
        start_time = time.time()
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

        logger.info("=" * 80)
        logger.info("Function Knowledge - 开始处理")
        logger.info(f"问题: {question}")
        logger.info(f"帖子信息: {post_info}")
        logger.info(f"人设信息: {persona_info}")
        logger.info("=" * 80)

        combined_question = self._combine_question(question, post_info, persona_info)

        execution_record = {
            "input": {
                "question": question,
                "post_info": post_info,
                "persona_info": persona_info,
                "timestamp": timestamp,
            },
            "execution": {
                "steps": [],
                "tool_info": None,
                "knowledge_search_info": None,
            },
            "result": {
                "type": None,  # "tool" or "knowledge_search" (or "error")
                "content": None,
                "raw_data": None,
            },
            "metadata": {
                "execution_time": None,
                "cache_hits": [],
                "errors": [],
            },
        }

        # Short-circuit on a cached final result.
        if self.use_cache:
            cached_final = self.cache.get(combined_question, _CACHE_NAMESPACE, 'final_result.json')
            if cached_final:
                logger.info("✓ 使用缓存的最终结果")
                logger.info("=" * 80 + "\n")

                # A complete execution record is returned as-is.
                if isinstance(cached_final, dict) and "execution" in cached_final:
                    return cached_final

                # Otherwise wrap the legacy cached value. execution_time is
                # included so callers can rely on the same metadata key.
                return {
                    "input": execution_record["input"],
                    "execution": {"cached": True},
                    "result": {"type": "cached", "content": cached_final},
                    "metadata": {"cache_hit": True, "execution_time": time.time() - start_time},
                }

        try:
            steps = execution_record["execution"]["steps"]

            # Step 1: generate query
            step1_start = time.time()
            query, query_detail = self.generate_query(question, post_info, persona_info)
            steps.append({
                "step": 1,
                "name": "generate_query",
                "duration": time.time() - step1_start,
                "output": query,
                "detail": query_detail,  # prompt and response
            })

            # Step 2: select a tool
            step2_start = time.time()
            tool_name, tool_select_detail = self.select_tool(combined_question, query)
            steps.append({
                "step": 2,
                "name": "select_tool",
                "duration": time.time() - step2_start,
                "output": tool_name,
                "detail": tool_select_detail,  # prompt, response and available tools
            })

            if tool_name and tool_name != _NO_TOOL:
                result_content = self._run_tool_path(combined_question, tool_name, query, execution_record)
            else:
                result_content = self._run_knowledge_search_path(combined_question, query, execution_record)

            execution_record["metadata"]["execution_time"] = time.time() - start_time

            if self.use_cache:
                self.cache.set(combined_question, _CACHE_NAMESPACE, 'final_result.json', execution_record)
                # Also keep a pretty-printed copy for humans.
                self._write_execution_record(combined_question, execution_record)

            logger.info("=" * 80)
            logger.info("✓ Function Knowledge 完成")
            logger.info(f" 类型: {execution_record['result']['type']}")
            logger.info(f" 结果长度: {len(result_content) if result_content else 0}")
            logger.info(f" 执行时间: {execution_record['metadata']['execution_time']:.2f}秒")
            logger.info("=" * 80 + "\n")

            return execution_record

        except Exception as e:
            logger.error(f"✗ 执行失败: {e}")
            execution_record["metadata"]["errors"].append({
                "error": str(e),
                "traceback": traceback.format_exc(),
            })
            execution_record["result"]["type"] = "error"
            execution_record["result"]["content"] = f"执行失败: {str(e)}"
            execution_record["metadata"]["execution_time"] = time.time() - start_time
            return execution_record


def get_knowledge(question: str, post_info: str, persona_info: str) -> dict:
    """
    Convenience wrapper: run ``FunctionKnowledge().get_knowledge`` with caching enabled.

    Returns:
        dict: the full execution record (input, execution steps, result, metadata).
    """
    agent = FunctionKnowledge()
    return agent.get_knowledge(question, post_info, persona_info)


if __name__ == "__main__":
    # Manual smoke test
    question = "教资查分这个信息怎么来的"
    post_info = "发帖时间:2025.11.07"
    persona_info = ""

    try:
        agent = FunctionKnowledge()
        execution_result = agent.get_knowledge(question, post_info, persona_info)

        # A legacy cached result may carry non-str content, so coerce before slicing.
        content_preview = str(execution_result['result'].get('content') or '')[:200]
        execution_time = execution_result.get('metadata', {}).get('execution_time') or 0.0

        print("=" * 50)
        print("执行结果:")
        print("=" * 50)
        print(f"类型: {execution_result['result']['type']}")
        print(f"内容预览: {content_preview}...")
        print(f"执行时间: {execution_time:.2f}秒")
        print("\n完整JSON已保存到缓存目录")
    except Exception as e:
        logger.error(f"测试失败: {e}")