luojunhui 12 часов назад
Родитель
Сommit
0c0a71d71a

+ 1 - 1
agent/core/runner.py

@@ -555,7 +555,7 @@ class AgentRunner:
         goal_tree = self.goal_tree or GoalTree(mission=task_name)
 
         if self.trace_store:
-            await self.trace_store.create_trace(trace_obj)
+            await self.trace_store.create_trace(trace_obj, stage_name=config.name)
             await self.trace_store.update_goal_tree(trace_id, goal_tree)
 
         return trace_obj, goal_tree, 1

+ 71 - 10
agent/trace/store.py

@@ -40,9 +40,57 @@ class FileSystemTraceStore:
         self.base_path = Path(base_path)
         self.base_path.mkdir(exist_ok=True)
 
-    def _get_trace_dir(self, trace_id: str) -> Path:
-        """获取 trace 目录"""
-        return self.base_path / trace_id
+    def _get_trace_dir(self, trace_id: str, parent_trace_id: Optional[str] = None, stage_name: Optional[str] = None) -> Path:
+        """
+        获取 trace 目录。
+
+        如果提供 parent_trace_id 和 stage_name,则创建在父目录的 agents/ 子目录下:
+        - {base_path}/{parent_trace_id}/agents/{stage_name}-{trace_id_suffix}/
+
+        如果只提供 trace_id,则智能查找:
+        1. 先查根目录 {base_path}/{trace_id}/
+        2. 再查所有 agents/ 子目录(用于读取已存在的子 trace)
+
+        Args:
+            trace_id: Trace ID
+            parent_trace_id: 父 Trace ID(创建子 trace 时提供)
+            stage_name: 阶段名称(创建子 trace 时提供)
+        """
+        if parent_trace_id and stage_name:
+            # 创建子 trace:放在父目录的 agents/ 下
+            parent_dir = self.base_path / parent_trace_id / "agents"
+            # 使用 stage_name + trace_id 后8位作为目录名
+            trace_id_suffix = trace_id.split('-')[-1][:8]
+            dir_name = f"{stage_name}-{trace_id_suffix}"
+            return parent_dir / dir_name
+
+        # 智能查找:先查根目录
+        root_dir = self.base_path / trace_id
+        if root_dir.exists():
+            return root_dir
+
+        # 再查所有 agents/ 子目录(用于读取已存在的子 trace)
+        for parent_dir in self.base_path.iterdir():
+            if not parent_dir.is_dir():
+                continue
+            agents_dir = parent_dir / "agents"
+            if not agents_dir.exists():
+                continue
+            for sub_dir in agents_dir.iterdir():
+                if not sub_dir.is_dir():
+                    continue
+                # 检查 meta.json 中的 trace_id
+                meta_file = sub_dir / "meta.json"
+                if meta_file.exists():
+                    try:
+                        data = json.loads(meta_file.read_text(encoding="utf-8"))
+                        if data.get("trace_id") == trace_id:
+                            return sub_dir
+                    except Exception:
+                        continue
+
+        # 找不到则返回根目录(向后兼容)
+        return root_dir
 
     def _get_meta_file(self, trace_id: str) -> Path:
         """获取 meta.json 文件路径"""
@@ -70,21 +118,34 @@ class FileSystemTraceStore:
 
     # ===== Trace 操作 =====
 
-    async def create_trace(self, trace: Trace) -> str:
-        """创建新的 Trace"""
-        trace_dir = self._get_trace_dir(trace.trace_id)
-        trace_dir.mkdir(exist_ok=True)
+    async def create_trace(self, trace: Trace, stage_name: Optional[str] = None) -> str:
+        """
+        创建新的 Trace。
+
+        Args:
+            trace: Trace 对象
+            stage_name: 阶段名称(创建子 trace 时提供,用于目录命名)
+        """
+        # 如果有 parent_trace_id,使用层级化目录结构
+        if trace.parent_trace_id:
+            # 使用 stage_name 或 trace.task 作为目录名前缀
+            dir_stage_name = stage_name or trace.task or "subtrace"
+            trace_dir = self._get_trace_dir(trace.trace_id, trace.parent_trace_id, dir_stage_name)
+        else:
+            trace_dir = self._get_trace_dir(trace.trace_id)
+
+        trace_dir.mkdir(parents=True, exist_ok=True)
 
         # 创建 messages 目录
-        messages_dir = self._get_messages_dir(trace.trace_id)
+        messages_dir = trace_dir / "messages"
         messages_dir.mkdir(exist_ok=True)
 
         # 写入 meta.json
-        meta_file = self._get_meta_file(trace.trace_id)
+        meta_file = trace_dir / "meta.json"
         meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
 
         # 创建空的 events.jsonl
-        events_file = self._get_events_file(trace.trace_id)
+        events_file = trace_dir / "events.jsonl"
         events_file.touch()
 
         return trace.trace_id

+ 54 - 8
pipeline_visualize.py

@@ -281,6 +281,7 @@ def _render_coarse_filter(d: dict) -> str:
     total = d.get("total_count", len(log))
     passed_cnt = d.get("passed_count", 0)
     rejected_cnt = d.get("rejected_count", 0)
+    low_score_cnt = d.get("low_score_count", 0)
     after_cnt = d.get("after_filter_count", 0)
 
     if not log:
@@ -289,41 +290,84 @@ def _render_coarse_filter(d: dict) -> str:
     parts: list[str] = []
 
     # 统计概览
+    low_score_html = (
+        f'<span class="stat-pill stat-low-score">低分淘汰 {low_score_cnt}</span>'
+        if low_score_cnt else ""
+    )
     parts.append(
         '<div class="decision-section">'
         f'<div class="section-title">📊 粗筛统计</div>'
         f'<span class="stat-pill stat-accept">通过 {passed_cnt}</span>'
-        f'<span class="stat-pill stat-reject">淘汰 {rejected_cnt}</span>'
+        f'<span class="stat-pill stat-reject">语义淘汰 {rejected_cnt}</span>'
+        f'{low_score_html}'
         '</div>'
     )
 
-    # 通过的文章
+    # 通过的文章(按 score 降序)
     passed = [r for r in log if r.get("status") == "pass"]
+    passed.sort(key=lambda r: int(r.get("score", 0)), reverse=True)
     if passed:
         rows = []
         for idx, r in enumerate(passed, 1):
             title = _esc(r.get("title", ""))
             url = _esc(r.get("url", ""))
-            reason = _esc(r.get("reason", ""))
             src_kw = _esc(r.get("source_keyword", ""))
+            score = r.get("score", 0)
+            features = r.get("features", [])
+            features_html = " ".join(
+                f'<span class="tag tag-blue">{_esc(f)}</span>' for f in features
+            ) if features else '<span class="tag">无特征</span>'
             rows.append(
                 f'<tr class="row-accept">'
                 f'<td class="num-cell">{idx}</td>'
                 f'<td class="article-title-cell"><a href="{url}" target="_blank">{title}</a></td>'
                 f'<td><code>{src_kw}</code></td>'
-                f'<td class="reason-full-cell">{reason}</td>'
+                f'<td class="num-cell"><b>{score}</b></td>'
+                f'<td>{features_html}</td>'
                 f'</tr>'
             )
         parts.append(
             '<div class="decision-section">'
             f'<div class="section-title">✅ 通过文章({len(passed)} 篇)</div>'
             '<table class="decision-table review-table">'
-            '<thead><tr><th>#</th><th>标题</th><th>来源词</th><th>理由</th></tr></thead>'
+            '<thead><tr><th>#</th><th>标题</th><th>来源词</th><th>爆款分</th><th>匹配特征</th></tr></thead>'
             '<tbody>' + "\n".join(rows) + '</tbody>'
             '</table></div>'
         )
 
-    # 淘汰的文章
+    # 低分淘汰的文章
+    low_score = [r for r in log if r.get("status") == "low_score"]
+    low_score.sort(key=lambda r: int(r.get("score", 0)), reverse=True)
+    if low_score:
+        rows = []
+        for idx, r in enumerate(low_score, 1):
+            title = _esc(r.get("title", ""))
+            url = _esc(r.get("url", ""))
+            src_kw = _esc(r.get("source_keyword", ""))
+            score = r.get("score", 0)
+            features = r.get("features", [])
+            features_html = " ".join(
+                f'<span class="tag">{_esc(f)}</span>' for f in features
+            ) if features else '<span class="tag">无特征</span>'
+            rows.append(
+                f'<tr class="row-low-score">'
+                f'<td class="num-cell">{idx}</td>'
+                f'<td class="article-title-cell"><a href="{url}" target="_blank">{title}</a></td>'
+                f'<td><code>{src_kw}</code></td>'
+                f'<td class="num-cell">{score}</td>'
+                f'<td>{features_html}</td>'
+                f'</tr>'
+            )
+        parts.append(
+            '<div class="decision-section">'
+            f'<div class="section-title">📉 低分淘汰({len(low_score)} 篇)</div>'
+            '<table class="decision-table review-table">'
+            '<thead><tr><th>#</th><th>标题</th><th>来源词</th><th>爆款分</th><th>匹配特征</th></tr></thead>'
+            '<tbody>' + "\n".join(rows) + '</tbody>'
+            '</table></div>'
+        )
+
+    # 语义淘汰的文章
     rejected = [r for r in log if r.get("status") == "reject"]
     if rejected:
         rows = []
@@ -342,7 +386,7 @@ def _render_coarse_filter(d: dict) -> str:
             )
         parts.append(
             '<div class="decision-section">'
-            f'<div class="section-title">❌ 淘汰文章({len(rejected)} 篇)</div>'
+            f'<div class="section-title">❌ 语义淘汰({len(rejected)} 篇)</div>'
             '<table class="decision-table review-table">'
             '<thead><tr><th>#</th><th>标题</th><th>来源词</th><th>理由</th></tr></thead>'
             '<tbody>' + "\n".join(rows) + '</tbody>'
@@ -1563,9 +1607,11 @@ header .sub span {{ margin:0 6px; }}
 }}
 .row-accept td {{ border-left:2px solid var(--green); }}
 .row-reject td {{ border-left:2px solid var(--red); }}
+.row-low-score td {{ border-left:2px solid var(--yellow); }}
 .row-skip td {{ border-left:2px solid var(--dim); }}
 .stat-accept {{ background:rgba(86,211,100,.1); border-color:rgba(86,211,100,.3); color:var(--green); }}
 .stat-reject {{ background:rgba(248,81,73,.1); border-color:rgba(248,81,73,.3); color:var(--red); }}
+.stat-low-score {{ background:rgba(227,179,65,.1); border-color:rgba(227,179,65,.3); color:var(--yellow); }}
 .stat-skip {{ background:rgba(139,148,158,.1); border-color:rgba(139,148,158,.3); color:var(--dim); }}
 .acct-table .sample-titles {{ font-size:11px; color:var(--dim); }}
 .file-path {{ font-size:11px; color:var(--dim); background:rgba(139,148,158,.08); padding:3px 8px; border-radius:4px; }}
@@ -1851,7 +1897,7 @@ def main() -> None:
         print(f"📋 读取了 {len(full_log_lines)} 行完整日志")
 
     html_content = render_html(events, full_log_lines=full_log_lines)
-    out_path = trace_dir / "pipeline_trace.html"
+    out_path = trace_dir / "执行结果.html"
     out_path.write_text(html_content, encoding="utf-8")
     size_kb = out_path.stat().st_size / 1024
     print(f"✅ 已生成: {out_path}  ({size_kb:.0f} KB)")

+ 49 - 366
run_search_agent.py

@@ -1,21 +1,7 @@
 """
-Search Agent Harness — 约束驱动的搜索 Agent 入口。
+Search Agent 统一入口
 
-Harness Engineering 分层:
-  1. Budget Harness   — 显式限定运行预算(超时、迭代上限、召回上限)
-  2. Planner Harness  — 启动前打印运行计划,明确每阶段目标与约束
-  3. Observer Harness — 结构化进度回调,暴露关键检查点状态
-  4. Fallback Harness — DB 策略失败 / API Key 缺失的显式降级路径
-
-前置:
-- OPEN_ROUTER_API_KEY
-- 可选:SEARCH_AGENT_DB_* 与表 search_agent_strategy(见 docs/search_agent_strategy.sql)
-
-环境变量:
-- PIPELINE_QUERY       / 默认 "伊朗、以色列、和平是永恒的主题"
-- PIPELINE_DEMAND_ID   / 默认 "1"
-- PIPELINE_TIMEOUT     / 整个 Agent 超时秒数,默认 1800(30 分钟)
-- PIPELINE_TARGET_COUNT / 目标文章数,默认取 RuntimePipelineConfig
+简化为薄壳,所有逻辑委托给 application 层。
 """
 
 from __future__ import annotations
@@ -23,383 +9,80 @@ from __future__ import annotations
 import asyncio
 import logging
 import os
-import shutil
-import sys
-import tempfile
-import time
-from dataclasses import dataclass, field
-from typing import Optional
 from uuid import uuid4
 
 from dotenv import load_dotenv
-from src.domain.search.core import SearchAgentCore
-from src.domain.search.policy import SearchAgentPolicy
-
-load_dotenv()
-
-# ── 日志级别由环境变量控制 ────────────
-_LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
-_CONSOLE_LEVEL = os.getenv("CONSOLE_LOG_LEVEL", "INFO").upper()
-_LOG_FMT = "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
-_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
-
-# 全局文件 handler 引用,供 main() 移动日志文件
-_file_handler: Optional[logging.FileHandler] = None
-_tmp_log_path: Optional[str] = None
-
-
-def _setup_logging() -> None:
-    """
-    配置双通道日志:console(INFO)+ file(DEBUG)。
-
-    全量日志写入临时文件,pipeline 完成后移入 trace 目录。
-    """
-    global _file_handler, _tmp_log_path
-
-    root = logging.getLogger()
-    root.setLevel(getattr(logging, _LOG_LEVEL, logging.DEBUG))
-
-    formatter = logging.Formatter(fmt=_LOG_FMT, datefmt=_LOG_DATEFMT)
-
-    console = logging.StreamHandler(sys.__stdout__)
-    console.setLevel(getattr(logging, _CONSOLE_LEVEL, logging.INFO))
-    console.setFormatter(formatter)
-    root.addHandler(console)
-
-    tmp = tempfile.NamedTemporaryFile(
-        delete=False, suffix=".log", prefix="search_agent_", mode="w", encoding="utf-8",
-    )
-    _tmp_log_path = tmp.name
-    tmp.close()
-
-    _file_handler = logging.FileHandler(_tmp_log_path, mode="w", encoding="utf-8")
-    _file_handler.setLevel(logging.DEBUG)
-    _file_handler.setFormatter(formatter)
-    root.addHandler(_file_handler)
 
-    for noisy in ("httpx", "httpcore", "urllib3", "asyncio"):
-        logging.getLogger(noisy).setLevel(logging.WARNING)
+from src.config.settings import get_config
+from src.application import ApplicationRunner
+from src.harness.search_agent import (
+    finalize_search_agent_log,
+    log_environment_profile,
+    load_environment_profile,
+    print_run_plan,
+    setup_search_agent_logging,
+    validate_prerequisites,
+)
 
-    # agent 内核日志不写入全量日志文件(减少噪音)
-    # 过滤 agent.core.runner / agent.llm.* / agent.tools.* / agent.trace.* 等
-    class _AgentLogFilter(logging.Filter):
-        def filter(self, record: logging.LogRecord) -> bool:
-            return not record.name.startswith("agent.")
-
-    _file_handler.addFilter(_AgentLogFilter())
-
-
-_setup_logging()
+load_dotenv()
+setup_search_agent_logging()
+logging.getLogger("agent").setLevel(logging.CRITICAL)
 logger = logging.getLogger(__name__)
 
 
-# ─────────────────────────────────────────────
-# 1. Budget Harness — 运行预算约束
-# ─────────────────────────────────────────────
-
-@dataclass
-class AgentBudget:
-    """
-    显式声明 Agent 可消耗的资源上限。
-
-    约束驱动原则:
-    - 所有上限必须在启动前确定,不允许在运行中隐式扩张。
-    - 超时由 harness 层统一兜底,不依赖各 Stage 自己的超时。
-    """
-    timeout_seconds: int = 1800         # 整体超时(30 分钟)
-    max_target_count: int = 10          # 单次运行最多产出文章数(防止无限扩张)
-    max_fallback_rounds: int = 1        # content_search gate fallback 最大轮次(防止死循环)
-
-    @classmethod
-    def from_env(cls) -> "AgentBudget":
-        return cls(
-            timeout_seconds=int(os.getenv("PIPELINE_TIMEOUT", "1800")),
-            max_target_count=int(os.getenv("PIPELINE_MAX_TARGET_COUNT", "10")),
-            max_fallback_rounds=int(os.getenv("PIPELINE_MAX_FALLBACK_ROUNDS", "1")),
-        )
-
-    def validate(self) -> None:
-        """前置断言:预算参数必须在合理范围内。"""
-        if self.timeout_seconds < 30:
-            raise ValueError(f"timeout_seconds 至少 30 秒,当前: {self.timeout_seconds}")
-        if self.max_target_count < 1 or self.max_target_count > 200:
-            raise ValueError(f"max_target_count 须在 [1, 200],当前: {self.max_target_count}")
-        if self.max_fallback_rounds < 0 or self.max_fallback_rounds > 5:
-            raise ValueError(f"max_fallback_rounds 须在 [0, 5],当前: {self.max_fallback_rounds}")
-
-
-# ─────────────────────────────────────────────
-# 2. Observer Harness — 结构化运行摘要
-# ─────────────────────────────────────────────
-
-@dataclass
-class RunSummary:
-    """
-    Agent 运行后的结构化摘要(非裸日志)。
-
-    设计意图:
-    - 调用方可检查 success / error_message 决定后续动作。
-    - 关键指标(candidate_count / filtered_count)可接入告警。
-    """
-    success: bool
-    query: str
-    demand_id: str
-    policy_source: str = "unknown"      # "db" | "default" | "override"
-    trace_id: Optional[str] = None
-    output_file: str = ""
-    candidate_count: int = 0
-    filtered_count: int = 0
-    account_count: int = 0
-    elapsed_seconds: float = 0.0
-    error_message: str = ""
-    stage_history: list = field(default_factory=list)
-
-    def log(self) -> None:
-        """结构化打印运行摘要。"""
-        status = "✅ 成功" if self.success else "❌ 失败"
-        logger.info("=" * 60)
-        logger.info("Agent 运行摘要 %s", status)
-        logger.info("  query        : %s", self.query)
-        logger.info("  demand_id    : %s", self.demand_id)
-        logger.info("  policy_source: %s", self.policy_source)
-        logger.info("  trace_id     : %s", self.trace_id)
-        logger.info("  output_file  : %s", self.output_file)
-        logger.info("  候选文章数    : %d", self.candidate_count)
-        logger.info("  入选文章数    : %d", self.filtered_count)
-        logger.info("  账号数        : %d", self.account_count)
-        logger.info("  耗时          : %.1f 秒", self.elapsed_seconds)
-        if self.error_message:
-            logger.error("  错误信息      : %s", self.error_message)
-        if self.stage_history:
-            logger.info("  阶段历史:")
-            for record in self.stage_history:
-                status_flag = "✓" if record.get("status") == "completed" else "✗"
-                logger.info(
-                    "    %s %-28s attempt=%d",
-                    status_flag,
-                    record.get("stage_name", "?"),
-                    record.get("attempt", 1),
-                )
-        logger.info("=" * 60)
-
-
-# ─────────────────────────────────────────────
-# 3. Planner Harness — 启动前打印运行计划
-# ─────────────────────────────────────────────
-
-def print_run_plan(query: str, demand_id: str, budget: AgentBudget, trace_id: str) -> dict:
-    """
-    在 Agent 启动前打印结构化运行计划,并返回计划数据供 trace 使用。
-
-    目的:
-    - 使运行意图可见、可审计,便于调试和追溯。
-    - 明确各阶段目标与约束,防止"黑盒"执行。
-    """
-    logger.info("=" * 60)
-    logger.info("▶ Search Agent 运行计划")
-    logger.info("  Trace ID   : %s", trace_id)
-    logger.info("  Query      : %s", query)
-    logger.info("  Demand ID  : %s", demand_id or "(未指定,使用 default 策略)")
-    logger.info("  超时上限    : %d 秒", budget.timeout_seconds)
-    logger.info("  目标文章上限 : %d 篇", budget.max_target_count)
-    logger.info("  最大补召回轮次: %d 轮", budget.max_fallback_rounds)
-    logger.info("")
-    logger.info("  阶段规划:")
-    logger.info("    1. [demand_analysis   ]  ← 需求理解,产出搜索策略(无工具调用)")
-    logger.info("    2. [content_search    ]  ← 按关键词召回候选文章")
-    logger.info("       └─ Gate: SearchCompletenessGate — 候选不足则 abort")
-    logger.info("    3. [hard_filter       ]  ← 去重 + URL / 时间基础校验")
-    logger.info("    4. [coarse_filter     ]  ← LLM 标题语义粗筛")
-    logger.info("    5. [quality_filter    ]  ← 数据指标评分 + LLM 正文精排")
-    logger.info("       └─ Gate: FilterSufficiencyGate — 不足则回退补召回(最多 %d 轮)",
-                budget.max_fallback_rounds)
-    logger.info("    6. [account_precipitate] ← 账号信息沉淀")
-    logger.info("    7. [output_persist    ]  ← 输出结构化 JSON")
-    logger.info("       └─ Gate: OutputSchemaGate — 结构校验")
-    logger.info("=" * 60)
-
-    return {
-        "trace_id": trace_id,
-        "query": query,
-        "demand_id": demand_id or "",
-        "timeout_seconds": budget.timeout_seconds,
-        "max_target_count": budget.max_target_count,
-        "max_fallback_rounds": budget.max_fallback_rounds,
-        "stages": [
-            {"name": "demand_analysis", "label": "需求理解,产出搜索策略"},
-            {"name": "content_search", "label": "按关键词召回候选文章", "gate": "SearchCompletenessGate"},
-            {"name": "hard_filter", "label": "去重 + 基础规则过滤"},
-            {"name": "coarse_filter", "label": "LLM 标题语义粗筛"},
-            {"name": "quality_filter", "label": "数据指标评分 + LLM 正文精排", "gate": "FilterSufficiencyGate"},
-            {"name": "account_precipitate", "label": "账号信息沉淀"},
-            {"name": "output_persist", "label": "输出结构化 JSON", "gate": "OutputSchemaGate"},
-        ],
-    }
-
-
-# ─────────────────────────────────────────────
-# 4. Fallback Harness — 前置检查与降级路径
-# ─────────────────────────────────────────────
-
-def validate_prerequisites() -> None:
-    """
-    前置条件检查(Harness 级别,不依赖 Core 内部检查)。
-
-    设计意图:
-    - 把必须满足的约束提升到最外层,让失败快速、信息明确。
-    - 避免在深层 Stage 里才触发 "OPEN_ROUTER_API_KEY 未设置"。
-    """
-    api_key = os.getenv("OPEN_ROUTER_API_KEY", "").strip()
-    if not api_key:
-        raise EnvironmentError(
-            "缺少必要环境变量: OPEN_ROUTER_API_KEY\n"
-            "请在 .env 文件或系统环境中设置该变量后重试。"
-        )
-
-
-# ─────────────────────────────────────────────
-# 5. 主流程 — Harness 统一编排
-# ─────────────────────────────────────────────
-
-async def run_with_harness(
-    query: str,
-    demand_id: str,
-    budget: AgentBudget,
-    trace_id: str,
-    use_db_policy: bool = True,
-    run_plan: dict | None = None,
-) -> RunSummary:
-    """
-    带 Harness 的 Agent 执行入口。
-
-    职责分层:
-    - 本函数只做"约束注入 + 超时包裹 + 摘要采集"。
-    - 业务逻辑委托给 SearchAgentCore。
-    - 不在这里写 if/else 业务判断。
-    """
-
-    start = time.monotonic()
-    summary = RunSummary(success=False, query=query, demand_id=demand_id, trace_id=trace_id)
-
-    # --- 策略来源标记(Observer 用) ---
-    core = SearchAgentCore()
-    policy_override: Optional[SearchAgentPolicy] = None
-
-    if use_db_policy:
-        try:
-            # 预读策略仅用于确认 DB 连通性和标记来源;
-            # SearchAgentCore.run() 内部会用同一 demand_id 再次加载。
-            await core.load_policy(demand_id or None)
-            summary.policy_source = "db"
-            logger.info("策略已从 DB 加载: demand_id=%s", demand_id)
-        except Exception as exc:
-            logger.warning("DB 策略读取失败,降级为默认策略: %s", exc)
-            policy_override = SearchAgentPolicy.defaults()
-            summary.policy_source = "default(fallback)"
-    else:
-        policy_override = SearchAgentPolicy.defaults()
-        summary.policy_source = "default"
-
-    # --- 预算注入:target_count 不超过 max_target_count ---
-    from src.pipeline.config.pipeline_config import RuntimePipelineConfig
-    runtime = RuntimePipelineConfig.from_env()
-    effective_target = min(runtime.target_count, budget.max_target_count)
-    if effective_target != runtime.target_count:
-        logger.info(
-            "target_count 被 Budget Harness 限制: %d → %d",
-            runtime.target_count,
-            effective_target,
-        )
-
-    # --- 超时包裹执行 ---
-    try:
-        ctx = await asyncio.wait_for(
-            core.run(
-                query=query,
-                demand_id=demand_id,
-                target_count=effective_target,
-                use_db_policy=(policy_override is None),
-                policy_override=policy_override,
-                trace_id=trace_id,
-                run_plan=run_plan,
-            ),
-            timeout=budget.timeout_seconds,
-        )
-    except asyncio.TimeoutError:
-        summary.elapsed_seconds = time.monotonic() - start
-        summary.error_message = f"Agent 超时(>{budget.timeout_seconds}s),已中止"
-        logger.error(summary.error_message)
-        return summary
-    except Exception as exc:
-        summary.elapsed_seconds = time.monotonic() - start
-        summary.error_message = str(exc)
-        logger.exception("Agent 运行异常: %s", exc)
-        return summary
-
-    # --- 采集 Observer 摘要 ---
-    summary.success = True
-    summary.output_file = ctx.metadata.get("output_file", "")
-    summary.candidate_count = len(ctx.candidate_articles)
-    summary.filtered_count = len(ctx.filtered_articles)
-    summary.account_count = len(ctx.accounts)
-    summary.elapsed_seconds = time.monotonic() - start
-    summary.stage_history = [
-        {
-            "stage_name": r.stage_name,
-            "status": r.status,
-            "attempt": r.attempt,
-        }
-        for r in ctx.stage_history
-    ]
-    return summary
-
-
 async def main() -> None:
-    # ① 前置检查(Fallback Harness)
+    """主入口"""
+    # 前置检查
     validate_prerequisites()
 
-    # ② 读取运行参数
-    query = os.getenv("PIPELINE_QUERY", "伊朗以色列冲突、中老年人会关注什么?")
-    demand_id = os.getenv("PIPELINE_DEMAND_ID", "1")
+    # 加载配置
+    config = get_config()
+    environment = load_environment_profile()
+    log_environment_profile(environment)
 
-    # ③ 预算约束(Budget Harness)
-    budget = AgentBudget.from_env()
-    budget.validate()
-
-    # ④ 生成全局 trace_id,贯穿整个运行周期
+    # 读取参数
+    query = os.getenv("PIPELINE_QUERY", "当谈到毛主席的生平、丰功伟绩、伟人伟绩、老年人会关注什么?")
+    demand_id = os.getenv("PIPELINE_DEMAND_ID", "1")
     trace_id = str(uuid4())
+
     logger.info("Trace ID: %s", trace_id)
 
-    # ⑤ 运行计划(Planner Harness)
+    # 打印运行计划
+    from src.application.runner import AgentBudget
+    budget = AgentBudget.from_config(config)
+    budget.validate()
+
     run_plan = print_run_plan(query=query, demand_id=demand_id, budget=budget, trace_id=trace_id)
+    run_plan["environment"] = {
+        "env_name": environment.env_name,
+        "use_db_policy": environment.use_db_policy,
+        "strategy_source": environment.strategy_source,
+    }
 
-    # ⑥ 执行(带约束 + 观测)
-    summary = await run_with_harness(
+    # 执行
+    runner = ApplicationRunner(config)
+    summary = await runner.run(
         query=query,
         demand_id=demand_id,
-        budget=budget,
         trace_id=trace_id,
-        use_db_policy=True,
-        run_plan=run_plan,
+        use_db_policy=environment.use_db_policy,
+        policy_override=environment.strategy_override,
     )
 
-    # ⑦ 结构化输出摘要(Observer Harness)
+    # 输出摘要
     summary.log()
+    finalize_search_agent_log(trace_id)
 
-    # ⑧ 将全量日志移入 trace 目录
-    global _file_handler, _tmp_log_path
-    if _file_handler and _tmp_log_path and os.path.exists(_tmp_log_path):
+    # 自动触发知识总结
+    if os.getenv("ENABLE_KNOWLEDGE_SUMMARY", "false").lower() == "true":
         try:
-            _file_handler.close()
-            trace_dir = os.path.join("tests", "traces", trace_id)
-            os.makedirs(trace_dir, exist_ok=True)
-            dest = os.path.join(trace_dir, "full_log.log")
-            shutil.move(_tmp_log_path, dest)
-            logger.info("完整日志已保存: %s", dest)
+            from agent.llm.openrouter import openrouter_llm_call
+            from src.knowledge import trigger_knowledge_summary
+            logger.info("触发知识总结...")
+            await trigger_knowledge_summary(llm_call=openrouter_llm_call)
         except Exception as exc:
-            logger.warning("移动日志文件失败: %s", exc)
+            logger.warning("知识总结失败: %s", exc)
 
-    # ⑨ 非零退出码(让 CI/调度系统能感知失败)
     if not summary.success:
         raise SystemExit(1)
 

+ 78 - 13
src/domain/search/policy.py

@@ -21,11 +21,15 @@ class SearchAgentPolicy:
     initial_cursor: str = "1"
     keyword_priority: KeywordPriority = "demand_first"
     extra_keywords: List[str] = field(default_factory=list)
+    recall_multiplier: float = 5.0
     min_candidate_multiplier: float = 2.0
     near_enough_candidate_multiplier: float = 1.2
     filter_near_ratio: float = 0.8
     max_detail_fetch: int = 30
     enable_llm_review: bool = True
+    quality_score: Dict[str, Any] = field(default_factory=dict)
+    account_strategy: Dict[str, Any] = field(default_factory=dict)
+    target_count_override: int | None = None
 
     @classmethod
     def defaults(cls) -> SearchAgentPolicy:
@@ -34,24 +38,50 @@ class SearchAgentPolicy:
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> SearchAgentPolicy:
         base = cls.defaults().__dict__.copy()
-        for key, value in (data or {}).items():
-            if key in base and value is not None:
-                base[key] = value
-        ek = base["extra_keywords"]
+        payload = data or {}
+        search_cfg = payload.get("search") if isinstance(payload.get("search"), dict) else {}
+        filter_cfg = payload.get("filter") if isinstance(payload.get("filter"), dict) else {}
+        account_cfg = payload.get("account") if isinstance(payload.get("account"), dict) else {}
+        runtime_cfg = payload.get("runtime") if isinstance(payload.get("runtime"), dict) else {}
+
+        def _pick(name: str, *, section: Dict[str, Any] | None = None) -> Any:
+            if section and name in section and section[name] is not None:
+                return section[name]
+            if name in payload and payload[name] is not None:
+                return payload[name]
+            return base[name]
+
+        ek = _pick("extra_keywords", section=search_cfg)
         if not isinstance(ek, list):
             ek = []
+        quality_score = _pick("quality_score", section=filter_cfg)
+        if not isinstance(quality_score, dict):
+            quality_score = {}
+        account_strategy = _pick("account_strategy", section=account_cfg)
+        if not isinstance(account_strategy, dict):
+            account_strategy = {}
+        target_raw = runtime_cfg.get("target_count", payload.get("target_count"))
+        if target_raw in (None, ""):
+            target_count_override = None
+        else:
+            target_count_override = int(target_raw)
+        keyword_priority = _pick("keyword_priority", section=search_cfg)
         return cls(
-            max_keywords=int(base["max_keywords"]),
-            initial_cursor=str(base["initial_cursor"]),
-            keyword_priority=base["keyword_priority"]
-            if base["keyword_priority"] in ("demand_first", "query_first")
+            max_keywords=int(_pick("max_keywords", section=search_cfg)),
+            initial_cursor=str(_pick("initial_cursor", section=search_cfg)),
+            keyword_priority=keyword_priority
+            if keyword_priority in ("demand_first", "query_first")
             else "demand_first",
             extra_keywords=[str(x).strip() for x in ek if str(x).strip()],
-            min_candidate_multiplier=float(base["min_candidate_multiplier"]),
-            near_enough_candidate_multiplier=float(base["near_enough_candidate_multiplier"]),
-            filter_near_ratio=float(base["filter_near_ratio"]),
-            max_detail_fetch=int(base["max_detail_fetch"]),
-            enable_llm_review=bool(base["enable_llm_review"]),
+            recall_multiplier=float(_pick("recall_multiplier", section=search_cfg)),
+            min_candidate_multiplier=float(_pick("min_candidate_multiplier", section=search_cfg)),
+            near_enough_candidate_multiplier=float(_pick("near_enough_candidate_multiplier", section=search_cfg)),
+            filter_near_ratio=float(_pick("filter_near_ratio", section=filter_cfg)),
+            max_detail_fetch=int(_pick("max_detail_fetch", section=filter_cfg)),
+            enable_llm_review=bool(_pick("enable_llm_review", section=filter_cfg)),
+            quality_score=quality_score,
+            account_strategy=account_strategy,
+            target_count_override=target_count_override,
         )
 
     def to_dict(self) -> Dict[str, Any]:
@@ -60,13 +90,48 @@ class SearchAgentPolicy:
             "initial_cursor": self.initial_cursor,
             "keyword_priority": self.keyword_priority,
             "extra_keywords": list(self.extra_keywords),
+            "recall_multiplier": self.recall_multiplier,
             "min_candidate_multiplier": self.min_candidate_multiplier,
             "near_enough_candidate_multiplier": self.near_enough_candidate_multiplier,
             "filter_near_ratio": self.filter_near_ratio,
             "max_detail_fetch": self.max_detail_fetch,
             "enable_llm_review": self.enable_llm_review,
+            "quality_score": dict(self.quality_score),
+            "account_strategy": dict(self.account_strategy),
+            "target_count": self.target_count_override,
+            "search": {
+                "max_keywords": self.max_keywords,
+                "initial_cursor": self.initial_cursor,
+                "keyword_priority": self.keyword_priority,
+                "extra_keywords": list(self.extra_keywords),
+                "recall_multiplier": self.recall_multiplier,
+                "min_candidate_multiplier": self.min_candidate_multiplier,
+                "near_enough_candidate_multiplier": self.near_enough_candidate_multiplier,
+            },
+            "filter": {
+                "filter_near_ratio": self.filter_near_ratio,
+                "max_detail_fetch": self.max_detail_fetch,
+                "enable_llm_review": self.enable_llm_review,
+                "quality_score": dict(self.quality_score),
+            },
+            "account": {
+                "account_strategy": dict(self.account_strategy),
+            },
+            "runtime": {
+                "target_count": self.target_count_override,
+            },
         }
 
+    def merged_with(self, override: Dict[str, Any]) -> "SearchAgentPolicy":
+        """
+        用额外策略覆盖当前策略并返回新实例。
+
+        适用于 env/file 的快速调参,不改 DB 即可生效。
+        """
+        merged = self.to_dict()
+        merged.update(override or {})
+        return SearchAgentPolicy.from_dict(merged)
+
 
 def apply_search_agent_policy(ctx: PipelineContext, policy: SearchAgentPolicy) -> None:
     """将策略写入上下文,供 Stage / Gate 读取。"""

+ 9 - 0
src/pipeline/context.py

@@ -52,6 +52,14 @@ class DemandAnalysisResult:
     raw_result: Dict[str, Any] = field(default_factory=dict)
 
 
+@dataclass
+class ExpandedQuery:
+    """查询拓展结果(基于爆款特征)。"""
+    original_keywords: List[str] = field(default_factory=list)  # 原始关键词(来自 demand_analysis)
+    expanded_keywords: List[Dict[str, Any]] = field(default_factory=list)  # 拓展关键词列表
+    # 每个 expanded_keyword 包含: {"keyword": str, "original": str, "features": List[str], "weight_sum": int, "priority": int}
+
+
 @dataclass
 class CandidateArticle:
     """搜索阶段候选文章结构。"""
@@ -160,6 +168,7 @@ class PipelineContext:
     current_stage: str = "INIT"
 
     demand_analysis: Optional[DemandAnalysisResult] = None
+    expanded_query: Optional[ExpandedQuery] = None
     candidate_articles: List[CandidateArticle] = field(default_factory=list)
     filtered_articles: List[FilteredArticle] = field(default_factory=list)
     accounts: List[AccountInfo] = field(default_factory=list)

+ 3 - 2
src/pipeline/gates/filter_sufficiency.py

@@ -4,6 +4,7 @@ from __future__ import annotations
 
 from src.pipeline.base import GateResult, QualityGate
 from src.pipeline.context import PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 
 
 class FilterSufficiencyGate(QualityGate):
@@ -13,7 +14,7 @@ class FilterSufficiencyGate(QualityGate):
     若补召回后仍不足,则放行(有多少用多少)。
     """
 
-    def __init__(self, fallback_stage: str = "content_search"):
+    def __init__(self, fallback_stage: str = "query_expansion"):
         # 当数量明显不足时,回退到指定阶段补召回
         self.fallback_stage = fallback_stage
         self._check_count = 0
@@ -21,7 +22,7 @@ class FilterSufficiencyGate(QualityGate):
     def check(self, ctx: PipelineContext) -> GateResult:
         self._check_count += 1
         policy = ctx.metadata.get("search_agent_policy") or {}
-        near_ratio = float(policy.get("filter_near_ratio", 0.5))
+        near_ratio = float(get_policy_value(policy, "filter_near_ratio", 0.5, section="filter"))
         count = len(ctx.filtered_articles)
         target = max(ctx.target_count, 1)
 

+ 26 - 3
src/pipeline/gates/search_completeness.py

@@ -4,6 +4,7 @@ from __future__ import annotations
 
 from src.pipeline.base import GateResult, QualityGate
 from src.pipeline.context import PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 
 
 class SearchCompletenessGate(QualityGate):
@@ -15,10 +16,24 @@ class SearchCompletenessGate(QualityGate):
     - ctx.metadata.search_agent_policy 中的候选倍率参数
     """
 
+    def __init__(self, fallback_stage: str = "query_expansion"):
+        # 候选明显不足时,回退到 query_expansion 做新一轮关键词拓展与搜索
+        self.fallback_stage = fallback_stage
+        self._check_count = 0
+
     def check(self, ctx: PipelineContext) -> GateResult:
+        self._check_count += 1
+
+        # 搜索阶段主动达到召回上限后停搜,直接放行
+        if ctx.metadata.get("_search_stopped_early"):
+            return GateResult(
+                passed=True,
+                issues=[f"搜索达到召回上限后提前结束,候选 {len(ctx.candidate_articles)} 篇"],
+            )
+
         policy = ctx.metadata.get("search_agent_policy") or {}
-        mult = float(policy.get("min_candidate_multiplier", 2.0))
-        near = float(policy.get("near_enough_candidate_multiplier", 1.2))
+        mult = float(get_policy_value(policy, "min_candidate_multiplier", 2.0, section="search"))
+        near = float(get_policy_value(policy, "near_enough_candidate_multiplier", 1.2, section="search"))
         target = max(int(ctx.target_count * mult), 1)
         count = len(ctx.candidate_articles)
         if count >= target:
@@ -29,8 +44,16 @@ class SearchCompletenessGate(QualityGate):
                 issues=[f"候选数量低于理想值,但可继续: {count}/{target}"],
                 action="proceed",
             )
+        # 已经 fallback 过一次,不再反复补召回,避免门禁死循环
+        if self._check_count > 1:
+            return GateResult(
+                passed=True,
+                issues=[f"补召回后候选仍不足({count}/{target}),放行已有候选"],
+                action="proceed",
+            )
         return GateResult(
             passed=False,
             issues=[f"候选数量不足: {count}/{target}"],
-            action="abort",
+            action="fallback",
+            fallback_stage=self.fallback_stage,
         )

+ 2 - 0
src/pipeline/hooks/live_progress_hook.py

@@ -149,6 +149,8 @@ class LiveProgressHook(PipelineHook):
                 connector, s["keyword"], s["returned"], s["new"],
             )
         logger.info("      └─ 📊 累计候选: %d 篇", len(ctx.candidate_articles))
+        if ctx.metadata.get("_search_stopped_early"):
+            logger.info("      ⚡ 达到召回上限,提前停搜(共 %d 轮搜索词)", len(stats))
 
     def _print_hard_filter(self, ctx: PipelineContext) -> None:
         logger.info("      └─ 📊 过滤后: %d 篇", len(ctx.candidate_articles))

+ 2 - 0
src/pipeline/hooks/pipeline_trace_hook.py

@@ -145,6 +145,7 @@ class PipelineTraceHook(PipelineHook):
         return {
             "keyword_stats": stats,
             "total_candidates": len(ctx.candidate_articles),
+            "stopped_early": bool(ctx.metadata.get("_search_stopped_early")),
             "candidates": candidates,
         }
 
@@ -159,6 +160,7 @@ class PipelineTraceHook(PipelineHook):
             "coarse_log": log,
             "passed_count": sum(1 for r in log if r.get("status") == "pass"),
             "rejected_count": sum(1 for r in log if r.get("status") == "reject"),
+            "low_score_count": sum(1 for r in log if r.get("status") == "low_score"),
             "after_filter_count": len(ctx.candidate_articles),
         }
 

+ 2 - 0
src/pipeline/runner.py

@@ -29,6 +29,7 @@ from src.pipeline.stages import (
     HardFilterStage,
     OutputPersistStage,
     QualityFilterStage,
+    QueryExpansionStage,
 )
 from src.pipeline.stages.common import StageAgentExecutor
 
@@ -63,6 +64,7 @@ def build_default_pipeline(runtime: RuntimePipelineConfig) -> PipelineOrchestrat
     pipeline = PipelineOrchestrator(
         stages=[
             DemandAnalysisStage(agent_executor=agent_executor),
+            QueryExpansionStage(agent_executor=agent_executor),
             ContentSearchStage(adapter=adapter, agent_executor=agent_executor),
             HardFilterStage(),
             CoarseFilterStage(agent_executor=agent_executor),

+ 2 - 0
src/pipeline/stages/__init__.py

@@ -6,6 +6,7 @@ from .content_filter import HardFilterStage, QualityFilterStage
 from .content_search import ContentSearchStage
 from .demand_analysis import DemandAnalysisStage
 from .output_persist import OutputPersistStage
+from .query_expansion import QueryExpansionStage
 
 __all__ = [
     "AccountPrecipitateStage",
@@ -15,4 +16,5 @@ __all__ = [
     "HardFilterStage",
     "OutputPersistStage",
     "QualityFilterStage",
+    "QueryExpansionStage",
 ]

+ 15 - 2
src/pipeline/stages/account_precipitate.py

@@ -7,6 +7,7 @@ from typing import Dict, List
 from src.pipeline.adapters.base import ToolAdapter
 from src.pipeline.base import Stage
 from src.pipeline.context import AccountInfo, ArticleAccountRelation, PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 
 
 class AccountPrecipitateStage(Stage):
@@ -30,6 +31,10 @@ class AccountPrecipitateStage(Stage):
         """
         account_map: Dict[str, AccountInfo] = {}
         relations: List[ArticleAccountRelation] = []
+        policy = ctx.metadata.get("search_agent_policy") or {}
+        account_strategy = get_policy_value(policy, "account_strategy", {}, section="account") or {}
+        sample_limit = int(account_strategy.get("sample_articles_limit", 5))
+        source_url_limit = int(account_strategy.get("source_urls_limit", 100))
 
         for article in ctx.filtered_articles:
             account = await self.adapter.get_account(article.url)
@@ -48,9 +53,17 @@ class AccountPrecipitateStage(Stage):
                 account_map[key] = existing
 
             existing.article_count += 1
-            if article.title and article.title not in existing.sample_articles and len(existing.sample_articles) < 5:
+            if (
+                article.title
+                and article.title not in existing.sample_articles
+                and len(existing.sample_articles) < sample_limit
+            ):
                 existing.sample_articles.append(article.title)
-            if article.url and article.url not in existing.source_urls:
+            if (
+                article.url
+                and article.url not in existing.source_urls
+                and len(existing.source_urls) < source_url_limit
+            ):
                 existing.source_urls.append(article.url)
 
             relations.append(ArticleAccountRelation(article_url=article.url, wx_gh=existing.wx_gh))

+ 129 - 20
src/pipeline/stages/coarse_filter.py

@@ -1,10 +1,10 @@
 from __future__ import annotations
 
-"""粗筛阶段:基于标题的 LLM 语义相关性批量判断。
+"""粗筛阶段:基于标题的 LLM 语义相关性批量判断 + 爆款特征打分
 
 在 HardFilterStage 之后、QualityFilterStage 之前执行。
 用 LLM 对候选文章标题做批量语义相关性判断,快速淘汰明显不相关的文章,
-减少后续 detail API 调用量。
+然后对通过的文章进行爆款特征打分,根据分数阈值筛选,减少后续 detail API 调用量。
 """
 
 import logging
@@ -12,6 +12,7 @@ from typing import Any, Dict, List
 
 from src.pipeline.base import Stage
 from src.pipeline.context import PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 from src.pipeline.stages.common import StageAgentExecutor
 
 logger = logging.getLogger(__name__)
@@ -37,36 +38,80 @@ class CoarseFilterStage(Stage):
         articles = ctx.candidate_articles
         query = ctx.query
 
-        # 构建需求特征摘要供 LLM 参考
-        demand_summary = self._build_demand_summary(ctx)
+        policy = ctx.metadata.get("search_agent_policy") or {}
+        score_threshold = int(get_policy_value(policy, "coarse_score_threshold", 15, section="filter"))
 
+        demand_summary = self._build_demand_summary(ctx)
         coarse_log: List[Dict[str, Any]] = []
         passed_articles = []
 
         # 分批处理
         for batch_start in range(0, len(articles), self.batch_size):
             batch = articles[batch_start : batch_start + self.batch_size]
-            batch_results = await self._judge_batch(query, demand_summary, batch, ctx)
 
-            for article, result in zip(batch, batch_results):
-                relevance = result.get("relevance", "reject")
-                reason = result.get("reason", "")
-                status = "pass" if relevance == "pass" else "reject"
+            # 步骤 1:语义相关性判断
+            batch_relevance = await self._judge_batch(query, demand_summary, batch, ctx)
+
+            # 收集通过语义判断的文章
+            passed_indices = [
+                i for i, r in enumerate(batch_relevance)
+                if r.get("relevance") == "pass"
+            ]
+
+            if not passed_indices:
+                for article, result in zip(batch, batch_relevance):
+                    coarse_log.append({
+                        "title": article.title,
+                        "url": article.url,
+                        "source_keyword": article.source_keyword,
+                        "status": "reject",
+                        "reason": result.get("reason", ""),
+                        "score": 0,
+                        "features": [],
+                    })
+                continue
 
-                coarse_log.append({
-                    "title": article.title,
-                    "url": article.url,
-                    "source_keyword": article.source_keyword,
-                    "status": status,
-                    "reason": reason,
-                })
+            # 步骤 2:对通过的文章进行爆款特征打分
+            passed_batch = [batch[i] for i in passed_indices]
+            batch_scores = await self._score_batch(passed_batch, ctx)
+            score_map: Dict[int, Dict] = {
+                passed_indices[j]: batch_scores[j]
+                for j in range(len(passed_indices))
+            }
 
-                if status == "pass":
-                    passed_articles.append(article)
+            for i, (article, relevance) in enumerate(zip(batch, batch_relevance)):
+                if relevance.get("relevance") != "pass":
+                    coarse_log.append({
+                        "title": article.title,
+                        "url": article.url,
+                        "source_keyword": article.source_keyword,
+                        "status": "reject",
+                        "reason": relevance.get("reason", ""),
+                        "score": 0,
+                        "features": [],
+                    })
+                else:
+                    sr = score_map.get(i, {"score": 0, "features": []})
+                    score = int(sr.get("score", 0))
+                    features = sr.get("features", [])
+                    status = "pass" if score >= score_threshold else "low_score"
+                    if status == "pass":
+                        passed_articles.append(article)
+                    coarse_log.append({
+                        "title": article.title,
+                        "url": article.url,
+                        "source_keyword": article.source_keyword,
+                        "status": status,
+                        "reason": relevance.get("reason", ""),
+                        "score": score,
+                        "features": features,
+                    })
 
+        reject_count = sum(1 for r in coarse_log if r["status"] == "reject")
+        low_score_count = sum(1 for r in coarse_log if r["status"] == "low_score")
         logger.info(
-            "coarse_filter 粗筛完成: %d → %d 篇 (淘汰 %d 篇)",
-            len(articles), len(passed_articles), len(articles) - len(passed_articles),
+            "coarse_filter 完成: %d → %d 篇 (语义淘汰 %d, 低分淘汰 %d, 阈值 %d)",
+            len(articles), len(passed_articles), reject_count, low_score_count, score_threshold,
         )
 
         ctx.candidate_articles = passed_articles
@@ -135,6 +180,70 @@ class CoarseFilterStage(Stage):
             logger.warning("coarse_filter LLM 调用失败,全部放行: %s", exc)
             return [{"relevance": "pass", "reason": "LLM 调用失败,默认通过"} for _ in batch]
 
+    async def _score_batch(
+        self,
+        batch: list,
+        ctx: PipelineContext,
+    ) -> List[Dict[str, Any]]:
+        """对一批文章标题进行爆款特征打分。"""
+        titles_block = "\n".join(
+            f"{i + 1}. {a.title}" for i, a in enumerate(batch)
+        )
+
+        messages = [
+            {
+                "role": "system",
+                "content": (
+                    "你是爆款标题特征评分专家。根据以下爆款标题特征权重表对标题打分:\n\n"
+                    "**加分项(爆款特征):**\n"
+                    "1. 情绪极端化:25分(愤怒、恐慌、感动、震惊、痛心、太可怕了、必看、吓坏)\n"
+                    "2. 名人/大国冲突:20分(特朗普、中美、俄乌、普京、大国博弈)\n"
+                    "3. 悬念制造:15分(真相、内幕、不为人知、背后的秘密、终于曝光)\n"
+                    "4. 数字具体化:10分(3个信号、5大变化、100万人、暴涨300%)\n"
+                    "5. 时间紧迫感:10分(刚刚、紧急通知、最新消息、马上、突发)\n"
+                    "6. 对比/意外转折:8分(没想到、竟然、反而、万万没想到、出人意料)\n"
+                    "7. 阵营对立:7分(中国vs美国、正义vs邪恶、我们vs他们)\n"
+                    "8. 军事危机暗示:5分(战争、冲突、危机、威胁、军事行动)\n\n"
+                    "**减分项(平铺直述):**\n"
+                    "- 标题过于平淡、缺乏吸引力、纯陈述性:-10分\n"
+                    "  (如:「某某发布通知」「某某召开会议」「某某介绍情况」)\n\n"
+                    "评分规则:\n"
+                    "- 如果标题符合某个加分特征,就加上该特征的权重分\n"
+                    "- 如果标题过于平铺直述,扣 10 分\n"
+                    "- 最终得分 = 所有加分项之和 - 减分项(最低 0 分)\n"
+                    "- 只输出分数和匹配的特征名称,不要给理由和解释\n\n"
+                    "输出格式:JSON,放在 ```json 代码块中。\n"
+                    "```json\n"
+                    '{"results": [{"index": 1, "score": 35, "features": ["情绪极端化", "悬念制造"]}, ...]}\n'
+                    "```"
+                ),
+            },
+            {
+                "role": "user",
+                "content": f"请对以下 {len(batch)} 篇文章标题进行爆款特征打分:\n\n{titles_block}",
+            },
+        ]
+
+        try:
+            result = await self.agent_executor.run_simple_llm_json(
+                name="标题打分",
+                messages=messages,
+                ctx=ctx,
+            )
+            items = result.get("results", [])
+            indexed: Dict[int, Dict] = {}
+            for item in items:
+                idx = item.get("index", 0)
+                if isinstance(idx, int) and 1 <= idx <= len(batch):
+                    indexed[idx] = item
+            return [
+                indexed.get(i + 1, {"score": 0, "features": []})
+                for i in range(len(batch))
+            ]
+        except Exception as exc:
+            logger.warning("标题打分 LLM 调用失败,全部给 0 分: %s", exc)
+            return [{"score": 0, "features": []} for _ in batch]
+
     @staticmethod
     def _build_demand_summary(ctx: PipelineContext) -> str:
         """从需求分析结果中提取摘要信息供粗筛 LLM 参考。"""

+ 1 - 0
src/pipeline/stages/common.py

@@ -87,6 +87,7 @@ class StageAgentExecutor:
             max_iterations=self.max_iterations,
             tools=allowed_tools,
             skills=skills,
+            parent_trace_id=ctx.trace_id,  # 传递 pipeline trace_id 作为 parent
             extra_llm_params=self.extra_llm_params,
             knowledge=KnowledgeConfig(
                 enable_extraction=False,

+ 6 - 8
src/pipeline/stages/content_filter.py

@@ -12,6 +12,7 @@ from typing import Any, Dict, List, Tuple
 from src.pipeline.adapters.base import ToolAdapter
 from src.pipeline.base import Stage
 from src.pipeline.context import CandidateArticle, FilteredArticle, PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 from src.pipeline.stages.common import StageAgentExecutor
 
 logger = logging.getLogger(__name__)
@@ -128,7 +129,7 @@ class QualityScoreConfig:
 
     def merge_policy(self, policy: Dict) -> "QualityScoreConfig":
         """用 DB 策略中的值覆盖当前配置,返回新实例。"""
-        score_cfg = policy.get("quality_score") or {}
+        score_cfg = get_policy_value(policy, "quality_score", {}, section="filter")
         if not score_cfg:
             return self
         return QualityScoreConfig(
@@ -210,9 +211,10 @@ class QualityFilterStage(Stage):
         4) 按等级与时间排序后截断到目标数量
         """
         policy = ctx.metadata.get("search_agent_policy") or {}
-        limit = int(policy.get("max_detail_fetch", self.detail_limit))
-        if "enable_llm_review" in policy:
-            enable_llm = bool(policy["enable_llm_review"]) and self.agent_executor is not None
+        limit = int(get_policy_value(policy, "max_detail_fetch", self.detail_limit, section="filter"))
+        llm_toggle = get_policy_value(policy, "enable_llm_review", None, section="filter")
+        if llm_toggle is not None:
+            enable_llm = bool(llm_toggle) and self.agent_executor is not None
         else:
             enable_llm = self.enable_llm_review
 
@@ -375,10 +377,6 @@ class QualityFilterStage(Stage):
         # interest 基于数据指标
         interest = "high" if len(body_text) >= cfg.min_body_length else "medium"
 
-        # spam 检测仍保留为硬规则
-        if any(flag in haystack_lower for flag in cfg.spam_keywords):
-            return "low", "low", "存在明显标题党或情绪煽动风险"
-
         # 利用阅读量/互动数据辅助判断 interest
         view_count = detail.view_count
         engagement = detail.like_count + detail.share_count + detail.looking_count

+ 103 - 22
src/pipeline/stages/content_search.py

@@ -13,6 +13,7 @@ from agent.tools.builtin.knowledge import KnowledgeConfig
 from src.pipeline.adapters.base import ToolAdapter
 from src.pipeline.base import Stage
 from src.pipeline.context import CandidateArticle, PipelineContext
+from src.pipeline.policy_resolver import get_policy_value
 from src.pipeline.stages.common import StageAgentExecutor, _append_llm_interaction, _compact_messages, extract_json_object
 
 # 从 weixin_search 工具输出中提取 JSON 文章列表
@@ -49,7 +50,7 @@ class ContentSearchStage(Stage):
         例:目标 10 篇 × 5.0 = 最多 50 篇候选进入过滤阶段。
         """
         policy = ctx.metadata.get("search_agent_policy") or {}
-        mult = float(policy.get("recall_multiplier", self.recall_multiplier))
+        mult = float(get_policy_value(policy, "recall_multiplier", self.recall_multiplier, section="search"))
         return max(int(ctx.target_count * mult), ctx.target_count + 1)
 
     async def execute(self, ctx: PipelineContext) -> PipelineContext:
@@ -79,8 +80,25 @@ class ContentSearchStage(Stage):
         analysis = ctx.demand_analysis
         assert analysis is not None
 
-        precise_keywords = json.dumps(analysis.search_strategy.precise_keywords, ensure_ascii=False)
-        topic_keywords = json.dumps(analysis.search_strategy.topic_keywords, ensure_ascii=False)
+        # 优先使用拓展后的关键词
+        if ctx.expanded_query and ctx.expanded_query.expanded_keywords:
+            # 使用拓展关键词(已按权重排序)
+            expanded_kws = [
+                item.get("keyword", "")
+                for item in ctx.expanded_query.expanded_keywords
+                if item.get("keyword")
+            ]
+            search_keywords = json.dumps(expanded_kws, ensure_ascii=False)
+            keywords_source = "拓展关键词(基于爆款特征)"
+        else:
+            # 回退到原始关键词
+            original_kws = (
+                analysis.search_strategy.precise_keywords
+                + analysis.search_strategy.topic_keywords
+            )
+            search_keywords = json.dumps(original_kws, ensure_ascii=False)
+            keywords_source = "原始关键词"
+
         upper_features = json.dumps(analysis.upper_features, ensure_ascii=False)
         lower_features = json.dumps(analysis.lower_features, ensure_ascii=False)
         filter_focus = ""
@@ -103,6 +121,12 @@ class ContentSearchStage(Stage):
         max_recall = self._max_recall(ctx)
         existing_count = len(ctx.candidate_articles)
         remaining_quota = max(max_recall - existing_count, 0)
+        if remaining_quota <= 0:
+            logging.getLogger(__name__).info(
+                "content_search(agent) 提前结束: 已达召回上限 %d 篇",
+                max_recall,
+            )
+            return ctx
 
         messages = [
             {
@@ -122,15 +146,15 @@ class ContentSearchStage(Stage):
 目标文章数: {ctx.target_count}
 召回上限: {remaining_quota} 篇(已有 {existing_count} 篇候选,总上限 {max_recall} 篇)
 
-需求分析结果:
-- 精准词候选: {precise_keywords}
-- 主题下钻候选: {topic_keywords}
+搜索关键词({keywords_source}): {search_keywords}
+
+需求分析上下文:
 - 上层特征: {upper_features}
 - 下层特征: {lower_features}
 - {filter_focus}
 {fallback_hint}
 
-注意:搜索 2-3 个关键词即可,不要搜索过多。优先使用最相关的精准词
+注意:搜索 2-3 个关键词即可,不要搜索过多。优先使用权重高的关键词(列表前面的)
 
 请按照 content_finding_strategy 技能中的方法论执行搜索,完成后输出 JSON:
 ```json
@@ -152,6 +176,7 @@ class ContentSearchStage(Stage):
             max_iterations=self.agent_executor.max_iterations,
             tools=["weixin_search"],
             skills=["content_finding_strategy"],
+            parent_trace_id=ctx.trace_id,  # 传递 pipeline trace_id 作为 parent
             extra_llm_params=self.agent_executor.extra_llm_params,
             knowledge=KnowledgeConfig(
                 enable_extraction=False,
@@ -169,6 +194,7 @@ class ContentSearchStage(Stage):
         max_recall = self._max_recall(ctx)
         logger = logging.getLogger(__name__)
         logger.info("content_search(agent) 最大召回上限: %d 篇 (target=%d)", max_recall, ctx.target_count)
+        stopped_early = False
 
         import time as _time
         t0 = _time.monotonic()
@@ -218,6 +244,16 @@ class ContentSearchStage(Stage):
                                 break
                             if article.url not in dedup:
                                 dedup[article.url] = article
+                        if len(dedup) >= max_recall:
+                            stopped_early = True
+                            logger.info(
+                                "content_search(agent) 提前停止: 候选达到上限 %d 篇",
+                                max_recall,
+                            )
+                            break
+            if len(dedup) >= max_recall:
+                stopped_early = True
+                break
 
         duration_ms = int((_time.monotonic() - t0) * 1000)
 
@@ -235,6 +271,7 @@ class ContentSearchStage(Stage):
         })
 
         ctx.candidate_articles = list(dedup.values())
+        ctx.metadata["_search_stopped_early"] = stopped_early
 
         # 尝试从 agent 最终输出提取 keyword_stats(可选)
         keyword_stats = []
@@ -244,6 +281,7 @@ class ContentSearchStage(Stage):
                 keyword_stats = data.get("keyword_stats", [])
                 break
         ctx.metadata["_search_keyword_stats"] = keyword_stats
+        self._archive_search_feedback(ctx, keyword_stats=keyword_stats, stopped_early=stopped_early)
         return ctx
 
     @staticmethod
@@ -292,10 +330,14 @@ class ContentSearchStage(Stage):
     async def _code_search(self, ctx: PipelineContext) -> PipelineContext:
         """代码驱动搜索:按关键词依次调用 adapter.search。"""
         policy = ctx.metadata.get("search_agent_policy") or {}
-        page = str(policy.get("initial_cursor", self.page))
+        page = str(get_policy_value(policy, "initial_cursor", self.page, section="search"))
         max_recall = self._max_recall(ctx)
+        max_per_keyword = int(get_policy_value(policy, "max_per_keyword", 3, section="search"))
         logger = logging.getLogger(__name__)
-        logger.info("content_search(code) 最大召回上限: %d 篇 (target=%d)", max_recall, ctx.target_count)
+        logger.info(
+            "content_search(code) 最大召回上限: %d 篇 (target=%d), 每词上限: %d 篇",
+            max_recall, ctx.target_count, max_per_keyword,
+        )
 
         fallback_round = ctx.metadata.get("_fallback_round", 0)
         keywords = self._build_keywords(ctx, fallback_round=fallback_round)
@@ -307,30 +349,42 @@ class ContentSearchStage(Stage):
                 break
             before = len(dedup)
             articles = await self.adapter.search(keyword=keyword, page=page)
+
+            # 每个关键词最多保留 max_per_keyword 篇(按阅读量降序)
+            articles.sort(key=lambda a: a.view_count or 0, reverse=True)
+            added_count = 0
             for article in articles:
+                if added_count >= max_per_keyword:
+                    break
                 article.source_keyword = keyword
                 article.recall_round = index
                 if article.url not in dedup:
                     dedup[article.url] = article
+                    added_count += 1
+
             keyword_stats.append({
                 "keyword": keyword,
                 "round": index,
                 "returned": len(articles),
-                "new": len(dedup) - before,
+                "new": added_count,
             })
 
+        stopped_early = len(dedup) >= max_recall
         ctx.candidate_articles = list(dedup.values())
         ctx.metadata["_search_keyword_stats"] = keyword_stats
+        ctx.metadata["_search_stopped_early"] = stopped_early
+        self._archive_search_feedback(ctx, keyword_stats=keyword_stats, stopped_early=stopped_early)
         return ctx
 
     def _build_keywords(self, ctx: PipelineContext, *, fallback_round: int = 0) -> List[str]:
         """
         构建搜索词队列。
 
-        来源:
-        - demand_analysis 产出的精准词/主题词/上下层特征
-        - policy.extra_keywords
-        - 原始 query(兜底)
+        来源优先级:
+        1. expanded_query 拓展关键词(按爆款特征权重排序)
+        2. demand_analysis 产出的精准词/主题词/上下层特征
+        3. policy.extra_keywords
+        4. 原始 query(兜底)
 
         回退搜索(fallback_round >= 1)时:
         - 跳过上一轮已使用的关键词
@@ -339,9 +393,10 @@ class ContentSearchStage(Stage):
         - 增大搜索词数量上限
         """
         policy = ctx.metadata.get("search_agent_policy") or {}
-        max_kw = int(policy.get("max_keywords", self.max_keywords))
-        priority = policy.get("keyword_priority", "demand_first")
-        extras = [str(x).strip() for x in (policy.get("extra_keywords") or []) if str(x).strip()]
+        max_kw = int(get_policy_value(policy, "max_keywords", self.max_keywords, section="search"))
+        priority = str(get_policy_value(policy, "keyword_priority", "demand_first", section="search"))
+        extras_raw = get_policy_value(policy, "extra_keywords", [], section="search")
+        extras = [str(x).strip() for x in (extras_raw or []) if str(x).strip()]
 
         analysis = ctx.demand_analysis
         assert analysis is not None
@@ -354,14 +409,21 @@ class ContentSearchStage(Stage):
             # 回退轮增大关键词数量上限
             max_kw = max(max_kw, self.max_keywords) + 4
 
+        # 构建拓展关键词列表(按权重排序)
+        expanded_kws: List[str] = []
+        if ctx.expanded_query and ctx.expanded_query.expanded_keywords:
+            expanded_kws = [
+                str(item.get("keyword", "")).strip()
+                for item in ctx.expanded_query.expanded_keywords
+                if str(item.get("keyword", "")).strip()
+            ]
+
         from_demand: List[str] = []
         if fallback_round >= 1:
-            # 回退搜索:优先未用过的 topic_keywords 和 lower/upper features
             from_demand.extend(analysis.search_strategy.topic_keywords)
             from_demand.extend(analysis.lower_features)
             from_demand.extend(analysis.upper_features)
             from_demand.extend(analysis.search_strategy.precise_keywords)
-            # 加入 filter_focus.relevance_focus 作为补充搜索词
             if analysis.filter_focus and analysis.filter_focus.relevance_focus:
                 from_demand.extend(analysis.filter_focus.relevance_focus)
         else:
@@ -372,10 +434,11 @@ class ContentSearchStage(Stage):
 
         query = str(ctx.query).strip()
 
+        # 拓展关键词优先,然后是 demand 关键词
         if priority == "query_first":
-            ordered = [query] + extras + from_demand
+            ordered = [query] + expanded_kws + extras + from_demand
         else:
-            ordered = from_demand + extras + [query]
+            ordered = expanded_kws + from_demand + extras + [query]
 
         seen = set()
         keywords: List[str] = []
@@ -383,7 +446,6 @@ class ContentSearchStage(Stage):
             value = str(keyword).strip()
             if not value or value in seen:
                 continue
-            # 回退搜索时跳过上一轮已使用的关键词
             if fallback_round >= 1 and value in used_keywords:
                 continue
             seen.add(value)
@@ -391,3 +453,22 @@ class ContentSearchStage(Stage):
             if len(keywords) >= max_kw:
                 break
         return keywords
+
+    @staticmethod
+    def _archive_search_feedback(
+        ctx: PipelineContext,
+        *,
+        keyword_stats: List[Dict],
+        stopped_early: bool,
+    ) -> None:
+        """把本轮搜索效果沉淀到 metadata,供后续 run 复用。"""
+        history = ctx.metadata.setdefault("_search_feedback_history", [])
+        if not isinstance(history, list):
+            return
+        history.append({
+            "query": ctx.query,
+            "fallback_round": ctx.metadata.get("_fallback_round", 0),
+            "candidate_count": len(ctx.candidate_articles),
+            "stopped_early": stopped_early,
+            "keyword_stats": keyword_stats,
+        })

+ 60 - 1
src/pipeline/stages/demand_analysis.py

@@ -2,6 +2,7 @@ from __future__ import annotations
 
 """需求理解阶段:把自然语言 query 转成结构化策略。"""
 
+import json
 from typing import List
 
 from src.pipeline.base import Stage
@@ -28,6 +29,7 @@ class DemandAnalysisStage(Stage):
         - ctx.demand_analysis
         """
         knowledge_context = await _build_knowledge_context(ctx)
+        feedback_context = _build_search_feedback_context(ctx)
         messages = [
             {
                 "role": "system",
@@ -48,11 +50,19 @@ class DemandAnalysisStage(Stage):
 补充知识:
 {knowledge_context or "无"}
 
+历史搜索反馈:
+{feedback_context or "无"}
+
 要求:
 1. 只能用 query 中已有词语做归类,禁止编造核心特征。
 2. 先区分 `实质特征` 与 `形式特征`。
 3. 只对 `实质特征` 继续区分 `上层特征` 与 `下层特征`。
-4. 输出 JSON:
+4. 结合历史搜索反馈优化策略:
+   - noise_ratio > 0.6 的关键词路径应降权或替换,说明该词召回大量重复内容
+   - new 数量高且 noise_ratio 低的词路优先保留
+   - 若上一轮 stopped_early=True,说明当前词路召回效率足够,应保留其高产关键词
+   - 若上一轮 stopped_early=False 且候选不足,需要拓展新的搜索角度
+5. 输出 JSON:
 ```json
 {{
   "特征归类": {{
@@ -121,6 +131,55 @@ async def _build_knowledge_context(ctx: PipelineContext) -> str:
     return "\n".join(lines)
 
 
+def _build_search_feedback_context(ctx: PipelineContext) -> str:
+    """
+    构建历史搜索反馈上下文。
+
+    支持两种 metadata 键:
+    - search_feedback_history(推荐,外部注入)
+    - _search_feedback_history(内部累积)
+    """
+    history = ctx.metadata.get("search_feedback_history")
+    if history is None:
+        history = ctx.metadata.get("_search_feedback_history")
+    if not isinstance(history, list) or not history:
+        return ""
+
+    lines: List[str] = []
+    for idx, item in enumerate(history[:5], start=1):
+        if not isinstance(item, dict):
+            continue
+        query = str(item.get("query", "")).strip()
+        note = str(item.get("note", "")).strip()
+        fallback_round = item.get("fallback_round", 0)
+        candidate_count = item.get("candidate_count", 0)
+        stopped_early = item.get("stopped_early", False)
+        keyword_stats = item.get("keyword_stats")
+        compact_stats = []
+        if isinstance(keyword_stats, list):
+            for stat in keyword_stats[:8]:
+                if not isinstance(stat, dict):
+                    continue
+                returned = int(stat.get("returned", 0) or 0)
+                new = int(stat.get("new", 0) or 0)
+                noise_ratio = round((returned - new) / returned, 2) if returned > 0 else 0.0
+                compact_stats.append({
+                    "keyword": str(stat.get("keyword", "")).strip(),
+                    "returned": returned,
+                    "new": new,
+                    "noise_ratio": noise_ratio,
+                })
+        lines.append(f"### 反馈 {idx}")
+        if query:
+            lines.append(f"- query: {query}")
+        if note:
+            lines.append(f"- 备注: {note}")
+        lines.append(f"- 补召回轮次: {fallback_round}, 累计候选: {candidate_count}, 提前停搜: {stopped_early}")
+        if compact_stats:
+            lines.append(f"- 关键词效果: {json.dumps(compact_stats, ensure_ascii=False)}")
+    return "\n".join(lines)
+
+
 def _ensure_list(value) -> List[str]:
     """把外部结果安全规范为字符串列表。"""
     if isinstance(value, list):

+ 0 - 132
tests/output.json

@@ -1,132 +0,0 @@
-{
-  "trace_id": "content_finding_iran_israel_peace_20260326",
-  "query": "伊朗、以色列、和平是永恒的主题",
-  "demand_id": "1",
-  "summary": {
-    "candidate_count": 50,
-    "filtered_in_count": 20,
-    "account_count": 16
-  },
-  "contents": [
-    {
-      "title": "伊朗和以色列:和平的契机",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzI0OTU4NDU2NQ==&mid=2247484382&idx=1&sn=4c44e3a214df638eca93986a32a1333e",
-      "statistics": {},
-      "reason": "TED演讲《伊朗和以色列:和平的契机》,直接呼应主题,由知名社会活动家Trita Parsi主讲,探讨两国和平可能性,内容权威、正能量,适合老年人分享观看"
-    },
-    {
-      "title": "从"阿克萨洪水"泄去,到"史诗怒火"熄灭,中东和平终将到来",
-      "url": "https://mp.weixin.qq.com/s?__biz=Mzk0NzQzNzY5NA==&mid=2247486586&idx=1&sn=9c23d93b8890d5d4648984df58acb08d",
-      "statistics": {},
-      "reason": "深度分析中东和平曙光,以"和平终将到来"为核心论点,历史感强,情感积极,符合老年人对和平的期盼,与主题高度契合"
-    },
-    {
-      "title": "美以伊战争分析——和谈的条件已经在萌芽",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzU3NjAzMzM5Ng==&mid=2247484503&idx=1&sn=dbfff22eaed2250a213e3e5c6a97f820",
-      "statistics": {},
-      "reason": "深度分析美以伊三方和谈条件,逻辑清晰,展望和平前景,适合关注时事的老年人,内容理性客观"
-    },
-    {
-      "title": "伊朗与以色列由"热"变"冷"的关系探析",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzA5Mzc1NTYwMQ==&mid=2649683805&idx=1&sn=b61e99f1cfca068132d271a2a9832f29",
-      "statistics": {},
-      "reason": "节选自《史学月刊》学术文章,系统梳理伊以关系从友好到对立的历史演变,历史感强,文笔流畅,适合老年人了解历史背景"
-    },
-    {
-      "title": "为中东求平安!恨能挑起事端",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzkwODg4NDkwOQ==&mid=2247489216&idx=1&sn=e62c7e5152a22141d6b8f7f69110b174",
-      "statistics": {},
-      "reason": "从人文关怀角度呼吁中东和平,情感真挚,有宗教情怀,呼吁放下仇恨、和平相处,非常适合有宗教信仰的老年人分享"
-    },
-    {
-      "title": "以伊"12天战争",结束了?",
-      "url": "https://mp.weixin.qq.com/s?__biz=MjM5MDU1Mzg3Mw==&mid=2651666228&idx=1&sn=9f3aa30d2bb89e1151b3e5e851fbac7e",
-      "statistics": {},
-      "reason": "中国新闻周刊权威报道,深度分析以伊12天战争停火始末,内容专业权威,适合关注国际时事的老年人,可信度高"
-    },
-    {
-      "title": "从"天然盟友"到不共戴天:以色列与伊朗的历史积怨与现实纠葛",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzIwNTEzMzQ1NA==&mid=2648346427&idx=1&sn=933a5e1283454221f323730d1a355a1f",
-      "statistics": {},
-      "reason": "深度历史文章,从居鲁士大帝到现代冲突,梳理两国千年恩怨,文笔优美,历史感强,结尾呼吁和平,非常适合老年人阅读"
-    },
-    {
-      "title": "伊朗与以色列:"我们原本并非死敌"",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzU1NjQ4ODEwMQ==&mid=2247487606&idx=1&sn=fa428d5ce0f8bb774d096b4264f27536",
-      "statistics": {},
-      "reason": "通俗易懂地讲述伊以关系从蜜月到决裂的历史,语言生动,有历史温情,标题引人入胜,适合老年人了解历史真相"
-    },
-    {
-      "title": "以色列打不动了想停火,伊朗说不:47年的账,今天得算清",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzI2NDA1MTEwNQ==&mid=2247617031&idx=1&sn=286a96c958af7dc5b253d1a2c518e412",
-      "statistics": {},
-      "reason": "时事热点文章,叙事生动,深度分析停火博弈,结尾呼吁真正和平,语言通俗,适合老年人了解当前局势"
-    },
-    {
-      "title": "武力换不来中东和平,是国际社会共识",
-      "url": "https://mp.weixin.qq.com/s?__biz=MTQzMTE0MjcyMQ==&mid=2667843652&idx=1&sn=9a2dc886bd0793887f79e2ba10e58cb1",
-      "statistics": {},
-      "reason": "环球时报社评,权威媒体,明确表达和平立场,引用中国外长王毅呼吁停火,体现中国和平主张,适合老年人分享传播"
-    },
-    {
-      "title": "伊朗和以色列为什么会停战?",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzIwMzAwMzQxNw==&mid=2756747925&idx=1&sn=5c0c8d6f0fbeee1c1365edfd2305d719",
-      "statistics": {},
-      "reason": "卢克文工作室出品,高热度大V文章,深度分析停战原因,语言生动有趣,结尾反思战争代价呼吁真正和平,老年人喜爱的风格"
-    },
-    {
-      "title": "以色列与伊朗:战火重燃,还是永久握手言和?",
-      "url": "https://mp.weixin.qq.com/s?__biz=Mzg3NjUyNjc5Mw==&mid=2247484806&idx=1&sn=1ef51adb1607300430c3989b50b99312",
-      "statistics": {},
-      "reason": "系统分析以伊冲突的深层矛盾与和平可能,结尾呼吁和平,强调中国的和平贡献,正能量,适合老年人分享"
-    },
-    {
-      "title": "中东重燃战火,促和止战才是正道",
-      "url": "https://mp.weixin.qq.com/s?__biz=Mzg5NTg3NDM2Nw==&mid=2247486429&idx=1&sn=4e1ae791124459e2ff6ddba088e98bd2",
-      "statistics": {},
-      "reason": "从基督教视角呼吁和平,引用圣经智慧,情感真挚,有祷告祈愿,适合有宗教信仰的老年人,和平主题突出"
-    },
-    {
-      "title": "以色列和伊朗,一对孪生的镜像",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzI1NzA4NTYwNg==&mid=2649232215&idx=1&sn=bacc2dab481374ede4aee1ce3cfcb312",
-      "statistics": {},
-      "reason": "独特视角深度分析两国内部矛盾与相似性,文章深刻,揭示战争背后的政治逻辑,适合有思考力的老年读者"
-    },
-    {
-      "title": "深度解读:中东人都渴望和平,为何战火从未熄灭?",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzA5ODA1NTExMQ==&mid=2650200536&idx=1&sn=bdc5f6a272cad350793c98b1504cc802",
-      "statistics": {},
-      "reason": "从地理、历史、文明、殖民、石油、霸权六层深度解读中东乱局,引用多位权威学者,结尾呼吁还命运给中东人民,适合老年人深度阅读"
-    },
-    {
-      "title": "还中东以秩序,还人民以安宁,还世界以和平",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzA3MTQ4MDA2Mw==&mid=2650918465&idx=1&sn=05a31e6bf85ea1c3125b7e6dcab0a8c3",
-      "statistics": {},
-      "reason": "中国驻阿联酋大使馆发文,代表中国官方立场,呼吁停火止战,体现中国和平主张,权威可信,适合老年人了解中国外交立场"
-    },
-    {
-      "title": "中东——人类文明的摇篮,为何永无宁日?",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzU0NDQwOTU4Ng==&mid=2247484008&idx=1&sn=5f9666fc8def1a96a856681bb961d57d",
-      "statistics": {},
-      "reason": "从地理、宗教、资源、大国博弈多维度深度解读中东乱局,文笔优美,历史感强,结尾充满希望,适合老年人深度阅读"
-    },
-    {
-      "title": "美国伊朗开始和谈,中东战争要结束了吗?",
-      "url": "https://mp.weixin.qq.com/s?__biz=Mzg3ODE3MDAwMA==&mid=2247485956&idx=1&sn=030d5c96f70f8634663d22e1b441d1f5",
-      "statistics": {},
-      "reason": "分析美伊和谈进展,探讨中东战争结束可能性,时效性强,适合关注时事的老年人了解最新局势"
-    },
-    {
-      "title": "伊朗与以色列的千年恩怨 | 范鸿达",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzI5ODc4Njc5Ng==&mid=2247505775&idx=2&sn=69c6f4e3b7e3d07f99471ba9b323ae33",
-      "statistics": {},
-      "reason": "知名学者范鸿达撰写,学术权威,系统梳理伊以千年恩怨,适合老年人了解历史背景,可信度高"
-    },
-    {
-      "title": "以色列与伊朗有哪些你死我活的历史恩怨?",
-      "url": "https://mp.weixin.qq.com/s?__biz=MzU1MDAzOTQ5NA==&mid=2247490015&idx=1&sn=afe29bc1290a51044422ffd1d4f4c16c",
-      "statistics": {},
-      "reason": "通俗易懂地梳理以伊历史恩怨,语言生动,适合老年人了解两国冲突根源,有助于理解当前局势"
-    }
-  ]
-}

+ 6 - 0
tests/skills/account_precipitation.md

@@ -3,6 +3,12 @@ name: account_precipitation
 description: 账号沉淀策略(Harness 架构:biz 批量合并 + 质量分级)
 ---
 
+> **注意**:此文件仅作文档参考,未被代码引用。
+> 
+> `AccountPrecipitateStage` 是纯代码驱动,不调用 LLM,不使用 skill。
+> 
+> 本文件保留作为账号沉淀逻辑的参考文档。
+
 # 账号沉淀策略
 
 ---

+ 1 - 0
tests/skills/article_finding_strategy.md

@@ -41,6 +41,7 @@ description: 内容搜索方法论(Harness 架构:两轨搜索 + 搜索期
 | 候选上限 P | `target_count × 3` | 达到 P 立即停止搜索,不再调用 weixin_search |
 | 关键词轮询上限 | 精准词 + 下钻词全部使用完 | 用完后不再补充关键词,将已有候选交付筛选 |
 | 单关键词最多翻页 | 2 页(首页 + next_cursor 续页) | 单词超 2 页不再翻页,换下一个关键词 |
+| 单关键词保留上限 | 3 条(按阅读量降序) | 每个搜索词最多保留 3 篇最优质文章,保证结果多样性 |
 | 同一 biz 保留上限 | 3 条 | 超出丢弃,防止单一账号垄断候选 |
 
 ---

+ 169 - 33
tests/skills/demand_analysis.md

@@ -1,60 +1,196 @@
 ---
 name: demand_analysis
-description: 需求分析
+description: 需求分析与特征分层(Harness 架构:零工具调用 + 结构化输出)
 ---
 
-# 需求分析(仅理解,不执行)
+# 需求分析策略
 
-输入:逗号分隔特征词,如 `养老,防骗,政策解读,故事化`。
-本步骤只输出结构化理解结果,不调用工具、不执行搜索/过滤/沉淀。
+---
+
+## ⚡ Harness: Fallback — 前置验证(快速失败)
+
+在执行需求分析前,先验证以下前置条件。**任一失败则立即终止。**
+
+| 检查项 | 通过条件 | 失败处理 |
+|---|---|---|
+| query 非空 | `query` 长度 >= 1 | 终止,告知用户"query 为空" |
+| target_count 有效 | `target_count >= 1` | 使用默认值 10 |
+
+---
+
+## 📋 Harness: Planner — 执行计划(开始前打印)
 
-## 步骤1:特征分层
+```
+[DemandAnalysisPlanner]
+  原始 query          = {query}
+  目标文章数          = {target_count}
+  历史搜索反馈        = {feedback_count} 条(fallback_round = {round})
+  补充知识源          = {knowledge_sources}
+  输出目标            = 特征归类 + 起点策略 + 筛选关注点
+```
+
+---
 
-仅对输入词归类,禁止编造新词。
+## 💰 Harness: Budget — 预算约束
+
+| 预算项 | 限制 | 说明 |
+|---|---|---|
+| LLM 调用 | 1 次 | 零工具调用,纯理解任务 |
+| 输出字段 | 固定 3 个顶层字段 | 特征归类、起点策略、筛选关注点 |
+| 精准词候选上限 | <= 6 个 | 避免搜索词过多导致召回质量下降 |
+| 主题下钻候选上限 | <= 6 个 | 同上 |
+
+---
+
+## ⚙️ Core Execution — 核心执行
+
+### 步骤 1:特征分层(仅归类,禁止编造)
+
+**规则 A:实质 vs 形式**
+
+| 特征类型 | 定义 | 示例 |
+|---|---|---|
+| 实质特征 | 主题/问题/对象/场景 | "养老"、"防骗"、"政策解读" |
+| 形式特征 | 表达方式/结构/语气 | "故事化"、"数据化"、"情绪化" |
+
+**规则 B:仅对实质特征继续细分**
+
+| 特征类型 | 定义 | 示例 |
+|---|---|---|
+| 上层特征 | 宽泛,不能直接检索 | "养老政策" |
+| 下层特征 | 具体,可直接检索 | "退休金被骗套路" |
+
+**约束**:
+- `上层 ∪ 下层 = 实质特征`(不重不漏)
+- 只能用 query 中已有词语做归类,**禁止编造核心特征**
+- 形式特征不参与上层/下层细分
+
+---
+
+### 步骤 2:策略判定(只给建议)
+
+| 条件 | 建议策略 | 输出字段 |
+|---|---|---|
+| 下层特征非空 | 精准词直搜 | `建议精准词直搜 = true`,`精准词候选 = 下层特征` |
+| 上层特征非空 | 主题下钻 | `建议主题下钻 = true`,`主题下钻候选 = 上层特征` |
+| 两者都非空 | 并行 | 两个策略都启用 |
+| 只有形式特征 | 用原话构造最小词包 | 从 query 中提取核心名词作为精准词候选 |
+
+---
 
-1. **实质 vs 形式**
-   - `实质特征`:主题/问题/对象/场景
-   - `形式特征`:表达方式/结构/语气(不参与下一步细分)
+### 步骤 3:筛选关注点提取
 
-2. **仅对实质特征继续细分**
-   - `上层特征`:宽泛,不能直接检索(如"养老政策")
-   - `下层特征`:具体,可直接检索(如"退休金被骗套路")
-   - 约束:`上层 ∪ 下层 = 实质特征`
+| 关注点类型 | 定义 | 示例 |
+|---|---|---|
+| 形式规则 | 从形式特征推导的筛选规则 | "故事化" → 需要有具体案例 |
+| 相关性关注点 | 判断文章是否相关的核心要素 | "养老金" → 必须涉及养老金政策或案例 |
+| 淘汰风险点 | 明显不符合需求的关键词 | "广告"、"推销"、"引流加微信"、"恶搞戏说历史"、"纯争议性政治观点输出" |
 
-## 步骤2:策略判定(只给建议)
+**淘汰风险点约束**:
+- 只包含**明确的低质量信号**,如广告、推销、引流等
+- **不包含主观判断**,如"标题党"、"无实质内容"等(这些应由质量筛选阶段判断)
+- 保持精简,避免过度过滤
 
-| 条件 | 建议 |
+---
+
+### 步骤 4:搜索词初步拓展(可选,与 query_expansion 阶段协同)
+
+**目标**:在需求理解阶段就产出初步的搜索词拓展,供后续 query_expansion 阶段参考。
+
+**规则**:
+- 基于实质特征,为每个精准词候选和主题下钻候选生成 1-2 个同义或相关表达
+- 不融入爆款特征(爆款特征由 query_expansion 阶段负责)
+- 输出到 `起点策略.初步拓展词` 字段(可选字段)
+
+**示例**:
+- 原始词:"养老金" → 初步拓展:"退休金"、"养老保险"
+- 原始词:"防骗" → 初步拓展:"反诈"、"防诈骗"
+
+**与 query_expansion 的协同**:
+- 需求理解阶段:产出基础同义词拓展(保守)
+- query_expansion 阶段:基于爆款特征做深度拓展(激进)
+- 两者互补,避免冲突
+
+---
+
+### 步骤 5:历史搜索反馈消费(fallback 轮次)
+
+**当 `fallback_round >= 1` 时**,历史搜索反馈会注入到 prompt 中,包含:
+
+| 反馈字段 | 含义 | 使用方式 |
+|---|---|---|
+| `keyword` | 上一轮使用的搜索词 | 评估该词的召回效果 |
+| `returned` | 该词返回的文章数 | 判断该词的覆盖面 |
+| `new` | 该词新增的文章数(去重后) | 判断该词的有效性 |
+| `noise_ratio` | `(returned - new) / returned` | **> 0.6 视为高噪音,该词路应降权或替换** |
+| `stopped_early` | 是否提前停搜 | `true` 说明当前词路召回效率足够,应保留高产关键词 |
+| `candidate_count` | 累计候选数 | 判断整体召回量是否充足 |
+| `fallback_round` | 补召回轮次 | 判断是否需要更激进的策略调整 |
+
+**决策规则**:
+
+| 场景 | 决策 |
 |---|---|
-| 下层特征非空 | 精准词直搜 |
-| 上层特征非空 | 主题下钻 |
-| 两者都非空 | 并行 |
-| 只有形式特征 | 用原话构造最小词包 |
+| `noise_ratio > 0.6` 的关键词 | 降权或替换,说明该词召回大量重复内容 |
+| `new` 数量高且 `noise_ratio` 低的词 | 优先保留,说明该词路高效 |
+| 上一轮 `stopped_early=True` | 当前词路召回效率足够,应保留其高产关键词 |
+| 上一轮 `stopped_early=False` 且候选不足 | 需要拓展新的搜索角度,避免重复上一轮的词路 |
 
-## 输出模板
+---
+
+## 输出格式
 
 ```json
 {
   "特征归类": {
-    "实质特征": [],
-    "形式特征": [],
-    "上层特征": [],
-    "下层特征": []
+    "实质特征": ["养老", "防骗"],
+    "形式特征": ["故事化"],
+    "上层特征": ["养老政策"],
+    "下层特征": ["退休金被骗套路"]
   },
   "起点策略": {
     "建议精准词直搜": true,
     "建议主题下钻": true,
-    "精准词候选": [],
-    "主题下钻候选": []
+    "精准词候选": ["退休金被骗套路"],
+    "主题下钻候选": ["养老政策"],
+    "初步拓展词": ["退休金", "养老保险", "反诈", "防诈骗"]
   },
   "筛选关注点": {
-    "形式规则": [],
-    "相关性关注点": [],
-    "淘汰风险点": []
+    "形式规则": ["需要有具体案例"],
+    "相关性关注点": ["必须涉及养老金政策或案例"],
+    "淘汰风险点": ["广告", "推销", "引流加微信", "恶搞戏说历史"]
   }
 }
 ```
 
-## 自检
-- 完成实质/形式 + 上层/下层双重标注
-- 只输出理解结果,未执行任何动作
-- 未引入输入外的核心主题词
+**字段约束**:
+- `精准词候选` 和 `主题下钻候选` 长度均 <= 6
+- `初步拓展词` 为可选字段,长度 <= 8(如果不产出则省略该字段)
+- 所有数组字段必须是字符串数组,不能为 `null`(空数组用 `[]`)
+- `建议精准词直搜` 和 `建议主题下钻` 必须是布尔值
+
+---
+
+## 📊 Harness: Observer — 观测与输出
+
+### 分析摘要(写入日志 / 传递给下游)
+
+```
+[DemandAnalysisObserver]
+  实质特征数          = {len(实质特征)}
+  形式特征数          = {len(形式特征)}
+  精准词候选数        = {len(精准词候选)}
+  主题下钻候选数      = {len(主题下钻候选)}
+  淘汰风险点数        = {len(淘汰风险点)}
+  历史反馈消费        = {True/False}(fallback_round >= 1)
+```
+
+---
+
+## 自检清单
+
+- ✅ 完成实质/形式 + 上层/下层双重标注
+- ✅ 只输出理解结果,未执行任何动作(零工具调用)
+- ✅ 未引入输入外的核心主题词
+- ✅ 精准词候选和主题下钻候选长度均 <= 6
+- ✅ 所有字段类型符合约束(数组/布尔值/字符串)

+ 6 - 0
tests/skills/output_schema.md

@@ -3,6 +3,12 @@ name: output_schema
 description: 微信文章搜索任务输出结构规范(文章+账号+关系)
 ---
 
+> **注意**:此文件仅作文档参考,未被代码引用。
+> 
+> `OutputPersistStage` 和 `OutputSchemaGate` 是纯代码驱动,不调用 LLM,不使用 skill。
+> 
+> 本文件保留作为输出格式的参考文档。
+
 ## 输出结果指南
 
 ### 输出目录(本地 JSON)