#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pipeline wrapper.

Wraps EnhancedSearchV2 and exposes an interface that runs only stages 3-7.
"""
import json
import logging
import os
import shutil
import tempfile
from typing import Any, Dict, List

from src.pipeline.feature_search_pipeline import EnhancedSearchV2
from api.config import APIConfig

logger = logging.getLogger(__name__)


class PipelineWrapper:
    """Pipeline wrapper that reuses stages 3-7 of EnhancedSearchV2."""

    def __init__(self):
        """Initialize the pipeline wrapper."""
        # Create a temporary directory for pipeline output files.
        self.temp_output_dir = tempfile.mkdtemp(prefix='api_pipeline_')
        logger.info(f"Created temporary output directory: {self.temp_output_dir}")

        # Initialize the EnhancedSearchV2 instance.
        # Note: how_json_path is a required argument, but it is never read here
        # (stages 1-2 are skipped), so an empty temporary file serves as a
        # placeholder. '解构结果' ("deconstruction result") is the top-level
        # key the pipeline expects in that file.
        temp_how_file = os.path.join(self.temp_output_dir, 'temp_how.json')
        with open(temp_how_file, 'w', encoding='utf-8') as f:
            json.dump({'解构结果': {}}, f)

        self.pipeline = EnhancedSearchV2(
            how_json_path=temp_how_file,  # placeholder file; never actually read
            openrouter_api_key=APIConfig.OPENROUTER_API_KEY,
            output_dir=self.temp_output_dir,
            top_n=10,
            max_total_searches=APIConfig.MAX_TOTAL_SEARCHES,
            search_max_workers=APIConfig.SEARCH_MAX_WORKERS,
            max_searches_per_feature=APIConfig.MAX_SEARCHES_PER_FEATURE,
            max_searches_per_base_word=APIConfig.MAX_SEARCHES_PER_BASE_WORD,
            enable_evaluation=True,
            evaluation_max_workers=APIConfig.EVALUATION_MAX_WORKERS,
            evaluation_max_notes_per_query=APIConfig.EVALUATION_MAX_NOTES_PER_QUERY,
            enable_deep_analysis=True,  # enable deep deconstruction analysis
            deep_analysis_only=False,
            deep_analysis_max_workers=APIConfig.DEEP_ANALYSIS_MAX_WORKERS,
            deep_analysis_max_notes=None,
            deep_analysis_skip_count=0,
            deep_analysis_sort_by='score',
            deep_analysis_api_url=APIConfig.DEEP_ANALYSIS_API_URL,
            deep_analysis_min_score=APIConfig.DEEP_ANALYSIS_MIN_SCORE,
            enable_similarity=True,  # enable similarity analysis
            similarity_weight_embedding=APIConfig.SIMILARITY_WEIGHT_EMBEDDING,
            similarity_weight_semantic=APIConfig.SIMILARITY_WEIGHT_SEMANTIC,
            similarity_max_workers=APIConfig.SIMILARITY_MAX_WORKERS,
            similarity_min_similarity=APIConfig.SIMILARITY_MIN_SIMILARITY
        )

        logger.info("Pipeline wrapper initialized")

    def run_stages_3_to_7(
        self,
        features_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run the full stage 3-7 flow.

        Args:
            features_data: Stage 2 output data (candidate_words.json format).

        Returns:
            A dict containing the stage 3-7 results.

        Raises:
            Exception: If any stage fails.
        """
        try:
            logger.info("=" * 60)
            logger.info("Starting stages 3-7")
            logger.info("=" * 60)

            # Validate the input data.
            if not features_data:
                raise ValueError("features_data must not be empty")

            # Stage 3: multi-word combination + LLM evaluation.
            logger.info("Stage 3: generating search queries...")
            try:
                queries = self.pipeline.generate_search_queries(
                    features_data,
                    max_workers=APIConfig.QUERY_GENERATION_MAX_WORKERS,
                    max_candidates=APIConfig.MAX_CANDIDATES,
                    max_combo_length=APIConfig.MAX_COMBO_LENGTH
                )
            except Exception as e:
                logger.error(f"Stage 3 failed: {e}", exc_info=True)
                raise Exception(f"Search query generation failed: {e}") from e

            # Stage 4: execute the searches.
            logger.info("Stage 4: executing searches...")
            try:
                search_results = self.pipeline.execute_search_queries(
                    queries,
                    search_delay=2.0,
                    top_n=self.pipeline.top_n
                )
            except Exception as e:
                logger.error(f"Stage 4 failed: {e}", exc_info=True)
                raise Exception(f"Search execution failed: {e}") from e

            # Stage 5: LLM evaluation of the search results.
            logger.info("Stage 5: evaluating search results...")
            try:
                evaluation_results = self.pipeline.evaluate_search_results(search_results)
            except Exception as e:
                logger.error(f"Stage 5 failed: {e}", exc_info=True)
                raise Exception(f"Result evaluation failed: {e}") from e

            # Stage 6: deep deconstruction analysis.
            logger.info("Stage 6: deep deconstruction analysis...")
            try:
                deep_results = self.pipeline.deep_analyzer.run(evaluation_results)
            except Exception as e:
                logger.error(f"Stage 6 failed: {e}", exc_info=True)
                raise Exception(f"Deep deconstruction analysis failed: {e}") from e

            # Stage 7: similarity analysis.
            logger.info("Stage 7: similarity analysis...")
            try:
                similarity_results = self.pipeline.similarity_analyzer.run(
                    deep_results,
                    output_path=os.path.join(self.temp_output_dir, "similarity_analysis_results.json")
                )
            except Exception as e:
                logger.error(f"Stage 7 failed: {e}", exc_info=True)
                raise Exception(f"Similarity analysis failed: {e}") from e

            # Note: similarity_analyzer.run updates comprehensive_score on the
            # evaluation_results it was given, in memory, so the object can be
            # returned directly without reloading anything from disk.

            logger.info("=" * 60)
            logger.info("Stages 3-7 completed")
            logger.info("=" * 60)

            return {
                'evaluation_results': evaluation_results,  # includes the updated comprehensive_score
                'deep_results': deep_results,
                'similarity_results': similarity_results
            }

        except Exception as e:
            logger.error(f"Stages 3-7 failed: {e}", exc_info=True)
            raise

    def cleanup(self):
        """Remove temporary files."""
        try:
            if os.path.exists(self.temp_output_dir):
                shutil.rmtree(self.temp_output_dir)
                logger.info(f"Removed temporary directory: {self.temp_output_dir}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary directory: {e}")
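
# Minimal usage sketch (assumptions: a stage-2 output file named
# 'candidate_words.json' exists in the working directory; its exact schema is
# defined by the upstream pipeline, not by this module):
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    wrapper = PipelineWrapper()
    try:
        with open('candidate_words.json', 'r', encoding='utf-8') as f:
            features_data = json.load(f)
        results = wrapper.run_stages_3_to_7(features_data)
        logger.info(f"Stages 3-7 finished; result keys: {list(results)}")
    finally:
        # Always remove the temporary output directory, even on failure.
        wrapper.cleanup()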