# run_search_agent.py 完整工作流程 ## 总览 ``` main() ├─ ① validate_prerequisites() — 前置检查 ├─ ② 读取环境变量 (query, demand_id) ├─ ③ AgentBudget.from_env() + validate — 预算约束 ├─ ④ uuid4() 生成全局 trace_id ├─ ⑤ print_run_plan() — 打印运行计划 ├─ ⑥ run_with_harness() — 核心执行(带超时) │ ├─ 策略加载 (DB / default fallback) │ ├─ 预算注入 (cap target_count) │ └─ asyncio.wait_for(core.run(), timeout) │ ├─ SearchAgentCore.run() │ │ ├─ 解析策略 (SearchAgentPolicy) │ │ ├─ 构建 PipelineContext │ │ └─ run_content_finder_pipeline(ctx) │ │ ├─ build_default_pipeline() — 组装 stages/gates/hooks │ │ └─ PipelineOrchestrator.run(ctx) │ │ ├─ Stage 1: DemandAnalysisStage │ │ ├─ Stage 2: ContentSearchStage │ │ │ └─ Gate: SearchCompletenessGate │ │ ├─ Stage 3: HardFilterStage │ │ ├─ Stage 4: CoarseFilterStage (LLM 标题粗筛) │ │ ├─ Stage 5: QualityFilterStage (正文精排) │ │ │ └─ Gate: FilterSufficiencyGate (可 fallback 到 Stage 2) │ │ ├─ Stage 6: AccountPrecipitateStage │ │ └─ Stage 7: OutputPersistStage │ │ └─ Gate: OutputSchemaGate │ └─ 返回 PipelineContext ├─ ⑦ summary.log() — 结构化摘要 └─ ⑧ exit code (0=成功, 1=失败) ``` --- ## 详细流程 ### 1. 入口:`main()` > 文件:`run_search_agent.py:284` ``` asyncio.run(main()) ``` #### ① 前置检查 — Fallback Harness `validate_prerequisites()` (line 173) 检查 `OPEN_ROUTER_API_KEY` 是否存在,缺失则立即抛出 `EnvironmentError`,快速失败。 #### ② 读取运行参数 | 环境变量 | 默认值 | 用途 | |---------|--------|------| | `PIPELINE_QUERY` | `"伊朗以色列冲突、中老年人会关注什么?"` | 搜索查询 | | `PIPELINE_DEMAND_ID` | `"1"` | 需求 ID(关联 DB 策略) | #### ③ 预算约束 — Budget Harness `AgentBudget.from_env()` (line 61) 从环境变量构建预算: | 参数 | 环境变量 | 默认值 | 约束范围 | |------|---------|--------|---------| | `timeout_seconds` | `PIPELINE_TIMEOUT` | 1800 (30min) | ≥ 30 | | `max_target_count` | `PIPELINE_MAX_TARGET_COUNT` | 10 | [1, 200] | | `max_fallback_rounds` | `PIPELINE_MAX_FALLBACK_ROUNDS` | 1 | [0, 5] | `budget.validate()` 校验参数范围,不合法直接抛异常。 #### ④ 生成全局 trace_id ```python trace_id = str(uuid4()) ``` 此 `trace_id` 贯穿整个运行周期:运行计划 → harness → core → pipeline context → 所有 stages/hooks/gates。 #### ⑤ 运行计划 — Planner Harness `print_run_plan()` (line 138) 在执行前打印结构化计划,包含 trace_id、query、各阶段目标与约束,使运行意图可审计。 --- ### 2. 核心执行:`run_with_harness()` > 文件:`run_search_agent.py:193` #### 2.1 策略加载 ``` use_db_policy=True? ├─ Yes → core.load_policy(demand_id) │ ├─ 成功 → policy_source = "db" │ └─ 失败 → 降级为 SearchAgentPolicy.defaults(), policy_source = "default(fallback)" └─ No → SearchAgentPolicy.defaults(), policy_source = "default" ``` **策略加载链路:** - `SearchAgentCore.load_policy()` → `SearchAgentPolicyRepository.load_policy()` (`src/domain/search/repository.py:20`) - 查询 `search_agent` 库的 `search_agent_strategy` 表 - 优先按 `demand_id` 查找,找不到则查 `strategy_code='default'` - 解析 `config_json` 字段,合并默认值,返回 `SearchAgentPolicy` **SearchAgentPolicy 关键字段** (`src/domain/search/policy.py`): | 字段 | 默认值 | 含义 | |------|--------|------| | `max_keywords` | 6 | 最大搜索关键词数 | | `min_candidate_multiplier` | 2.0 | 候选目标 = target_count × 此值 | | `near_enough_candidate_multiplier` | 1.2 | "接近足够"的阈值倍数 | | `filter_near_ratio` | 0.8 | 过滤后"接近足够"的比例 | | `max_detail_fetch` | 30 | 最多获取详情的文章数 | | `enable_llm_review` | True | 是否启用 LLM 复评 | #### 2.2 预算注入 ```python effective_target = min(runtime.target_count, budget.max_target_count) ``` `RuntimePipelineConfig.from_env()` (`src/pipeline/config/pipeline_config.py`) 读取: | 环境变量 | 默认值 | |---------|--------| | `MODEL` | `anthropic/claude-sonnet-4-5` | | `PIPELINE_TEMPERATURE` | 0.2 | | `PIPELINE_MAX_ITERATIONS` | 12 | | `PIPELINE_TARGET_COUNT` | 10 | Budget Harness 将 `target_count` 限制在 `max_target_count` 以内,防止无限扩张。 #### 2.3 超时包裹执行 ```python ctx = await asyncio.wait_for( core.run(query, demand_id, effective_target, ..., trace_id=trace_id), timeout=budget.timeout_seconds, ) ``` 三种结果: - 正常返回 → 采集摘要,`success=True` - `asyncio.TimeoutError` → 记录超时错误,`success=False` - 其他异常 → 记录异常信息,`success=False` --- ### 3. 服务层:`SearchAgentCore.run()` > 文件:`src/domain/search/core.py:41` ```python trace_id = trace_id or str(uuid4()) # 优先使用外部传入的 trace_id ctx = PipelineContext( task_id=str(uuid4()), trace_id=trace_id, query=query, demand_id=demand_id, target_count=target_count, model=runtime.model, output_dir="tests/output", knowledge_sources=default_knowledge_sources(), ) apply_search_agent_policy(ctx, policy) # 策略写入 ctx.metadata return await run_content_finder_pipeline(ctx) ``` `default_knowledge_sources()` (`src/pipeline/runner.py`) 加载静态知识:平台规则、受众画像等。 --- ### 4. Pipeline 组装:`build_default_pipeline()` > 文件:`src/pipeline/runner.py:39` 组装三大组件: **Stages(按顺序执行):** | # | Stage | 职责 | |---|-------|------| | 1 | `DemandAnalysisStage` | LLM 理解需求,产出搜索策略 | | 2 | `ContentSearchStage` | 按关键词召回候选文章 | | 3 | `HardFilterStage` | 去重 + URL/时间基础校验 | | 4 | `CoarseFilterStage` | LLM 批量标题语义粗筛 | | 5 | `QualityFilterStage` | 数据指标评分 + LLM 正文精排 | | 6 | `AccountPrecipitateStage` | 账号信息聚合沉淀 | | 7 | `OutputPersistStage` | 输出结构化 JSON | **Gates(阶段后置检查):** | Gate | 挂载在 | 动作 | |------|--------|------| | `SearchCompletenessGate` | Stage 2 之后 | 候选不足 → abort | | `FilterSufficiencyGate` | Stage 5 之后 | 不足 → fallback 到 Stage 2 | | `OutputSchemaGate` | Stage 7 之后 | 结构校验 | **Hooks(观测层):** | Hook | 职责 | |------|------| | `TraceHook` | JSON 快照写入 logger | | `PipelineTraceHook` | JSONL 事件写入 `tests/traces/{trace_id}/pipeline.jsonl` | | `LiveProgressHook` | 终端实时进度展示 | | `DatabasePersistHook` | MySQL 持久化(可选,失败不阻塞) | --- ### 5. Pipeline 执行:`PipelineOrchestrator.run()` > 文件:`src/pipeline/orchestrator.py:43` ``` on_pipeline_start(ctx) ← 所有 hooks 触发 for stage in stages: on_stage_start(ctx, stage) ← hooks validate_input(ctx) ← 前置校验 _execute_stage(ctx) ← 执行(带重试,max_stage_retries=1) checkpoint(ctx) ← 快照保存 on_stage_complete(ctx, stage) ← hooks if gate exists for this stage: result = gate.check(ctx) ├─ passed / proceed → 继续下一个 stage ├─ retry_stage → 重新执行当前 stage ├─ fallback → 跳转到 fallback_stage(最多 1 轮) └─ abort → 抛出异常,终止 pipeline on_pipeline_complete(ctx) ← 所有 hooks 触发 return ctx ``` --- ### 6. 各 Stage 详细逻辑 #### Stage 1: DemandAnalysisStage > 文件:`src/pipeline/stages/demand_analysis.py:19` - 输入:`ctx.query` + `ctx.knowledge_sources` - 调用 `StageAgentExecutor.run_json_stage()`,加载 skill `demand_analysis.md` - LLM 输出 JSON,解析为 `DemandAnalysisResult`: - `substantive_features` / `formal_features` — 内容特征 - `search_strategy.precise_keywords` / `topic_keywords` — 搜索关键词 - `filter_focus.format_rules` / `relevance_focus` — 过滤指导 - 输出:写入 `ctx.demand_analysis` #### Stage 2: ContentSearchStage > 文件:`src/pipeline/stages/content_search.py:42` 两种模式: - **Agent 模式**(有 `agent_executor`):LLM 自主调用 `weixin_search` 工具,迭代搜索 - **Code 模式**(无 `agent_executor`):按关键词列表逐个调用 `WeixinToolAdapter.search()` 流程: 1. 从 `ctx.demand_analysis` 提取关键词列表 2. 逐关键词搜索,结果按 URL 去重 3. 达到 `target_count × min_candidate_multiplier` 时停止 4. 输出:`ctx.candidate_articles` + `ctx.metadata["_search_keyword_stats"]` **→ Gate: SearchCompletenessGate** (`src/pipeline/gates/search_completeness.py:18`) ``` 候选数 ≥ target × 2.0 → PASS 候选数 ≥ target × 1.2 → PASS (warning) 候选数 < target × 1.2 → ABORT ``` #### Stage 3: HardFilterStage > 文件:`src/pipeline/stages/content_filter.py:96` - 按 URL 去重(保留首次出现) - 过滤掉:缺少 title/URL、非 HTTP URL、publish_time ≤ 0 - 输出:更新 `ctx.candidate_articles` #### Stage 4: CoarseFilterStage > 文件:`src/pipeline/stages/coarse_filter.py` - 输入:`ctx.candidate_articles`(硬过滤后) - 将候选文章标题按批(每批 ~20 篇)+ query + demand_analysis 特征打包成 LLM prompt - 调用 `StageAgentExecutor.run_simple_llm_json()` 做语义相关性判断 - 宽松原则:只淘汰明显不相关的标题,不确定时放行 - LLM 调用失败时该批全部放行(fail-open) - 输出:更新 `ctx.candidate_articles` + `ctx.metadata["_coarse_filter_log"]` #### Stage 5: QualityFilterStage > 文件:`src/pipeline/stages/content_filter.py:134` 1. 对每篇文章(上限 `max_detail_fetch=30`): - 调用 `adapter.get_article_detail(url)` 获取正文 - 数据指标评分 `_score_article()`:正文长度、阅读量、互动率 → interest 等级 - 关键词匹配结果仅作为参考信息传给 LLM - LLM 复评决定最终 relevance(所有非 spam 文章都进 LLM 审核) - relevance ≠ "low" 的文章入选 2. 按 (relevance, interest, view_count, publish_time) 降序排序 3. 截断到 `target_count` 4. 输出:`ctx.filtered_articles` + `ctx.metadata["_quality_review_log"]` **→ Gate: FilterSufficiencyGate** (`src/pipeline/gates/filter_sufficiency.py:21`) ``` 入选数 ≥ target_count → PASS 入选数 ≥ target_count × 0.8 → PASS (warning) 已 fallback 过 1 次 → PASS (接受现有结果) 否则 → FALLBACK 到 Stage 2 (content_search) ``` Fallback 时:跳回 Stage 2 重新搜索,跳过已用关键词,尝试新组合。最多 1 轮。 #### Stage 6: AccountPrecipitateStage > 文件:`src/pipeline/stages/account_precipitate.py:24` - 遍历 `ctx.filtered_articles` - 调用 `adapter.get_account(url)` 获取公众号信息 - 按 `wx_gh` / `account_name` 去重聚合: - `article_count`(累加) - `sample_articles`(最多 5 篇标题) - `source_urls`(去重) - 建立 `ArticleAccountRelation` 映射 - 输出:`ctx.accounts` + `ctx.article_account_relations` #### Stage 7: OutputPersistStage > 文件:`src/pipeline/stages/output_persist.py:28` - 构建 `PipelineOutput`: - `summary`:candidate_count, filtered_in_count, account_count - `contents`:[{title, url, statistics, reason}, ...] - `accounts`:[{wx_gh, account_name, article_count, sample_articles}, ...] - `article_account_relations`:[{article_url, wx_gh}, ...] - 写入 `tests/output/{trace_id}/output.json` - 输出:`ctx.output` + `ctx.metadata["output_file"]` **→ Gate: OutputSchemaGate** (`src/pipeline/gates/output_schema.py:18`) 校验: - `candidate_count ≥ filtered_in_count` - `account_count == len(accounts)` - `filtered_in_count == len(contents)` - 所有 relation 引用的 URL 和 wx_gh 均存在 --- ### 7. 结果回传 Pipeline 执行完毕后,`PipelineContext` 沿调用链返回: ``` PipelineOrchestrator.run(ctx) → run_content_finder_pipeline(ctx) 返回 ctx → SearchAgentCore.run() 返回 ctx → run_with_harness() 采集 RunSummary → main() 打印摘要 + 设置退出码 ``` `RunSummary` 采集的指标: | 字段 | 来源 | |------|------| | `trace_id` | harness 层生成,贯穿全程 | | `policy_source` | "db" / "default" / "default(fallback)" | | `candidate_count` | `len(ctx.candidate_articles)` | | `filtered_count` | `len(ctx.filtered_articles)` | | `account_count` | `len(ctx.accounts)` | | `elapsed_seconds` | `time.monotonic()` 计时 | | `stage_history` | 各 stage 的 name/status/attempt | | `output_file` | `ctx.metadata["output_file"]` | --- ### 8. 产出物 | 产出 | 路径 | 内容 | |------|------|------| | 结构化输出 | `tests/output/{trace_id}/output.json` | 文章列表 + 账号列表 + 关联关系 | | Pipeline 事件流 | `tests/traces/{trace_id}/pipeline.jsonl` | 每个 stage/gate 的 JSONL 事件 | | DB 记录 | `search_agent` 库多张表 | 任务、阶段、候选、评分、账号(可选) | | 终端摘要 | stdout | RunSummary 结构化日志 |