#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pipeline wrapper.

Wraps EnhancedSearchV2 and exposes an interface that runs only stages 3-7.
"""
import os
import json
import logging
from typing import Dict, List, Any, Optional

from src.pipeline.feature_search_pipeline import EnhancedSearchV2
from api.config import APIConfig

logger = logging.getLogger(__name__)

class PipelineWrapper:
    """Pipeline wrapper that reuses stages 3-7 of EnhancedSearchV2."""

    def __init__(self, output_dir: str, request_id: Optional[str] = None):
        """
        Initialize the pipeline wrapper.

        Args:
            output_dir: Request-specific output directory (provided by RequestContext).
            request_id: Request ID (used to tag log messages).
        """
        self.output_dir = output_dir
        self.request_id = request_id or "unknown"
        logger.info(f"[{self.request_id}] Initializing pipeline, output directory: {output_dir}")

        # Create a placeholder how.json (API mode does not need a real file).
        temp_how_file = os.path.join(output_dir, 'placeholder_how.json')
        with open(temp_how_file, 'w', encoding='utf-8') as f:
            json.dump({'解构结果': {}}, f)
        # Initialize the EnhancedSearchV2 instance with the provided output_dir.
        self.pipeline = EnhancedSearchV2(
            how_json_path=temp_how_file,  # placeholder file; never actually read
            openrouter_api_key=APIConfig.OPENROUTER_API_KEY,
            output_dir=output_dir,  # per-request isolated directory
            top_n=10,
            max_total_searches=APIConfig.MAX_TOTAL_SEARCHES,
            search_max_workers=APIConfig.SEARCH_MAX_WORKERS,
            max_searches_per_feature=APIConfig.MAX_SEARCHES_PER_FEATURE,
            max_searches_per_base_word=APIConfig.MAX_SEARCHES_PER_BASE_WORD,
            enable_evaluation=True,
            evaluation_max_workers=APIConfig.EVALUATION_MAX_WORKERS,
            evaluation_max_notes_per_query=APIConfig.EVALUATION_MAX_NOTES_PER_QUERY,
            enable_deep_analysis=True,  # enable deep deconstruction analysis
            deep_analysis_only=False,
            deep_analysis_max_workers=APIConfig.DEEP_ANALYSIS_MAX_WORKERS,
            deep_analysis_max_notes=None,
            deep_analysis_skip_count=0,
            deep_analysis_sort_by='score',
            deep_analysis_api_url=APIConfig.DEEP_ANALYSIS_API_URL,
            deep_analysis_min_score=APIConfig.DEEP_ANALYSIS_MIN_SCORE,
            enable_similarity=True,  # enable similarity analysis
            similarity_weight_embedding=APIConfig.SIMILARITY_WEIGHT_EMBEDDING,
            similarity_weight_semantic=APIConfig.SIMILARITY_WEIGHT_SEMANTIC,
            similarity_max_workers=APIConfig.SIMILARITY_MAX_WORKERS,
            similarity_min_similarity=APIConfig.SIMILARITY_MIN_SIMILARITY
        )
        logger.info(f"[{self.request_id}] Pipeline wrapper initialized")

    def run_stages_3_to_7_sync(
        self,
        features_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run the full stage 3-7 flow (synchronous version, intended to run in a thread pool).

        Args:
            features_data: Stage-2 output (candidate_words.json format).

        Returns:
            A dict containing the stage 3-7 results.

        Raises:
            RuntimeError: If any stage fails.
        """
        try:
            logger.info("=" * 60)
            logger.info("Starting stages 3-7")
            logger.info("=" * 60)

            # Validate the input data.
            if not features_data:
                raise ValueError("features_data must not be empty")

            # Stage 3: multi-word combinations + LLM evaluation.
            logger.info("Stage 3: generating search queries...")
            try:
                queries = self.pipeline.generate_search_queries(
                    features_data,
                    max_workers=APIConfig.QUERY_GENERATION_MAX_WORKERS,
                    max_candidates=APIConfig.MAX_CANDIDATES,
                    max_combo_length=APIConfig.MAX_COMBO_LENGTH
                )
            except Exception as e:
                logger.error(f"Stage 3 failed: {e}", exc_info=True)
                raise RuntimeError(f"Search query generation failed: {e}") from e
            # Stage 4: execute the searches.
            logger.info("Stage 4: executing searches...")
            try:
                search_results = self.pipeline.execute_search_queries(
                    queries,
                    search_delay=2.0,
                    top_n=self.pipeline.top_n
                )
            except Exception as e:
                logger.error(f"Stage 4 failed: {e}", exc_info=True)
                raise RuntimeError(f"Search execution failed: {e}") from e
            # Stage 5: LLM evaluation of the search results.
            logger.info("Stage 5: evaluating search results...")
            try:
                evaluation_results = self.pipeline.evaluate_search_results(search_results)
            except Exception as e:
                logger.error(f"Stage 5 failed: {e}", exc_info=True)
                raise RuntimeError(f"Result evaluation failed: {e}") from e
            # Stage 6: deep deconstruction analysis.
            logger.info("Stage 6: deep deconstruction analysis...")
            try:
                deep_results = self.pipeline.deep_analyzer.run(evaluation_results)
            except Exception as e:
                logger.error(f"Stage 6 failed: {e}", exc_info=True)
                raise RuntimeError(f"Deep deconstruction analysis failed: {e}") from e
            # Stage 7: similarity analysis.
            logger.info("Stage 7: similarity analysis...")
            try:
                # The synchronous version uses the run() method.
                similarity_results = self.pipeline.similarity_analyzer.run(
                    deep_results,
                    output_path=os.path.join(self.output_dir, "similarity_analysis_results.json")
                )
            except Exception as e:
                logger.error(f"Stage 7 failed: {e}", exc_info=True)
                raise RuntimeError(f"Similarity analysis failed: {e}") from e
            # Important: similarity_analyzer.run() updates evaluation_results in the
            # file on disk (adding comprehensive_score). Reload the updated file,
            # since the in-memory evaluation_results variable was not updated.
            logger.info("Reloading updated evaluation results (with comprehensive_score)...")
            evaluated_results_path = os.path.join(self.output_dir, "evaluated_results.json")
            if os.path.exists(evaluated_results_path):
                with open(evaluated_results_path, 'r', encoding='utf-8') as f:
                    evaluation_results = json.load(f)
                logger.info(f"Reloaded evaluation results covering {len(evaluation_results)} original features")
            else:
                logger.warning(f"Evaluation results file not found: {evaluated_results_path}; falling back to in-memory data")
- logger.info("=" * 60)
- logger.info("阶段3-7执行完成")
- logger.info("=" * 60)
-
- return {
- 'evaluation_results': evaluation_results, # 已包含更新后的comprehensive_score
- 'deep_results': deep_results,
- 'similarity_results': similarity_results
- }
-
- except Exception as e:
- logger.error(f"执行阶段3-7失败: {e}", exc_info=True)
- raise
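
# Minimal usage sketch (illustrative only, not part of the service). It assumes
# a fully configured APIConfig (API keys etc.), and the sample features_data
# below is a hypothetical stand-in for real stage-2 output. The thread-pool
# indirection mirrors how the synchronous stage 3-7 run can be kept off an
# async event loop, as the run_stages_3_to_7_sync docstring suggests.
if __name__ == "__main__":
    import asyncio
    import tempfile
    from concurrent.futures import ThreadPoolExecutor

    async def _demo() -> None:
        with tempfile.TemporaryDirectory() as output_dir:
            wrapper = PipelineWrapper(output_dir=output_dir, request_id="demo-001")
            sample_features = [{"feature": "example", "candidate_words": ["example"]}]
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor(max_workers=1) as pool:
                # Run the blocking stage 3-7 flow in a worker thread.
                results = await loop.run_in_executor(
                    pool, wrapper.run_stages_3_to_7_sync, sample_features
                )
            print(sorted(results.keys()))

    asyncio.run(_demo())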