| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- '''
- 基于LLM+search的知识获取模块
- 1. 输入:问题
- 2. 输出:知识文本
- 3. 处理流程:
- - 3.1 根据问题构建query,调用大模型生成多个query
- - 3.2 根据query调用 utils/qwen_client.py 的 search_and_chat 方法(使用返回中的 'content' 字段即可),获取知识文本
- - 3.3 用大模型合并多个query的知识文本,
- - 3.4 返回知识文本
- 4. 大模型调用使用uitls/gemini_client.py 的 generate_text 方法
- 5. 考虑复用性,尽量把每个步骤封装在一个方法中
- '''
- import os
- import sys
- import json
- from typing import List
- from loguru import logger
- # 设置路径以便导入工具类
- current_dir = os.path.dirname(os.path.abspath(__file__))
- root_dir = os.path.dirname(current_dir)
- sys.path.insert(0, root_dir)
- from utils.gemini_client import generate_text
- from utils.qwen_client import QwenClient
- class LLMSearchKnowledge:
- """基于LLM+search的知识获取类"""
-
- def __init__(self):
- """初始化"""
- self.qwen_client = QwenClient()
- self.prompt_dir = os.path.join(current_dir, "prompt")
-
- def _load_prompt(self, filename: str) -> str:
- """
- 加载prompt文件内容
-
- Args:
- filename: prompt文件名
-
- Returns:
- str: prompt内容
-
- Raises:
- FileNotFoundError: 文件不存在时抛出
- ValueError: 文件内容为空时抛出
- """
- prompt_path = os.path.join(self.prompt_dir, filename)
-
- if not os.path.exists(prompt_path):
- error_msg = f"Prompt文件不存在: {prompt_path}"
- logger.error(error_msg)
- raise FileNotFoundError(error_msg)
-
- try:
- with open(prompt_path, 'r', encoding='utf-8') as f:
- content = f.read().strip()
- if not content:
- error_msg = f"Prompt文件内容为空: {prompt_path}"
- logger.error(error_msg)
- raise ValueError(error_msg)
- return content
- except Exception as e:
- error_msg = f"读取prompt文件 {filename} 失败: {e}"
- logger.error(error_msg)
- raise
-
- def generate_queries(self, question: str) -> List[str]:
- """
- 根据问题生成多个搜索query
-
- Args:
- question: 问题字符串
-
- Returns:
- List[str]: query列表
-
- Raises:
- Exception: 生成query失败时抛出异常
- """
- try:
- logger.info(f"开始生成query,问题: {question[:50]}...")
-
- # 加载prompt
- prompt_template = self._load_prompt("llm_search_generate_query_prompt.md")
-
- # 构建prompt,使用 {question} 作为占位符
- prompt = prompt_template.format(question=question)
-
- # 调用gemini生成query
- logger.info("调用Gemini生成query")
- response_text = generate_text(prompt=prompt)
-
- # 解析JSON响应
- logger.info("解析生成的query")
- try:
- # 尝试提取JSON部分(去除可能的markdown代码块标记)
- response_text = response_text.strip()
- if response_text.startswith("```json"):
- response_text = response_text[7:]
- if response_text.startswith("```"):
- response_text = response_text[3:]
- if response_text.endswith("```"):
- response_text = response_text[:-3]
- response_text = response_text.strip()
-
- result = json.loads(response_text)
- queries = result.get("queries", [])
-
- if not queries:
- raise ValueError("生成的query列表为空")
-
- logger.info(f"成功生成 {len(queries)} 个query: {queries}")
- return queries
-
- except json.JSONDecodeError as e:
- logger.error(f"解析JSON失败: {e}, 响应内容: {response_text}")
- raise ValueError(f"无法解析模型返回的JSON: {e}")
-
- except Exception as e:
- logger.error(f"生成query失败: {e}")
- raise
-
- def search_knowledge(self, query: str) -> str:
- """
- 根据单个query搜索知识
-
- Args:
- query: 搜索query
-
- Returns:
- str: 搜索到的知识文本(content字段)
-
- Raises:
- Exception: 搜索失败时抛出异常
- """
- try:
- logger.info(f"搜索知识,query: {query}")
-
- # 调用qwen_client的search_and_chat方法
- result = self.qwen_client.search_and_chat(
- user_prompt=query,
- search_strategy="agent"
- )
-
- # 提取content字段
- knowledge_text = result.get("content", "")
-
- if not knowledge_text:
- logger.warning(f"query '{query}' 的搜索结果为空")
- return ""
-
- logger.info(f"成功获取知识文本,长度: {len(knowledge_text)}")
- return knowledge_text
-
- except Exception as e:
- logger.error(f"搜索知识失败,query: {query}, 错误: {e}")
- raise
-
- def search_knowledge_batch(self, queries: List[str]) -> List[str]:
- """
- 批量搜索知识
-
- Args:
- queries: query列表
-
- Returns:
- List[str]: 知识文本列表
- """
- knowledge_texts = []
- for i, query in enumerate(queries, 1):
- try:
- logger.info(f"搜索第 {i}/{len(queries)} 个query")
- knowledge_text = self.search_knowledge(query)
- knowledge_texts.append(knowledge_text)
- except Exception as e:
- logger.error(f"搜索第 {i} 个query失败,跳过: {e}")
- # 失败时添加空字符串,保持索引对应
- knowledge_texts.append("")
-
- return knowledge_texts
-
- def merge_knowledge(self, knowledge_texts: List[str]) -> str:
- """
- 合并多个知识文本
-
- Args:
- knowledge_texts: 知识文本列表
-
- Returns:
- str: 合并后的知识文本
-
- Raises:
- Exception: 合并失败时抛出异常
- """
- try:
- logger.info(f"开始合并 {len(knowledge_texts)} 个知识文本")
-
- # 过滤空文本
- valid_texts = [text for text in knowledge_texts if text.strip()]
- if not valid_texts:
- logger.warning("所有知识文本都为空,返回空字符串")
- return ""
-
- if len(valid_texts) == 1:
- logger.info("只有一个有效知识文本,直接返回")
- return valid_texts[0]
-
- # 加载prompt
- prompt_template = self._load_prompt("llm_search_merge_knowledge_prompt.md")
-
- # 构建prompt,将多个知识文本格式化
- knowledge_sections = []
- for i, text in enumerate(valid_texts, 1):
- knowledge_sections.append(f"【知识文本 {i}】\n{text}")
-
- knowledge_texts_str = "\n\n".join(knowledge_sections)
- prompt = prompt_template.format(knowledge_texts=knowledge_texts_str)
-
- # 调用gemini合并知识
- logger.info("调用Gemini合并知识文本")
- merged_text = generate_text(prompt=prompt)
-
- logger.info(f"成功合并知识文本,长度: {len(merged_text)}")
- return merged_text.strip()
-
- except Exception as e:
- logger.error(f"合并知识文本失败: {e}")
- raise
-
- def get_knowledge(self, question: str) -> str:
- """
- 主方法:根据问题获取知识文本
-
- Args:
- question: 问题字符串
-
- Returns:
- str: 最终的知识文本
-
- Raises:
- Exception: 处理过程中出现错误时抛出异常
- """
- try:
- logger.info(f"开始处理问题: {question[:50]}...")
-
- # 步骤1: 生成多个query
- queries = self.generate_queries(question)
-
- # 步骤2: 对每个query搜索知识
- knowledge_texts = self.search_knowledge_batch(queries)
-
- # 步骤3: 合并多个知识文本
- merged_knowledge = self.merge_knowledge(knowledge_texts)
-
- logger.info(f"成功获取知识文本,长度: {len(merged_knowledge)}")
- return merged_knowledge
-
- except Exception as e:
- logger.error(f"获取知识文本失败,问题: {question[:50]}..., 错误: {e}")
- raise
- def get_knowledge(question: str) -> str:
- """
- 便捷函数:根据问题获取知识文本
-
- Args:
- question: 问题字符串
-
- Returns:
- str: 最终的知识文本
- """
- agent = LLMSearchKnowledge()
- return agent.get_knowledge(question)
- if __name__ == "__main__":
- # 测试代码
- test_question = "关于猫咪和墨镜的服装造型元素"
-
- try:
- result = get_knowledge(test_question)
- print("=" * 50)
- print("最终知识文本:")
- print("=" * 50)
- print(result)
- except Exception as e:
- logger.error(f"测试失败: {e}")
|