'''
LLM + search based knowledge acquisition module.

1. Input:  a question string
2. Output: a merged knowledge text
3. Pipeline:
   - 3.1 Use the LLM to generate multiple search queries for the question
         (prompt: llm_search_generate_query_prompt.md)
   - 3.2 For each query call utils/qwen_client.py's search_and_chat and keep
         only the 'content' field of the result
   - 3.3 Use the LLM to merge the per-query knowledge texts
         (prompt: llm_search_merge_knowledge_prompt.md)
   - 3.4 Return the merged knowledge text
4. LLM calls go through utils/gemini_client.py's generate_text
5. Every step is wrapped in its own method for reusability
'''
import hashlib
import json
import os
import sys
import time
from typing import List

from loguru import logger

# Make the project root importable before pulling in project modules.
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
sys.path.insert(0, root_dir)

from utils.gemini_client import generate_text
from utils.qwen_client import QwenClient
from knowledge_v2.cache_manager import CacheManager


class LLMSearchKnowledge:
    """Knowledge acquisition via LLM-generated queries + search + LLM merge."""

    def __init__(self, use_cache: bool = True):
        """
        Initialize the knowledge agent.

        Args:
            use_cache: whether to enable the on-disk cache (default: enabled)
        """
        logger.info("=" * 60)
        logger.info("初始化 LLMSearchKnowledge")
        self.qwen_client = QwenClient()
        self.prompt_dir = os.path.join(current_dir, "prompt")
        self.use_cache = use_cache
        self.cache = CacheManager() if use_cache else None

        # Per-run execution details (accumulates across calls on the same
        # instance; the module-level convenience function creates a fresh
        # instance per call).
        # BUGFIX: "generate_queries" and "merge_detail" were initialized to
        # None, but generate_queries()/merge_knowledge() call `.update(...)`
        # on them, which raised AttributeError on every run. They must be
        # dicts from the start.
        self.execution_detail = {
            "generate_queries": {},
            "search_results": [],
            "merge_detail": {},
            "execution_time": 0,
            "cache_hits": []
        }
        logger.info(f"缓存状态: {'启用' if use_cache else '禁用'}")
        logger.info("=" * 60)

    def _load_prompt(self, filename: str) -> str:
        """
        Load the content of a prompt file from self.prompt_dir.

        Args:
            filename: prompt file name

        Returns:
            str: stripped prompt content

        Raises:
            FileNotFoundError: if the file does not exist
            ValueError: if the file content is empty
        """
        prompt_path = os.path.join(self.prompt_dir, filename)

        if not os.path.exists(prompt_path):
            error_msg = f"Prompt文件不存在: {prompt_path}"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        try:
            with open(prompt_path, 'r', encoding='utf-8') as f:
                content = f.read().strip()

            if not content:
                error_msg = f"Prompt文件内容为空: {prompt_path}"
                logger.error(error_msg)
                raise ValueError(error_msg)

            return content

        except Exception as e:
            # BUGFIX: the message contained a literal "(unknown)" placeholder
            # instead of the actual file name.
            error_msg = f"读取prompt文件 {filename} 失败: {e}"
            logger.error(error_msg)
            raise

    def generate_queries(self, question: str) -> List[str]:
        """
        Generate multiple search queries for the question (step 1).

        Args:
            question: the question string (also used as the cache key)

        Returns:
            List[str]: list of generated queries

        Raises:
            Exception: if query generation fails
        """
        logger.info(f"[步骤1] 生成搜索Query - 问题: {question[:50]}...")

        # Try the cache first.
        if self.use_cache:
            cached_queries = self.cache.get(question, 'llm_search', 'generated_queries.json')
            if cached_queries:
                logger.info(f"✓ 使用缓存的queries: {cached_queries}")
                self.execution_detail["generate_queries"].update({
                    "cached": True,
                    "queries_count": len(cached_queries)
                })
                return cached_queries

        try:
            prompt_template = self._load_prompt("llm_search_generate_query_prompt.md")
            # The template uses {question} as its placeholder.
            prompt = prompt_template.format(question=question)

            logger.info("→ 调用Gemini生成query...")
            response_text = generate_text(prompt=prompt)

            logger.info("→ 解析生成的query...")
            try:
                # Strip possible markdown code fences wrapping the JSON.
                response_text = response_text.strip()
                if response_text.startswith("```json"):
                    response_text = response_text[7:]
                if response_text.startswith("```"):
                    response_text = response_text[3:]
                if response_text.endswith("```"):
                    response_text = response_text[:-3]
                response_text = response_text.strip()

                result = json.loads(response_text)
                queries = result.get("queries", [])

                if not queries:
                    raise ValueError("生成的query列表为空")

                logger.info(f"✓ 成功生成 {len(queries)} 个query:")
                for i, q in enumerate(queries, 1):
                    logger.info(f"  {i}. {q}")

                self.execution_detail["generate_queries"].update({
                    "cached": False,
                    "prompt": prompt,
                    "response": response_text,
                    "queries_count": len(queries),
                    "queries": queries
                })

                if self.use_cache:
                    self.cache.set(question, 'llm_search', 'generated_queries.json', queries)

                return queries

            except json.JSONDecodeError as e:
                logger.error(f"✗ 解析JSON失败: {e}")
                logger.error(f"响应内容: {response_text}")
                raise ValueError(f"无法解析模型返回的JSON: {e}")

        except Exception as e:
            logger.error(f"✗ 生成query失败: {e}")
            raise

    def search_knowledge(self, question: str, query: str, query_index: int = 0) -> str:
        """
        Search knowledge for a single query (step 2, per query).

        Args:
            question: original question (used as the cache key)
            query: the search query
            query_index: index of the query (used in the cache file name)

        Returns:
            str: knowledge text ('content' field of the search result);
                 empty string when the search returned nothing

        Raises:
            Exception: if the search call fails
        """
        logger.info(f"  [{query_index}] 搜索Query: {query}")

        # Try the cache first.
        if self.use_cache:
            cache_filename = f"search_result_{query_index:03d}.txt"
            cached_result = self.cache.get(question, 'llm_search/search_results', cache_filename)
            if cached_result:
                logger.info(f"  ✓ 使用缓存结果 (长度: {len(cached_result)})")
                self.execution_detail["search_results"].append({
                    "query": query,
                    "query_index": query_index,
                    "cached": True,
                    "result_length": len(cached_result)
                })
                self.execution_detail["cache_hits"].append(f"search_result_{query_index:03d}")
                return cached_result

        try:
            logger.info(f"  → 调用搜索引擎...")
            result = self.qwen_client.search_and_chat(
                user_prompt=query,
                search_strategy="agent"
            )

            # Only the 'content' field of the response is used.
            knowledge_text = result.get("content", "")

            if not knowledge_text:
                logger.warning(f"  ⚠ query '{query}' 的搜索结果为空")
                return ""

            logger.info(f"  ✓ 获取知识文本 (长度: {len(knowledge_text)})")

            self.execution_detail["search_results"].append({
                "query": query,
                "query_index": query_index,
                "cached": False,
                "result_length": len(knowledge_text)
            })

            if self.use_cache:
                cache_filename = f"search_result_{query_index:03d}.txt"
                self.cache.set(question, 'llm_search/search_results', cache_filename, knowledge_text)

            return knowledge_text

        except Exception as e:
            logger.error(f"  ✗ 搜索知识失败,query: {query}, 错误: {e}")
            raise

    def search_knowledge_batch(self, question: str, queries: List[str]) -> List[str]:
        """
        Search knowledge for every query (step 2).

        A failed query is logged and skipped; an empty string keeps the index
        alignment between queries and results.

        Args:
            question: original question (used as the cache key)
            queries: list of search queries

        Returns:
            List[str]: one knowledge text per query (may contain "")
        """
        logger.info(f"[步骤2] 批量搜索 - 共 {len(queries)} 个Query")

        knowledge_texts = []
        for i, query in enumerate(queries, 1):
            try:
                knowledge_text = self.search_knowledge(question, query, i)
                knowledge_texts.append(knowledge_text)
            except Exception as e:
                logger.error(f"  ✗ 搜索第 {i} 个query失败,跳过: {e}")
                # Keep index alignment on failure.
                knowledge_texts.append("")

        logger.info(f"✓ 批量搜索完成,获得 {len([k for k in knowledge_texts if k])} 个有效结果")
        return knowledge_texts

    def merge_knowledge(self, question: str, knowledge_texts: List[str]) -> str:
        """
        Merge multiple knowledge texts into one (step 3).

        Args:
            question: original question (used as the cache key)
            knowledge_texts: list of knowledge texts (may contain "")

        Returns:
            str: merged knowledge text ("" when every input is empty)

        Raises:
            Exception: if merging fails
        """
        logger.info(f"[步骤3] 合并知识 - 共 {len(knowledge_texts)} 个文本")

        # Try the cache first.
        if self.use_cache:
            cached_merged = self.cache.get(question, 'llm_search', 'merged_knowledge.txt')
            if cached_merged:
                logger.info(f"✓ 使用缓存的合并知识 (长度: {len(cached_merged)})")
                self.execution_detail["merge_detail"].update({
                    "cached": True,
                    "knowledge_count": len(knowledge_texts),
                    "result_length": len(cached_merged)
                })
                return cached_merged

        try:
            # Drop empty texts before merging.
            valid_texts = [text for text in knowledge_texts if text.strip()]
            logger.info(f"  有效文本数量: {len(valid_texts)}/{len(knowledge_texts)}")

            if not valid_texts:
                logger.warning("  ⚠ 所有知识文本都为空,返回空字符串")
                return ""

            # With a single valid text there is nothing to merge.
            if len(valid_texts) == 1:
                logger.info("  只有一个有效知识文本,直接返回")
                result = valid_texts[0]
                if self.use_cache:
                    self.cache.set(question, 'llm_search', 'merged_knowledge.txt', result)
                return result

            prompt_template = self._load_prompt("llm_search_merge_knowledge_prompt.md")

            # Format the texts as numbered sections for the merge prompt.
            knowledge_sections = []
            for i, text in enumerate(valid_texts, 1):
                knowledge_sections.append(f"【知识文本 {i}】\n{text}")
            knowledge_texts_str = "\n\n".join(knowledge_sections)
            prompt = prompt_template.format(knowledge_texts=knowledge_texts_str)

            logger.info("  → 调用Gemini合并知识文本...")
            merged_text = generate_text(prompt=prompt)

            logger.info(f"✓ 成功合并知识文本 (长度: {len(merged_text)})")

            self.execution_detail["merge_detail"].update({
                "cached": False,
                "prompt": prompt,
                "response": merged_text,
                "knowledge_count": len(knowledge_texts),
                "result_length": len(merged_text)
            })

            if self.use_cache:
                self.cache.set(question, 'llm_search', 'merged_knowledge.txt', merged_text.strip())

            return merged_text.strip()

        except Exception as e:
            logger.error(f"✗ 合并知识文本失败: {e}")
            raise

    def _save_execution_detail(self, cache_key: str):
        """
        Persist execution details to the cache, merging with any old record.

        When the current run hit the cache, the richer (non-cached) entries
        from a previous run's record are preserved instead of being
        overwritten by the thin "cached" entries.

        Args:
            cache_key: cache key (hashed to locate the cache directory)
        """
        if not self.use_cache or not self.cache:
            return

        try:
            question_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()[:12]
            # NOTE(review): assumes CacheManager exposes `base_cache_dir` and
            # lays out entries as <base>/<hash>/llm_search — confirm against
            # knowledge_v2/cache_manager.py.
            detail_dir = os.path.join(
                self.cache.base_cache_dir,
                question_hash,
                'llm_search'
            )
            os.makedirs(detail_dir, exist_ok=True)
            detail_file = os.path.join(detail_dir, 'execution_detail.json')

            final_detail = self.execution_detail.copy()

            # Merge with a previous record, if any.
            if os.path.exists(detail_file):
                try:
                    with open(detail_file, 'r', encoding='utf-8') as f:
                        old_detail = json.load(f)

                    # 1. generate_queries: keep the old full record when the
                    #    new run only hit the cache.
                    new_gen = self.execution_detail.get("generate_queries")
                    old_gen = old_detail.get("generate_queries")
                    if (new_gen and isinstance(new_gen, dict) and new_gen.get("cached") is True and
                            old_gen and isinstance(old_gen, dict) and "prompt" in old_gen):
                        final_detail["generate_queries"] = old_gen

                    # 2. merge_detail: same rule.
                    new_merge = self.execution_detail.get("merge_detail")
                    old_merge = old_detail.get("merge_detail")
                    if (new_merge and isinstance(new_merge, dict) and new_merge.get("cached") is True and
                            old_merge and isinstance(old_merge, dict) and "prompt" in old_merge):
                        final_detail["merge_detail"] = old_merge

                    # 3. search_results (list): per (query, index) pair prefer
                    #    the old non-cached entry over a new cached one.
                    new_results = self.execution_detail.get("search_results", [])
                    old_results = old_detail.get("search_results", [])
                    if new_results and old_results:
                        merged_results = []
                        old_map = {(item.get("query"), item.get("query_index")): item
                                   for item in old_results if isinstance(item, dict)}
                        for item in new_results:
                            if item.get("cached") is True:
                                key = (item.get("query"), item.get("query_index"))
                                if key in old_map:
                                    old_item = old_map[key]
                                    if old_item.get("cached") is False:
                                        merged_results.append(old_item)
                                        continue
                            merged_results.append(item)
                        final_detail["search_results"] = merged_results

                except Exception as e:
                    logger.warning(f"  ⚠ 读取旧详情失败: {e}")

            with open(detail_file, 'w', encoding='utf-8') as f:
                json.dump(final_detail, f, ensure_ascii=False, indent=2)
            logger.info(f"✓ 执行详情已保存: {detail_file}")

        except Exception as e:
            logger.error(f"✗ 保存执行详情失败: {e}")

    def get_knowledge(self, question: str, cache_key: str = None) -> str:
        """
        Main entry point: obtain the knowledge text for a question.

        Args:
            question: the question string
            cache_key: optional cache key, to share a cache directory with
                the main pipeline

        Returns:
            str: final merged knowledge text

        Raises:
            Exception: if any pipeline step fails (details are still saved)
        """
        # NOTE(review): actual_cache_key is passed to the steps both as the
        # cache key AND as the question text sent to the LLM — confirm that
        # callers only pass a cache_key that is the question itself (or an
        # equivalent alias).
        actual_cache_key = cache_key if cache_key is not None else question

        start_time = time.time()

        try:
            logger.info(f"{'='*60}")
            logger.info(f"LLM Search - 开始处理问题: {question[:50]}...")
            logger.info(f"{'='*60}")

            # Step 1: generate multiple queries.
            queries = self.generate_queries(actual_cache_key)

            # Step 2: search knowledge for every query.
            knowledge_texts = self.search_knowledge_batch(actual_cache_key, queries)

            # Step 3: merge the knowledge texts.
            merged_knowledge = self.merge_knowledge(actual_cache_key, knowledge_texts)

            logger.info(f"{'='*60}")
            logger.info(f"✓ LLM Search 完成 (最终长度: {len(merged_knowledge)})")
            logger.info(f"{'='*60}\n")

            # Record timing and persist details.
            self.execution_detail["execution_time"] = time.time() - start_time
            self._save_execution_detail(actual_cache_key)

            return merged_knowledge

        except Exception as e:
            logger.error(f"✗ 获取知识文本失败,问题: {question[:50]}..., 错误: {e}")
            # Persist details even on failure.
            self.execution_detail["execution_time"] = time.time() - start_time
            self._save_execution_detail(actual_cache_key)
            raise


def get_knowledge(question: str, cache_key: str = None) -> str:
    """
    Convenience function: obtain the knowledge text for a question.

    Args:
        question: the question string
        cache_key: optional cache key

    Returns:
        str: final knowledge text
    """
    agent = LLMSearchKnowledge()
    return agent.get_knowledge(question, cache_key=cache_key)


if __name__ == "__main__":
    # Simple smoke test.
    test_question = "关于猫咪和墨镜的服装造型元素"

    try:
        result = get_knowledge(test_question)
        print("=" * 50)
        print("最终知识文本:")
        print("=" * 50)
        print(result)
    except Exception as e:
        logger.error(f"测试失败: {e}")