| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- '''
- 方法知识获取模块
- 1. 输入:问题 + 帖子信息 + 账号人设信息
- 2. 将输入的问题转化成query,调用大模型,prompt在 function_knowledge_generate_query_prompt.md 中
- 3. 从已有方法工具库中尝试选择合适的方法工具(调用大模型执行,prompt在 function_knowledge_select_tools_prompt.md 中),如果有,则返回选择的方法工具,否则:
- - 调用 multi_search_knowledge.py 获取知识
- - 返回新的方法工具知识
- - 异步从新方法知识中获取新工具(调用大模型执行,prompt在 function_knowledge_generate_new_tool_prompt.md 中),调用工具库系统,接入新的工具
- 4. 调用选择的方法工具执行验证,返回工具执行结果
- '''
- import os
- import sys
- import json
- import threading
- from loguru import logger
- # 设置路径以便导入工具类
- current_dir = os.path.dirname(os.path.abspath(__file__))
- root_dir = os.path.dirname(current_dir)
- sys.path.insert(0, root_dir)
- from utils.gemini_client import generate_text
- from knowledge_v2.tools_library import call_tool, save_tool_info, get_all_tool_infos, get_tool_info
- from knowledge_v2.multi_search_knowledge import get_knowledge as get_multi_search_knowledge
- from knowledge_v2.cache_manager import CacheManager
class FunctionKnowledge:
    """Acquire "method knowledge" for a question via a tool or a knowledge search.

    Pipeline implemented by :meth:`get_knowledge`:

    1. Turn the question into a query with the LLM (:meth:`generate_query`).
    2. Ask the LLM to pick a tool from the tool catalogue prompt file
       (:meth:`select_tool`).
    3. If a tool was picked, extract its arguments
       (:meth:`extract_tool_params`) and invoke it with ``call_tool``;
       otherwise fall back to ``get_multi_search_knowledge`` and persist the
       returned knowledge on a background thread.

    When caching is enabled, intermediate and final results are read from and
    written to ``CacheManager``.
    """

    # Second positional argument passed to every CacheManager.get/set call
    # made by this class.
    _CACHE_CATEGORY = 'function_knowledge'

    def __init__(self, use_cache: bool = True):
        """Set up the prompt directory and, optionally, the cache manager.

        Args:
            use_cache: Whether results are read from / written to the cache.
                Enabled by default.
        """
        logger.info("=" * 80)
        logger.info("初始化 FunctionKnowledge - 方法知识获取入口")
        self.prompt_dir = os.path.join(current_dir, "prompt")
        self.use_cache = use_cache
        self.cache = CacheManager() if use_cache else None
        logger.info(f"缓存状态: {'启用' if use_cache else '禁用'}")
        logger.info("=" * 80)

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _combine_inputs(question: str, post_info: str, persona_info: str) -> str:
        """Join the three inputs into the string used as the cache key."""
        return f"{question}||{post_info}||{persona_info}"

    @staticmethod
    def _question_hash(combined_question: str) -> str:
        """Return the first 12 hex characters of the MD5 of the cache key."""
        import hashlib
        return hashlib.md5(combined_question.encode('utf-8')).hexdigest()[:12]

    @staticmethod
    def _fill_placeholders(template: str, **values) -> str:
        """Replace ``{name}`` placeholders in *template* in a single pass.

        Only the names given in *values* are substituted; every other brace
        (for example JSON samples inside a prompt) is left untouched, which
        is why ``str.format`` is not used here. The single pass guarantees
        that placeholder-looking text inside a substituted value is never
        substituted again.
        """
        import re
        if not values:
            return template
        pattern = re.compile(
            "|".join(re.escape("{" + name + "}") for name in values)
        )
        return pattern.sub(
            lambda match: str(values[match.group(0)[1:-1]]), template
        )

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding markdown code fence (```json ... ```) if present."""
        text = text.strip()
        if text.startswith("```json"):
            text = text[7:]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    def _cache_get(self, cache_key: str, filename: str):
        """Read a cache entry; returns None when caching is disabled."""
        if not self.use_cache:
            return None
        return self.cache.get(cache_key, self._CACHE_CATEGORY, filename)

    def _cache_set(self, cache_key: str, filename: str, value) -> None:
        """Write a cache entry; does nothing when caching is disabled."""
        if self.use_cache:
            self.cache.set(cache_key, self._CACHE_CATEGORY, filename, value)

    def _load_prompt(self, filename: str) -> str:
        """Return the stripped content of a prompt file in ``self.prompt_dir``.

        Raises:
            FileNotFoundError: If the prompt file does not exist.
        """
        prompt_path = os.path.join(self.prompt_dir, filename)
        if not os.path.exists(prompt_path):
            raise FileNotFoundError(f"Prompt文件不存在: {prompt_path}")
        with open(prompt_path, 'r', encoding='utf-8') as f:
            return f.read().strip()

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------

    def generate_query(self, question: str, post_info: str, persona_info: str) -> tuple:
        """Generate the query for a question (step 1).

        Args:
            question: The question text; substituted into the prompt.
            post_info: Post information; only used as part of the cache key.
            persona_info: Persona information; only used as part of the
                cache key.

        Returns:
            tuple: ``(query, detail_info)`` where ``detail_info`` records
            whether the cache was hit, the prompt, the raw response and, on
            failure, the error text. On any failure the original question is
            returned as the query.
        """
        logger.info("[步骤1] 生成Query...")

        combined_question = self._combine_inputs(question, post_info, persona_info)
        detail_info = {"cached": False, "prompt": None, "response": None}

        cached_query = self._cache_get(combined_question, 'generated_query.txt')
        if cached_query:
            logger.info(f"✓ 使用缓存的Query: {cached_query}")
            detail_info["cached"] = True
            return cached_query, detail_info

        try:
            # NOTE(review): the other prompt files are prefixed with
            # "function_knowledge_"; confirm this file name is intentional.
            prompt_template = self._load_prompt("function_generate_query_prompt.md")
            prompt = self._fill_placeholders(prompt_template, question=question)
            detail_info["prompt"] = prompt

            logger.info("→ 调用Gemini生成Query...")
            query = generate_text(prompt=prompt).strip()
            detail_info["response"] = query

            # An empty query is useless downstream; treat it like any other
            # failure so the original question is used instead.
            if not query:
                raise ValueError("LLM returned an empty query")

            logger.info(f"✓ 生成Query: {query}")
            self._cache_set(combined_question, 'generated_query.txt', query)
            return query, detail_info
        except Exception as e:
            logger.error(f"✗ 生成Query失败: {e}")
            detail_info["error"] = str(e)
            return question, detail_info  # degrade to the original question

    def select_tool(self, combined_question: str, query: str) -> tuple:
        """Ask the LLM to choose a tool for the query (step 2).

        Args:
            combined_question: Cache key for this request.
            query: The generated query; substituted for a ``{query}``
                placeholder in the selection prompt when one is present.

        Returns:
            tuple: ``(tool_name, detail_info)``. ``tool_name`` is the string
            ``"None"`` when the catalogue is empty or selection fails.
        """
        logger.info("[步骤2] 选择工具...")

        detail_info = {"cached": False, "prompt": None, "response": None, "available_tools_count": 0}

        cached_tool = self._cache_get(combined_question, 'selected_tool.txt')
        if cached_tool:
            logger.info(f"✓ 使用缓存的工具: {cached_tool}")
            detail_info["cached"] = True
            return cached_tool, detail_info

        try:
            all_tool_infos = self._load_prompt("all_tools_infos.md")
            if not all_tool_infos:
                logger.info(" 工具库为空,无可用工具")
                return "None", detail_info

            # Each catalogue entry is introduced by a '--- Tool:' marker.
            tool_count = all_tool_infos.count('--- Tool:')
            detail_info["available_tools_count"] = tool_count
            logger.info(f" 当前可用工具数: {tool_count}")

            prompt_template = self._load_prompt("function_knowledge_select_tools_prompt.md")
            # Previously `query` was accepted but never reached the prompt.
            # Filling `{query}` is a no-op if the template has no such
            # placeholder.
            prompt = self._fill_placeholders(
                prompt_template,
                all_tool_infos=all_tool_infos,
                query=query,
            )

            detail_info["prompt"] = prompt
            detail_info["tool_infos"] = all_tool_infos

            logger.info("→ 调用Gemini选择工具...")
            tool_name = generate_text(prompt=prompt).strip()
            detail_info["response"] = tool_name

            logger.info(f"✓ 选择结果: {tool_name}")
            self._cache_set(combined_question, 'selected_tool.txt', tool_name)
            return tool_name, detail_info
        except Exception as e:
            logger.error(f"✗ 选择工具失败: {e}")
            detail_info["error"] = str(e)
            return "None", detail_info

    def extract_tool_params(self, combined_question: str, tool_name: str, query: str) -> tuple:
        """Extract the call arguments for a tool from the query (step 3).

        Args:
            combined_question: Cache key for this request.
            tool_name: Name of the selected tool.
            query: The generated query.

        Returns:
            tuple: ``(params, detail_info)``. Whenever the tool info is
            missing, the LLM response is not valid JSON, or any other error
            occurs, ``params`` degrades to ``{"keyword": query}`` and
            ``detail_info["fallback"]`` names the reason.
        """
        logger.info("[步骤3] 提取工具参数...")

        detail_info = {"cached": False, "prompt": None, "response": None, "tool_info": None}
        default_params = {"keyword": query}

        cached_params = self._cache_get(combined_question, 'tool_params.json')
        if cached_params:
            logger.info(f"✓ 使用缓存的参数: {cached_params}")
            detail_info["cached"] = True
            return cached_params, detail_info

        try:
            tool_info = get_tool_info(tool_name)
            if not tool_info:
                logger.warning(f" ⚠ 未找到工具 {tool_name} 的信息,使用默认参数")
                detail_info["fallback"] = "tool_info_not_found"
                return default_params, detail_info

            detail_info["tool_info"] = tool_info
            logger.info(f" 工具 {tool_name} 信息长度: {len(tool_info)}")

            prompt_template = self._load_prompt("function_knowledge_extract_tool_params_prompt.md")
            # NOTE(review): str.format is kept here, so any literal braces in
            # this template must be doubled ({{ }}) - confirm against the file.
            prompt = prompt_template.format(
                query=query,
                tool_info=tool_info
            )
            detail_info["prompt"] = prompt

            logger.info(" → 调用Gemini提取参数...")
            response_text = generate_text(prompt=prompt)
            detail_info["response"] = response_text

            logger.info(" → 解析参数JSON...")
            response_text = self._strip_code_fence(response_text)
            try:
                params = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f" ✗ 解析JSON失败: {e}")
                logger.error(f" 响应内容: {response_text}")
                logger.warning(f" 使用默认参数: {default_params}")
                detail_info["fallback"] = "json_decode_error"
                return default_params, detail_info

            logger.info(f"✓ 提取参数成功: {params}")
            self._cache_set(combined_question, 'tool_params.json', params)
            return params, detail_info

        except Exception as e:
            logger.error(f"✗ 提取工具参数失败: {e}")
            detail_info["error"] = str(e)
            detail_info["fallback"] = "exception"
            return default_params, detail_info

    def save_knowledge_to_file(self, knowledge: str, combined_question: str):
        """Write the retrieved knowledge to ``knowledge.txt`` (best effort).

        The file goes into ``<base_cache_dir>/<question hash>`` when caching
        is enabled, otherwise into ``.cache/<question hash>`` next to this
        module. Failures are logged and swallowed because this runs on a
        background thread.
        """
        try:
            logger.info("[保存知识] 开始保存知识到文件...")
            question_hash = self._question_hash(combined_question)
            if self.use_cache and self.cache:
                cache_dir = os.path.join(self.cache.base_cache_dir, question_hash)
            else:
                cache_dir = os.path.join(os.path.dirname(__file__), '.cache', question_hash)
            os.makedirs(cache_dir, exist_ok=True)

            knowledge_file = os.path.join(cache_dir, 'knowledge.txt')
            with open(knowledge_file, 'w', encoding='utf-8') as f:
                f.write(knowledge)
            logger.info(f"✓ 知识已保存到: {knowledge_file}")
            logger.info(f" 知识长度: {len(knowledge)} 字符")
        except Exception as e:
            logger.error(f"✗ 保存知识失败: {e}")

    # ------------------------------------------------------------------
    # Helpers for the main flow
    # ------------------------------------------------------------------

    def _call_tool_with_cache(self, execution_record: dict, combined_question: str,
                              tool_name: str, arguments) -> object:
        """Return the tool result, from cache if available, else by calling it.

        A cache hit is noted in ``metadata.cache_hits``; a real call appends
        the step-4 ``call_tool`` entry to the execution steps.
        """
        import time

        cached_tool_result = self._cache_get(combined_question, 'tool_result.json')
        if cached_tool_result:
            logger.info("✓ 使用缓存的工具调用结果")
            execution_record["metadata"]["cache_hits"].append("tool_result")
            return cached_tool_result

        step4_start = time.time()
        logger.info(f" → 调用工具,参数: {arguments}")
        tool_result = call_tool(tool_name, arguments)
        self._cache_set(combined_question, 'tool_result.json', tool_result)

        execution_record["execution"]["steps"].append({
            "step": 4,
            "name": "call_tool",
            "duration": time.time() - step4_start,
            "output": "success"
        })
        return tool_result

    def _run_tool_path(self, execution_record: dict, combined_question: str,
                       tool_name: str, query: str) -> str:
        """Path A: extract arguments, call the tool and record the outcome."""
        import time

        execution_record["result"]["type"] = "tool"

        step3_start = time.time()
        arguments, params_detail = self.extract_tool_params(combined_question, tool_name, query)
        execution_record["execution"]["steps"].append({
            "step": 3,
            "name": "extract_tool_params",
            "duration": time.time() - step3_start,
            "output": arguments,
            "detail": params_detail  # prompt, response and tool info
        })

        logger.info(f"[步骤4] 调用工具: {tool_name}")
        tool_result = self._call_tool_with_cache(
            execution_record, combined_question, tool_name, arguments
        )

        execution_record["execution"]["tool_info"] = {
            "tool_name": tool_name,
            "parameters": arguments,
            "result": tool_result
        }

        result_content = f"工具 {tool_name} 执行结果: {json.dumps(tool_result, ensure_ascii=False)}"
        execution_record["result"]["content"] = result_content
        execution_record["result"]["raw_data"] = tool_result

        logger.info("✓ 工具调用完成")
        return result_content

    def _run_search_path(self, execution_record: dict, combined_question: str,
                         query: str) -> str:
        """Path B: no tool selected, fall back to the multi-search lookup."""
        import time

        execution_record["result"]["type"] = "knowledge_search"

        logger.info("[步骤4] 未找到合适工具,调用 MultiSearch...")

        step4_start = time.time()
        knowledge = get_multi_search_knowledge(query, cache_key=combined_question)

        execution_record["execution"]["steps"].append({
            "step": 4,
            "name": "multi_search_knowledge",
            "duration": time.time() - step4_start,
            "output": f"knowledge_length: {len(knowledge)}"
        })

        execution_record["execution"]["knowledge_search_info"] = {
            "query": query,
            "knowledge_length": len(knowledge),
            "source": "multi_search"
        }

        execution_record["result"]["content"] = knowledge
        execution_record["result"]["raw_data"] = {"knowledge": knowledge, "query": query}

        # Persist the knowledge on a background thread. The log line still
        # mentions tool generation, but the thread target only writes
        # knowledge.txt.
        logger.info("[后台任务] 启动新工具生成线程...")
        threading.Thread(target=self.save_knowledge_to_file, args=(knowledge, combined_question)).start()
        return knowledge

    def _write_execution_record(self, execution_record: dict, combined_question: str) -> None:
        """Write a pretty-printed ``execution_record.json`` for human readers.

        The target directory is created first; previously the write failed
        whenever the directory did not exist yet (e.g. with caching disabled).
        Failures are logged, never raised.
        """
        # Reuse the instance's cache manager when there is one instead of
        # building a second CacheManager just to learn the base directory.
        record_cache = self.cache if (self.use_cache and self.cache) else CacheManager()
        record_dir = os.path.join(
            record_cache.base_cache_dir, self._question_hash(combined_question)
        )
        output_file = os.path.join(record_dir, 'execution_record.json')

        try:
            os.makedirs(record_dir, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(execution_record, f, ensure_ascii=False, indent=2)
            logger.info(f"✓ 完整执行记录已保存: {output_file}")
        except Exception as e:
            logger.error(f"保存执行记录失败: {e}")

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def get_knowledge(self, question: str, post_info: str, persona_info: str) -> dict:
        """Run the whole pipeline and return its execution record.

        Args:
            question: The question to answer.
            post_info: Post information.
            persona_info: Account persona information.

        Returns:
            dict: A record with the keys ``input`` (the original inputs and a
            timestamp), ``execution`` (per-step details), ``result`` (type,
            content and raw data) and ``metadata`` (execution time, cache
            hits and errors). On failure ``result.type`` is ``"error"`` and
            the traceback is stored under ``metadata.errors``; no exception
            from the pipeline steps propagates to the caller.
        """
        import time
        import traceback

        start_time = time.time()
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

        logger.info("=" * 80)
        logger.info("Function Knowledge - 开始处理")
        logger.info(f"问题: {question}")
        logger.info(f"帖子信息: {post_info}")
        logger.info(f"人设信息: {persona_info}")
        logger.info("=" * 80)

        combined_question = self._combine_inputs(question, post_info, persona_info)

        execution_record = {
            "input": {
                "question": question,
                "post_info": post_info,
                "persona_info": persona_info,
                "timestamp": timestamp
            },
            "execution": {
                "steps": [],
                "tool_info": None,
                "knowledge_search_info": None
            },
            "result": {
                "type": None,  # "tool", "knowledge_search" or "error"
                "content": None,
                "raw_data": None
            },
            "metadata": {
                "execution_time": None,
                "cache_hits": [],
                "errors": []
            }
        }

        # Short-circuit on a cached final result.
        cached_final = self._cache_get(combined_question, 'final_result.json')
        if cached_final:
            logger.info("✓ 使用缓存的最终结果")
            logger.info("=" * 80 + "\n")
            # A complete execution record is returned as-is.
            if isinstance(cached_final, dict) and "execution" in cached_final:
                return cached_final
            # Anything else is wrapped in a minimal record. execution_time is
            # included so consumers that read it (such as this file's
            # __main__ block) do not hit a KeyError.
            return {
                "input": execution_record["input"],
                "execution": {"cached": True},
                "result": {"type": "cached", "content": cached_final},
                "metadata": {
                    "cache_hit": True,
                    "execution_time": time.time() - start_time
                }
            }

        try:
            # Step 1: generate the query.
            step1_start = time.time()
            query, query_detail = self.generate_query(question, post_info, persona_info)
            execution_record["execution"]["steps"].append({
                "step": 1,
                "name": "generate_query",
                "duration": time.time() - step1_start,
                "output": query,
                "detail": query_detail  # prompt and response
            })

            # Step 2: select a tool.
            step2_start = time.time()
            tool_name, tool_select_detail = self.select_tool(combined_question, query)
            execution_record["execution"]["steps"].append({
                "step": 2,
                "name": "select_tool",
                "duration": time.time() - step2_start,
                "output": tool_name,
                "detail": tool_select_detail  # prompt, response and tool catalogue
            })

            # Steps 3/4: the string "None" means no tool was selected.
            if tool_name and tool_name != "None":
                result_content = self._run_tool_path(
                    execution_record, combined_question, tool_name, query
                )
            else:
                result_content = self._run_search_path(
                    execution_record, combined_question, query
                )

            execution_record["metadata"]["execution_time"] = time.time() - start_time

            self._cache_set(combined_question, 'final_result.json', execution_record)
            self._write_execution_record(execution_record, combined_question)

            logger.info("=" * 80)
            logger.info("✓ Function Knowledge 完成")
            logger.info(f" 类型: {execution_record['result']['type']}")
            logger.info(f" 结果长度: {len(result_content) if result_content else 0}")
            logger.info(f" 执行时间: {execution_record['metadata']['execution_time']:.2f}秒")
            logger.info("=" * 80 + "\n")

            return execution_record

        except Exception as e:
            logger.error(f"✗ 执行失败: {e}")
            error_trace = traceback.format_exc()

            execution_record["metadata"]["errors"].append({
                "error": str(e),
                "traceback": error_trace
            })
            execution_record["result"]["type"] = "error"
            execution_record["result"]["content"] = f"执行失败: {str(e)}"
            execution_record["metadata"]["execution_time"] = time.time() - start_time

            return execution_record
def get_knowledge(question: str, post_info: str, persona_info: str) -> dict:
    """Convenience wrapper: run the pipeline on a default FunctionKnowledge.

    A new ``FunctionKnowledge`` is built with its default arguments on every
    call and the three inputs are forwarded unchanged.

    Returns:
        dict: The execution record returned by
        ``FunctionKnowledge.get_knowledge`` (input, execution details, result
        and metadata).
    """
    return FunctionKnowledge().get_knowledge(question, post_info, persona_info)
if __name__ == "__main__":
    # Manual smoke test: run the pipeline once with sample inputs and print
    # a short summary of the returned execution record.
    question = "教资查分这个信息怎么来的"
    post_info = "发帖时间:2025.11.07"
    persona_info = ""

    try:
        execution_result = FunctionKnowledge().get_knowledge(question, post_info, persona_info)

        banner = "=" * 50
        print(banner)
        print("执行结果:")
        print(banner)

        result_section = execution_result['result']
        print(f"类型: {result_section['type']}")
        print(f"内容预览: {result_section['content'][:200]}...")

        elapsed = execution_result['metadata']['execution_time']
        print(f"执行时间: {elapsed:.2f}秒")
        print("\n完整JSON已保存到缓存目录")
    except Exception as e:
        # Any failure, including a record missing the keys read above, is
        # logged rather than raised.
        logger.error(f"测试失败: {e}")
|