#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pipeline wrapper.

Wraps EnhancedSearchV2 and exposes an interface that runs only stages 3-7.
"""

import json
import logging
import os
import shutil
import tempfile
from typing import Any, Dict, List

from src.pipeline.feature_search_pipeline import EnhancedSearchV2
from api.config import APIConfig

logger = logging.getLogger(__name__)


class PipelineWrapper:
    """Pipeline wrapper that reuses stages 3-7."""

    def __init__(self):
        """Initialize the pipeline wrapper."""
        # Create a temporary output directory.
        self.temp_output_dir = tempfile.mkdtemp(prefix='api_pipeline_')
        logger.info(f"Created temporary output directory: {self.temp_output_dir}")

        # Initialize the EnhancedSearchV2 instance.
        # Note: the how_json_path parameter is required, but we never actually
        # use it (stages 1-2 are skipped), so write a minimal placeholder file.
        temp_how_file = os.path.join(self.temp_output_dir, 'temp_how.json')
        with open(temp_how_file, 'w', encoding='utf-8') as f:
            # '解构结果' ("deconstruction result") is the top-level key the
            # pipeline expects in this file.
            json.dump({'解构结果': {}}, f)

        self.pipeline = EnhancedSearchV2(
            how_json_path=temp_how_file,  # placeholder file, never actually read
            openrouter_api_key=APIConfig.OPENROUTER_API_KEY,
            output_dir=self.temp_output_dir,
            top_n=10,
            max_total_searches=APIConfig.MAX_TOTAL_SEARCHES,
            search_max_workers=APIConfig.SEARCH_MAX_WORKERS,
            max_searches_per_feature=APIConfig.MAX_SEARCHES_PER_FEATURE,
            max_searches_per_base_word=APIConfig.MAX_SEARCHES_PER_BASE_WORD,
            enable_evaluation=True,
            evaluation_max_workers=APIConfig.EVALUATION_MAX_WORKERS,
            evaluation_max_notes_per_query=APIConfig.EVALUATION_MAX_NOTES_PER_QUERY,
            enable_deep_analysis=True,  # enable deep analysis
            deep_analysis_only=False,
            deep_analysis_max_workers=APIConfig.DEEP_ANALYSIS_MAX_WORKERS,
            deep_analysis_max_notes=None,
            deep_analysis_skip_count=0,
            deep_analysis_sort_by='score',
            deep_analysis_api_url=APIConfig.DEEP_ANALYSIS_API_URL,
            deep_analysis_min_score=APIConfig.DEEP_ANALYSIS_MIN_SCORE,
            enable_similarity=True,  # enable similarity analysis
            similarity_weight_embedding=APIConfig.SIMILARITY_WEIGHT_EMBEDDING,
            similarity_weight_semantic=APIConfig.SIMILARITY_WEIGHT_SEMANTIC,
            similarity_max_workers=APIConfig.SIMILARITY_MAX_WORKERS,
            similarity_min_similarity=APIConfig.SIMILARITY_MIN_SIMILARITY
        )

        logger.info("Pipeline wrapper initialized")

    async def run_stages_3_to_7(
        self,
        features_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run the full stage 3-7 flow.

        Args:
            features_data: Data in the stage-2 output format
                (candidate_words.json format).

        Returns:
            A dict containing the stage 3-7 results.

        Raises:
            Exception: If any stage fails.
        """
        try:
            logger.info("=" * 60)
            logger.info("Starting stages 3-7")
            logger.info("=" * 60)

            # Validate input data.
            if not features_data:
                raise ValueError("features_data must not be empty")

            # Stage 3: multi-word combination + LLM evaluation.
            logger.info("Stage 3: generating search queries...")
            try:
                queries = self.pipeline.generate_search_queries(
                    features_data,
                    max_workers=APIConfig.QUERY_GENERATION_MAX_WORKERS,
                    max_candidates=APIConfig.MAX_CANDIDATES,
                    max_combo_length=APIConfig.MAX_COMBO_LENGTH
                )
            except Exception as e:
                logger.error(f"Stage 3 failed: {e}", exc_info=True)
                raise Exception(f"Search query generation failed: {e}") from e

            # Stage 4: execute the searches.
            logger.info("Stage 4: executing searches...")
            try:
                search_results = self.pipeline.execute_search_queries(
                    queries,
                    search_delay=2.0,
                    top_n=self.pipeline.top_n
                )
            except Exception as e:
                logger.error(f"Stage 4 failed: {e}", exc_info=True)
                raise Exception(f"Search execution failed: {e}") from e

            # Stage 5: LLM evaluation of the search results.
            logger.info("Stage 5: evaluating search results...")
            try:
                evaluation_results = self.pipeline.evaluate_search_results(search_results)
            except Exception as e:
                logger.error(f"Stage 5 failed: {e}", exc_info=True)
                raise Exception(f"Result evaluation failed: {e}") from e

            # Stage 6: deep analysis.
            logger.info("Stage 6: running deep analysis...")
            try:
                deep_results = self.pipeline.deep_analyzer.run(evaluation_results)
            except Exception as e:
                logger.error(f"Stage 6 failed: {e}", exc_info=True)
                raise Exception(f"Deep analysis failed: {e}") from e

            # Stage 7: similarity analysis.
            logger.info("Stage 7: running similarity analysis...")
            try:
                # In an async context, call run_async directly instead of run.
                similarity_results = await self.pipeline.similarity_analyzer.run_async(
                    deep_results,
                    output_path=os.path.join(self.temp_output_dir, "similarity_analysis_results.json")
                )
            except Exception as e:
                logger.error(f"Stage 7 failed: {e}", exc_info=True)
                raise Exception(f"Similarity analysis failed: {e}") from e

            # Important: similarity_analyzer.run_async updates evaluation_results
            # on disk (adding comprehensive_score). Reload the updated file,
            # because the in-memory evaluation_results variable is now stale.
            logger.info("Reloading updated evaluation results (with comprehensive_score)...")
            evaluated_results_path = os.path.join(self.temp_output_dir, "evaluated_results.json")
            if os.path.exists(evaluated_results_path):
                with open(evaluated_results_path, 'r', encoding='utf-8') as f:
                    evaluation_results = json.load(f)
                logger.info(f"Reloaded evaluation results covering {len(evaluation_results)} original features")
            else:
                logger.warning(f"Evaluation results file not found: {evaluated_results_path}; using in-memory data")

            logger.info("=" * 60)
            logger.info("Stages 3-7 complete")
            logger.info("=" * 60)

            return {
                'evaluation_results': evaluation_results,  # includes the updated comprehensive_score
                'deep_results': deep_results,
                'similarity_results': similarity_results
            }

        except Exception as e:
            logger.error(f"Stages 3-7 failed: {e}", exc_info=True)
            raise

    def cleanup(self):
        """Remove the temporary output directory."""
        try:
            if os.path.exists(self.temp_output_dir):
                shutil.rmtree(self.temp_output_dir)
                logger.info(f"Removed temporary directory: {self.temp_output_dir}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary directory: {e}")
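

# --- Usage sketch (illustrative, not part of the module's API) ---
# A minimal sketch of driving the wrapper from an asyncio entry point.
# The `features_data` payload below is a hypothetical stand-in for the real
# stage-2 candidate_words.json format, which is defined by the upstream
# pipeline. cleanup() runs in `finally` so the temporary output directory
# is removed even when a stage fails.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        wrapper = PipelineWrapper()
        try:
            # Hypothetical stage-2-style payload; substitute real
            # candidate_words.json data here.
            features_data = [
                {"feature": "example-feature", "candidate_words": ["word-a", "word-b"]},
            ]
            results = await wrapper.run_stages_3_to_7(features_data)
            print(f"evaluated features: {len(results['evaluation_results'])}")
        finally:
            wrapper.cleanup()

    asyncio.run(_demo())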