yangxiaohui 1 tháng trước
mục cha
commit
7b28a5fc74
2 tập tin đã thay đổi với 494 bổ sung và 21 xóa
  1. 305 0
      lib/structured_logger.py
  2. 189 21
      sug_v6_1_2.py

+ 305 - 0
lib/structured_logger.py

@@ -0,0 +1,305 @@
+"""
+结构化日志记录器
+提供步骤化、可追溯、易于可视化的日志记录功能
+"""
+import json
+import os
+from datetime import datetime
+from typing import Any, Optional
+from pathlib import Path
+
+
class StructuredLogger:
    """
    Structured, step-oriented logger.

    Features:
    1. Each step is persisted to its own JSON file.
    2. A complete event timeline is recorded.
    3. Nested steps (tree structure) are supported via a step stack.
    4. The on-disk layout is designed for visualization and debugging.
    """

    def __init__(self, log_dir: str, run_id: str):
        """
        Initialize the logger and create the on-disk directory layout.

        Args:
            log_dir: Root directory for all log output.
            run_id: Unique identifier for this run.
        """
        self.log_dir = Path(log_dir)
        self.run_id = run_id

        # Directory layout: steps/, timeline/ and artifacts/ under the root.
        self.steps_dir = self.log_dir / "steps"
        self.timeline_dir = self.log_dir / "timeline"
        self.artifacts_dir = self.log_dir / "artifacts"

        for dir_path in [self.steps_dir, self.timeline_dir, self.artifacts_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)

        # Timeline events and step bookkeeping.
        self.timeline = []
        self.step_counter = 0
        self.step_stack = []  # stack of currently-open step ids (drives nesting)

        # Run-level metadata, persisted immediately so a crash still leaves a record.
        self.metadata = {
            "run_id": run_id,
            "start_time": datetime.now().isoformat(),
            "status": "running",
            "steps_count": 0,
            "log_dir": str(self.log_dir),
        }
        self._save_metadata()

    def start_step(
        self,
        step_name: str,
        step_type: str,
        description: str = "",
        input_data: Any = None
    ) -> int:
        """
        Begin a new step.

        Args:
            step_name: Step name (e.g. "extract_keywords", "explore_level_1").
            step_type: Step category (e.g. "extraction", "exploration",
                "analysis", "evaluation").
            description: Human-readable description of the step.
            input_data: Input payload; serialized via ``_serialize``.

        Returns:
            The numeric id assigned to this step.
        """
        self.step_counter += 1
        step_id = self.step_counter

        # Nesting level is the current stack depth; parent is the open step, if any.
        level = len(self.step_stack)
        parent_id = self.step_stack[-1] if self.step_stack else None

        step_info = {
            "step_id": step_id,
            "step_name": step_name,
            "step_type": step_type,
            "description": description,
            "level": level,
            "parent_id": parent_id,
            "status": "running",
            "start_time": datetime.now().isoformat(),
            "end_time": None,
            "duration_seconds": None,
            "input": self._serialize(input_data),
            "output": None,
            "error": None,
        }

        # Mark this step as open so children nest under it.
        self.step_stack.append(step_id)

        # Persist the step immediately (status "running") so partial runs are visible.
        self._save_step(step_id, step_info)

        # Append a start event to the timeline.
        self.timeline.append({
            "timestamp": step_info["start_time"],
            "event": "step_start",
            "step_id": step_id,
            "step_name": step_name,
            "step_type": step_type,
        })
        self._save_timeline()

        print(f"\n{'  ' * level}[Step {step_id}] {step_name} - {description}")

        return step_id

    def end_step(
        self,
        step_id: int,
        output_data: Any = None,
        status: str = "success",
        error: Optional[str] = None
    ):
        """
        Finish a step: record its outcome, duration and timeline event.

        Args:
            step_id: Id returned by ``start_step``.
            output_data: Output payload; serialized via ``_serialize``.
            status: Final step status ("success", "error", "skipped").
            error: Error message, if any.
        """
        # Remove the step from the nesting stack. If steps were ended out of
        # order, remove the id wherever it sits so level tracking stays sane
        # (previously a mismatched id was silently left on the stack).
        if step_id in self.step_stack:
            self.step_stack.remove(step_id)

        # Reload the persisted "running" record written by start_step.
        step_info = self._load_step(step_id)

        # Compute wall-clock duration from the persisted start time.
        end_time = datetime.now()
        start_time = datetime.fromisoformat(step_info["start_time"])
        duration = (end_time - start_time).total_seconds()

        step_info.update({
            "status": status,
            "end_time": end_time.isoformat(),
            "duration_seconds": duration,
            "output": self._serialize(output_data),
            "error": error,
        })

        # Persist the completed step record.
        self._save_step(step_id, step_info)

        # Append an end event to the timeline.
        self.timeline.append({
            "timestamp": step_info["end_time"],
            "event": "step_end",
            "step_id": step_id,
            "step_name": step_info["step_name"],
            "status": status,
            "duration_seconds": duration,
        })
        self._save_timeline()

        level = len(self.step_stack)
        status_emoji = "✅" if status == "success" else "❌" if status == "error" else "⏭️"
        print(f"{'  ' * level}{status_emoji} [Step {step_id}] Completed in {duration:.2f}s")

    def log_artifact(
        self,
        step_id: int,
        artifact_name: str,
        artifact_data: Any,
        artifact_type: str = "json"
    ) -> str:
        """
        Save an artifact associated with a step (API responses,
        intermediate results, etc.).

        Args:
            step_id: Id of the owning step.
            artifact_name: File name (without extension for json/text).
            artifact_data: Payload to write.
            artifact_type: One of "json", "text", or anything else for raw bytes.

        Returns:
            The path of the written artifact file, as a string.
        """
        artifact_dir = self.artifacts_dir / f"step_{step_id:04d}"
        artifact_dir.mkdir(exist_ok=True)

        if artifact_type == "json":
            artifact_path = artifact_dir / f"{artifact_name}.json"
            with open(artifact_path, "w", encoding="utf-8") as f:
                # Run through _serialize first so Pydantic models (or nested
                # models inside dicts/lists) don't crash json.dump.
                json.dump(self._serialize(artifact_data), f, ensure_ascii=False, indent=2)
        elif artifact_type == "text":
            artifact_path = artifact_dir / f"{artifact_name}.txt"
            with open(artifact_path, "w", encoding="utf-8") as f:
                f.write(str(artifact_data))
        else:
            # Any other type is treated as raw bytes.
            artifact_path = artifact_dir / artifact_name
            with open(artifact_path, "wb") as f:
                f.write(artifact_data)

        print(f"  📎 Artifact saved: {artifact_path.name}")
        return str(artifact_path)

    def finalize(self, final_status: str = "success", final_output: Any = None):
        """
        Finish the whole run: update metadata and write the summary file.

        Args:
            final_status: Final run status (e.g. "success", "error").
            final_output: Final output payload; serialized via ``_serialize``.
        """
        self.metadata.update({
            "end_time": datetime.now().isoformat(),
            "status": final_status,
            "steps_count": self.step_counter,
            "final_output": self._serialize(final_output),
        })
        self._save_metadata()

        # Write summary.json aggregating every persisted step.
        self._generate_summary()

        print(f"\n{'='*60}")
        print(f"Run completed: {final_status}")
        print(f"Total steps: {self.step_counter}")
        print(f"Log directory: {self.log_dir}")
        print(f"{'='*60}")

    def _save_step(self, step_id: int, step_info: dict):
        """Persist one step record to steps/step_NNNN.json."""
        step_file = self.steps_dir / f"step_{step_id:04d}.json"
        with open(step_file, "w", encoding="utf-8") as f:
            json.dump(step_info, f, ensure_ascii=False, indent=2)

    def _load_step(self, step_id: int) -> dict:
        """Load one step record from steps/step_NNNN.json."""
        step_file = self.steps_dir / f"step_{step_id:04d}.json"
        with open(step_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def _save_timeline(self):
        """Persist the full timeline (rewritten on every event)."""
        timeline_file = self.timeline_dir / "timeline.json"
        with open(timeline_file, "w", encoding="utf-8") as f:
            json.dump(self.timeline, f, ensure_ascii=False, indent=2)

    def _save_metadata(self):
        """Persist run metadata to metadata.json."""
        metadata_file = self.log_dir / "metadata.json"
        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)

    def _serialize(self, data: Any) -> Any:
        """Recursively convert data into JSON-friendly values.

        Handles Pydantic models (via ``model_dump``), dicts, lists/tuples
        and datetimes; anything else is returned unchanged.
        """
        if data is None:
            return None

        # datetime is not JSON-serializable; store ISO-8601 text.
        if isinstance(data, datetime):
            return data.isoformat()

        # Pydantic v2 models.
        if hasattr(data, "model_dump"):
            return data.model_dump()

        # Recurse into mappings.
        if isinstance(data, dict):
            return {k: self._serialize(v) for k, v in data.items()}

        # Recurse into sequences; tuples become lists (matching json semantics).
        if isinstance(data, (list, tuple)):
            return [self._serialize(item) for item in data]

        # Everything else is passed through as-is.
        return data

    def _generate_summary(self):
        """Aggregate all persisted steps into summary.json."""
        summary = {
            "run_id": self.run_id,
            "status": self.metadata["status"],
            "start_time": self.metadata["start_time"],
            "end_time": self.metadata["end_time"],
            "total_steps": self.step_counter,
            "steps_overview": [],
        }

        # Collect a one-line overview per step, in id order.
        for step_id in range(1, self.step_counter + 1):
            step_info = self._load_step(step_id)
            summary["steps_overview"].append({
                "step_id": step_id,
                "step_name": step_info["step_name"],
                "step_type": step_info["step_type"],
                "status": step_info["status"],
                "duration_seconds": step_info["duration_seconds"],
            })

        # Persist the summary.
        summary_file = self.log_dir / "summary.json"
        with open(summary_file, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)

+ 189 - 21
sug_v6_1_2.py

@@ -450,12 +450,13 @@ def find_qualified_queries(evaluation_results: list[dict], min_relevance_score:
 # 主流程
 # ============================================================================
 
-async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict:
+async def progressive_exploration(context: RunContext, logger, max_levels: int = 4) -> dict:
     """
     渐进式广度探索流程
 
     Args:
         context: 运行上下文
+        logger: 结构化日志记录器
         max_levels: 最大探索层数,默认4
 
     返回格式:
@@ -467,24 +468,103 @@ async def progressive_exploration(context: RunContext, max_levels: int = 4) -> d
     """
 
     # 阶段1:提取关键词(从原始问题提取)
+    step_id = logger.start_step(
+        step_name="提取关键词",
+        step_type="extraction",
+        description="从原始问题提取关键词",
+        input_data={"q": context.q}
+    )
+
     keyword_result = await extract_keywords(context.q)
     context.keywords = keyword_result.keywords
 
+    logger.log_artifact(
+        step_id=step_id,
+        artifact_name="keywords",
+        artifact_data=keyword_result.model_dump()
+    )
+
+    logger.end_step(
+        step_id=step_id,
+        output_data={"keywords": keyword_result.keywords, "reasoning": keyword_result.reasoning}
+    )
+
     # 阶段2:渐进式探索
     current_level = 1
 
     # Level 1:单个关键词
+    step_id = logger.start_step(
+        step_name="探索Level 1",
+        step_type="exploration",
+        description=f"探索 Level 1:{len(context.keywords[:7])} 个单关键词",
+        input_data={"queries": context.keywords[:7]}
+    )
+
     level_1_queries = context.keywords[:7]  # 限制最多7个关键词
     level_1_data = await explore_level(level_1_queries, current_level, context)
 
+    logger.log_artifact(
+        step_id=step_id,
+        artifact_name="level_1_results",
+        artifact_data=level_1_data
+    )
+
+    logger.end_step(
+        step_id=step_id,
+        output_data={"level": 1, "queries_count": len(level_1_queries)}
+    )
+
     # 分析Level 1
+    step_id = logger.start_step(
+        step_name="分析Level 1",
+        step_type="analysis",
+        description="分析 Level 1 探索结果",
+        input_data={"level": 1, "queries_count": len(level_1_queries)}
+    )
+
     analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context)
 
+    logger.log_artifact(
+        step_id=step_id,
+        artifact_name="analysis_result",
+        artifact_data=analysis_1.model_dump()
+    )
+
+    logger.end_step(
+        step_id=step_id,
+        output_data={
+            "should_evaluate_now": analysis_1.should_evaluate_now,
+            "candidates_to_evaluate": analysis_1.candidates_to_evaluate if analysis_1.should_evaluate_now else [],
+            "next_combinations": analysis_1.next_combinations,
+            "key_findings": analysis_1.key_findings,
+            "reasoning": analysis_1.reasoning,
+            "promising_signals": [s.model_dump() for s in analysis_1.promising_signals]
+        }
+    )
+
     if analysis_1.should_evaluate_now:
         # 直接评估
+        step_id = logger.start_step(
+            step_name="评估Level 1候选",
+            step_type="evaluation",
+            description=f"评估 Level 1 的 {len(analysis_1.candidates_to_evaluate)} 个候选query",
+            input_data={"candidates": analysis_1.candidates_to_evaluate}
+        )
+
         eval_results = await evaluate_candidates(analysis_1.candidates_to_evaluate, context.q, context)
         qualified = find_qualified_queries(eval_results, min_relevance_score=0.7)
 
+        logger.log_artifact(
+            step_id=step_id,
+            artifact_name="evaluation_results",
+            artifact_data=eval_results
+        )
+
+        logger.end_step(
+            step_id=step_id,
+            output_data={"qualified_count": len(qualified), "qualified": qualified}
+        )
+
         if qualified:
             return {
                 "success": True,
@@ -503,16 +583,77 @@ async def progressive_exploration(context: RunContext, max_levels: int = 4) -> d
             break
 
         # 探索当前层
+        step_id = logger.start_step(
+            step_name=f"探索Level {level_num}",
+            step_type="exploration",
+            description=f"探索 Level {level_num}:{len(prev_analysis.next_combinations)} 个组合query",
+            input_data={"queries": prev_analysis.next_combinations}
+        )
+
         level_data = await explore_level(prev_analysis.next_combinations, level_num, context)
 
+        logger.log_artifact(
+            step_id=step_id,
+            artifact_name=f"level_{level_num}_results",
+            artifact_data=level_data
+        )
+
+        logger.end_step(
+            step_id=step_id,
+            output_data={"level": level_num, "queries_count": len(prev_analysis.next_combinations)}
+        )
+
         # 分析当前层
+        step_id = logger.start_step(
+            step_name=f"分析Level {level_num}",
+            step_type="analysis",
+            description=f"分析 Level {level_num} 探索结果",
+            input_data={"level": level_num}
+        )
+
         analysis = await analyze_level(level_data, context.exploration_levels, context.q, context)
 
+        logger.log_artifact(
+            step_id=step_id,
+            artifact_name="analysis_result",
+            artifact_data=analysis.model_dump()
+        )
+
+        logger.end_step(
+            step_id=step_id,
+            output_data={
+                "should_evaluate_now": analysis.should_evaluate_now,
+                "candidates_to_evaluate": analysis.candidates_to_evaluate if analysis.should_evaluate_now else [],
+                "next_combinations": analysis.next_combinations,
+                "key_findings": analysis.key_findings,
+                "reasoning": analysis.reasoning,
+                "promising_signals": [s.model_dump() for s in analysis.promising_signals]
+            }
+        )
+
         if analysis.should_evaluate_now:
             # 评估候选
+            step_id = logger.start_step(
+                step_name=f"评估Level {level_num}候选",
+                step_type="evaluation",
+                description=f"评估 Level {level_num} 的 {len(analysis.candidates_to_evaluate)} 个候选query",
+                input_data={"candidates": analysis.candidates_to_evaluate}
+            )
+
             eval_results = await evaluate_candidates(analysis.candidates_to_evaluate, context.q, context)
             qualified = find_qualified_queries(eval_results, min_relevance_score=0.7)
 
+            logger.log_artifact(
+                step_id=step_id,
+                artifact_name="evaluation_results",
+                artifact_data=eval_results
+            )
+
+            logger.end_step(
+                step_id=step_id,
+                output_data={"qualified_count": len(qualified), "qualified": qualified}
+            )
+
             if qualified:
                 return {
                     "success": True,
@@ -615,26 +756,53 @@ async def main(input_dir: str, max_levels: int = 4):
         log_url=log_url,
     )
 
-    # 执行渐进式探索
-    optimization_result = await progressive_exploration(run_context, max_levels=max_levels)
-
-    # 格式化输出
-    final_output = format_output(optimization_result, run_context)
-    print(f"\n{'='*60}")
-    print("最终结果")
-    print(f"{'='*60}")
-    print(final_output)
-
-    # 保存结果
-    run_context.optimization_result = optimization_result
-    run_context.final_output = final_output
-
-    # 保存 RunContext 到 log_dir
-    os.makedirs(run_context.log_dir, exist_ok=True)
-    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
-    with open(context_file_path, "w", encoding="utf-8") as f:
-        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
-    print(f"\nRunContext saved to: {context_file_path}")
+    # === 初始化结构化日志系统 ===
+    from lib.structured_logger import StructuredLogger
+    from sug_v6_1_2_visualizer import visualize_log
+
+    logger = StructuredLogger(log_dir=log_dir, run_id=current_time)
+
+    try:
+        # 执行渐进式探索
+        optimization_result = await progressive_exploration(run_context, logger, max_levels=max_levels)
+
+        # 格式化输出
+        final_output = format_output(optimization_result, run_context)
+        print(f"\n{'='*60}")
+        print("最终结果")
+        print(f"{'='*60}")
+        print(final_output)
+
+        # 保存结果
+        run_context.optimization_result = optimization_result
+        run_context.final_output = final_output
+
+        # 保存 RunContext 到 log_dir(旧版兼容)
+        os.makedirs(run_context.log_dir, exist_ok=True)
+        context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+        with open(context_file_path, "w", encoding="utf-8") as f:
+            json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+        print(f"\nRunContext saved to: {context_file_path}")
+
+        # === 完成结构化日志 ===
+        logger.finalize(
+            final_status="success",
+            final_output=optimization_result
+        )
+
+        # === 生成可视化页面 ===
+        print("\n生成可视化页面...")
+        visualization_path = visualize_log(logger.log_dir)
+        print(f"🎨 可视化页面已生成: {visualization_path}")
+        print(f"   在浏览器中打开查看详细信息")
+
+    except Exception as e:
+        # 出错时也要完成日志记录
+        logger.finalize(
+            final_status="error",
+            final_output={"error": str(e)}
+        )
+        raise
 
 
 if __name__ == "__main__":