#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pipeline wrapper.

Wraps EnhancedSearchV2 and exposes an interface that runs only stages 3-7.
"""

import json
import logging
import os
import shutil
import tempfile
from typing import Any, Dict, List

from src.pipeline.feature_search_pipeline import EnhancedSearchV2
from api.config import APIConfig

logger = logging.getLogger(__name__)


class PipelineWrapper:
    """Pipeline wrapper that reuses stages 3-7."""

    def __init__(self):
        """Initialize the pipeline wrapper."""
        # Create a temporary output directory.
        self.temp_output_dir = tempfile.mkdtemp(prefix='api_pipeline_')
        logger.info(f"Created temporary output directory: {self.temp_output_dir}")

        # Initialize the EnhancedSearchV2 instance.
        # Note: the how_json_path argument is required, but it is never read
        # here because stages 1-2 are skipped. Write an empty temp file as a
        # placeholder ('解构结果', "deconstruction result", is the top-level
        # key the pipeline expects).
        temp_how_file = os.path.join(self.temp_output_dir, 'temp_how.json')
        with open(temp_how_file, 'w', encoding='utf-8') as f:
            json.dump({'解构结果': {}}, f)

        self.pipeline = EnhancedSearchV2(
            how_json_path=temp_how_file,  # placeholder file, never actually used
            openrouter_api_key=APIConfig.OPENROUTER_API_KEY,
            output_dir=self.temp_output_dir,
            top_n=10,
            max_total_searches=APIConfig.MAX_TOTAL_SEARCHES,
            search_max_workers=APIConfig.SEARCH_MAX_WORKERS,
            max_searches_per_feature=APIConfig.MAX_SEARCHES_PER_FEATURE,
            max_searches_per_base_word=APIConfig.MAX_SEARCHES_PER_BASE_WORD,
            enable_evaluation=True,
            evaluation_max_workers=APIConfig.EVALUATION_MAX_WORKERS,
            evaluation_max_notes_per_query=APIConfig.EVALUATION_MAX_NOTES_PER_QUERY,
            enable_deep_analysis=True,  # enable deep deconstruction analysis
            deep_analysis_only=False,
            deep_analysis_max_workers=APIConfig.DEEP_ANALYSIS_MAX_WORKERS,
            deep_analysis_max_notes=None,
            deep_analysis_skip_count=0,
            deep_analysis_sort_by='score',
            deep_analysis_api_url=APIConfig.DEEP_ANALYSIS_API_URL,
            deep_analysis_min_score=APIConfig.DEEP_ANALYSIS_MIN_SCORE,
            enable_similarity=True,  # enable similarity analysis
            similarity_weight_embedding=APIConfig.SIMILARITY_WEIGHT_EMBEDDING,
            similarity_weight_semantic=APIConfig.SIMILARITY_WEIGHT_SEMANTIC,
            similarity_max_workers=APIConfig.SIMILARITY_MAX_WORKERS,
            similarity_min_similarity=APIConfig.SIMILARITY_MIN_SIMILARITY,
        )
        logger.info("Pipeline wrapper initialized")

    def run_stages_3_to_7(
        self,
        features_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run the full stage 3-7 flow.

        Args:
            features_data: Stage-2 output data (candidate_words.json format).

        Returns:
            Dict containing the stage 3-7 results.

        Raises:
            Exception: if any stage fails.
        """
        try:
            logger.info("=" * 60)
            logger.info("Starting stages 3-7")
            logger.info("=" * 60)

            # Validate the input data.
            if not features_data:
                raise ValueError("features_data must not be empty")

            # Stage 3: multi-word combination + LLM evaluation.
            logger.info("Stage 3: generating search queries...")
            try:
                queries = self.pipeline.generate_search_queries(
                    features_data,
                    max_workers=APIConfig.QUERY_GENERATION_MAX_WORKERS,
                    max_candidates=APIConfig.MAX_CANDIDATES,
                    max_combo_length=APIConfig.MAX_COMBO_LENGTH
                )
            except Exception as e:
                logger.error(f"Stage 3 failed: {e}", exc_info=True)
                raise Exception(f"Search query generation failed: {e}") from e

            # Stage 4: execute the searches.
            logger.info("Stage 4: executing searches...")
            try:
                search_results = self.pipeline.execute_search_queries(
                    queries,
                    search_delay=2.0,
                    top_n=self.pipeline.top_n
                )
            except Exception as e:
                logger.error(f"Stage 4 failed: {e}", exc_info=True)
                raise Exception(f"Search execution failed: {e}") from e

            # Stage 5: LLM evaluation of the search results.
            logger.info("Stage 5: evaluating search results...")
            try:
                evaluation_results = self.pipeline.evaluate_search_results(search_results)
            except Exception as e:
                logger.error(f"Stage 5 failed: {e}", exc_info=True)
                raise Exception(f"Result evaluation failed: {e}") from e

            # Stage 6: deep deconstruction analysis.
            logger.info("Stage 6: deep deconstruction analysis...")
            try:
                deep_results = self.pipeline.deep_analyzer.run(evaluation_results)
            except Exception as e:
                logger.error(f"Stage 6 failed: {e}", exc_info=True)
                raise Exception(f"Deep deconstruction analysis failed: {e}") from e

            # Stage 7: similarity analysis.
            logger.info("Stage 7: similarity analysis...")
            try:
                similarity_results = self.pipeline.similarity_analyzer.run(
                    deep_results,
                    output_path=os.path.join(self.temp_output_dir, "similarity_analysis_results.json")
                )
            except Exception as e:
                logger.error(f"Stage 7 failed: {e}", exc_info=True)
                raise Exception(f"Similarity analysis failed: {e}") from e

            # Note: similarity_analyzer.run updates the comprehensive_score
            # fields inside evaluation_results in place, so the in-memory
            # evaluation_results already reflects the update and can be
            # returned directly without reloading it from disk.

            logger.info("=" * 60)
            logger.info("Stages 3-7 completed")
            logger.info("=" * 60)

            return {
                'evaluation_results': evaluation_results,  # includes updated comprehensive_score
                'deep_results': deep_results,
                'similarity_results': similarity_results
            }

        except Exception as e:
            logger.error(f"Stages 3-7 failed: {e}", exc_info=True)
            raise

    def cleanup(self):
        """Remove the temporary output directory."""
        try:
            if os.path.exists(self.temp_output_dir):
                shutil.rmtree(self.temp_output_dir)
                logger.info(f"Removed temporary directory: {self.temp_output_dir}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary directory: {e}")
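# Minimal usage sketch (illustrative only): shows the intended call sequence
# of construct, run_stages_3_to_7, and cleanup. The sample feature record is
# hypothetical; the real schema is whatever stage 2 writes to
# candidate_words.json, and running this requires a fully configured APIConfig.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    wrapper = PipelineWrapper()
    try:
        sample_features = [{'word': 'example', 'score': 0.9}]  # hypothetical stage-2 record
        results = wrapper.run_stages_3_to_7(sample_features)
        print(sorted(results.keys()))  # ['deep_results', 'evaluation_results', 'similarity_results']
    finally:
        # Always remove the temporary output directory, even on failure.
        wrapper.cleanup()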