import sys import os from typing import Any, Dict, List import json # 添加项目根目录到 Python 路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from utils.logging_config import get_logger from utils.mysql_db import MysqlHelper logger = get_logger('ExpandAgent') try: from gemini import GeminiProcessor HAS_GEMINI = True except ImportError: HAS_GEMINI = False def _fetch_parsing_data_by_request(request_id: str) -> List[str]: """从 knowledge_parsing_content 表中根据 request_id 获取 parsing_data 字段""" sql = "SELECT parsing_data FROM knowledge_parsing_content WHERE request_id = %s ORDER BY id DESC LIMIT 10" rows = MysqlHelper.get_values(sql, (request_id,)) or [] results = [] for row in rows: parsing_data = row[0] # 获取 parsing_data 字段 if parsing_data: results.append(parsing_data) print(f"Final results: {len(results)} items") return results def _build_prompt(data_samples: str, input_query: str) -> str: """构建用于扩展查询的 Prompt""" return f"""# 角色与目标 你是一个Query词扩展与优化专家,任务是从输入文本中提取适合用于搜索"创作/制作方法论"的关键词,并基于这些关键词生成多组可直接用于爬虫搜索的query词模板。 目标是: 1. 提取文本中与"创作方法论/制作方法论"相关的核心关键词。 2. 为每个关键词生成同义词、近义词、衍生表达。 3. 将关键词与固定的模板组合,输出可单独搜索的query词,以及多关键词组合的query词。 4. 输出格式要求结构化,支持工程化直接使用。 # 任务 1. 提取文本中的核心关键词(与"创作/制作方法论"高度相关)。 2. 为每个关键词生成同义/扩展词。 3. 按以下模板格式输出结果。 4. 将任务中1和2的关键词(含扩展的关键词)需要填入到输出格式的 "query_templates"与"query_combinations":模板后输出结果。 # 输出格式 {{ "core_keywords": [ "关键词1", "关键词2", ... ], "expanded_keywords": {{ "关键词1": ["同义词1", "同义词2", "变体1"], "关键词2": ["同义词1", "同义词2", "变体1"] }}, "query_templates": [ "如何 + {{关键词}}", "{{关键词}} + 方法论", "{{关键词}} + 技巧", "{{关键词}} + 步骤", "{{关键词}} + 流程", "{{关键词}} + 案例", "{{关键词}} + 总结", "{{关键词}} + 原理", "如何提升 + {{关键词}}", "从零开始 + {{关键词}}" ], "query_combinations": [ "{{关键词1}} + {{关键词2}} + 方法论", "{{关键词1}} + {{关键词2}} + 案例", "如何结合 {{关键词1}} 与 {{关键词2}}" ] }} 输入文本: {data_samples} 请按照上述格式输出JSON结果。""" def _run_llm(prompt: str) -> List[str]: """调用LLM生成扩展查询""" if not HAS_GEMINI: return [] try: processor = GeminiProcessor() result = processor.process(content=prompt, system_prompt="你是专业的查询扩展助手") print(f"result: {result}") # 尝试解析返回结果 if isinstance(result, dict): text = result.get("result", "") or result.get("raw_response", "") else: text = str(result) try: queries = json.loads(text) return queries if isinstance(queries, list) else [] except: return [] except Exception as e: logger.error(f"LLM调用失败: {e}") return [] def _heuristic_expand(input_query: str) -> List[str]: """启发式扩展(LLM不可用时的fallback)""" base = input_query.strip() if not base: return [] return [ base, f"{base} 教程", f"{base} 实战", f"{base} 入门指南", f"{base} 高级技巧" ] def execute_expand_agent_with_api(requestId: str, query: str = "") -> Dict[str, Any]: """对外暴露的API:根据requestId查询数据,生成扩展查询""" # 获取数据 data_samples = _fetch_parsing_data_by_request(requestId) print(f"data_samples: {data_samples[0]}") # 构建prompt prompt = _build_prompt(data_samples[0], query) print(f"prompt: {prompt}") # 生成扩展查询 expanded = _run_llm(prompt) if not expanded: expanded = _heuristic_expand(query) print(f"expanded: {expanded}") return { "requestId": requestId, "inputQuery": query, "prompt": prompt, "expandedQueries": expanded } if __name__ == "__main__": queries = execute_expand_agent_with_api("REQUEST_001") print(queries)