run_search_agent_workflow.md 14 KB

run_search_agent.py 完整工作流程

总览

main()
 ├─ ① validate_prerequisites()          — 前置检查
 ├─ ② 读取环境变量 (query, demand_id)
 ├─ ③ AgentBudget.from_env() + validate — 预算约束
 ├─ ④ uuid4() 生成全局 trace_id
 ├─ ⑤ print_run_plan()                  — 打印运行计划
 ├─ ⑥ run_with_harness()                — 核心执行(带超时)
 │    ├─ 策略加载 (DB / default fallback)
 │    ├─ 预算注入 (cap target_count)
 │    └─ asyncio.wait_for(core.run(), timeout)
 │         ├─ SearchAgentCore.run()
 │         │    ├─ 解析策略 (SearchAgentPolicy)
 │         │    ├─ 构建 PipelineContext
 │         │    └─ run_content_finder_pipeline(ctx)
 │         │         ├─ build_default_pipeline() — 组装 stages/gates/hooks
 │         │         └─ PipelineOrchestrator.run(ctx)
 │         │              ├─ Stage 1: DemandAnalysisStage
 │         │              ├─ Stage 2: ContentSearchStage
 │         │              │    └─ Gate: SearchCompletenessGate
 │         │              ├─ Stage 3: HardFilterStage
 │         │              ├─ Stage 4: CoarseFilterStage (LLM 标题粗筛)
 │         │              ├─ Stage 5: QualityFilterStage (正文精排)
 │         │              │    └─ Gate: FilterSufficiencyGate (可 fallback 到 Stage 2)
 │         │              ├─ Stage 6: AccountPrecipitateStage
 │         │              └─ Stage 7: OutputPersistStage
 │         │                   └─ Gate: OutputSchemaGate
 │         └─ 返回 PipelineContext
 ├─ ⑦ summary.log()                     — 结构化摘要
 └─ ⑧ exit code (0=成功, 1=失败)

详细流程

1. 入口:main()

文件:run_search_agent.py:284

asyncio.run(main())

① 前置检查 — Fallback Harness

validate_prerequisites() (line 173) 检查 OPEN_ROUTER_API_KEY 是否存在,缺失则立即抛出 EnvironmentError,快速失败。

② 读取运行参数

环境变量 默认值 用途
PIPELINE_QUERY "伊朗以色列冲突、中老年人会关注什么?" 搜索查询
PIPELINE_DEMAND_ID "1" 需求 ID(关联 DB 策略)

③ 预算约束 — Budget Harness

AgentBudget.from_env() (line 61) 从环境变量构建预算:

参数 环境变量 默认值 约束范围
timeout_seconds PIPELINE_TIMEOUT 1800 (30min) ≥ 30
max_target_count PIPELINE_MAX_TARGET_COUNT 10 [1, 200]
max_fallback_rounds PIPELINE_MAX_FALLBACK_ROUNDS 1 [0, 5]

budget.validate() 校验参数范围,不合法直接抛异常。

④ 生成全局 trace_id

trace_id = str(uuid4())

trace_id 贯穿整个运行周期:运行计划 → harness → core → pipeline context → 所有 stages/hooks/gates。

⑤ 运行计划 — Planner Harness

print_run_plan() (line 138) 在执行前打印结构化计划,包含 trace_id、query、各阶段目标与约束,使运行意图可审计。


2. 核心执行:run_with_harness()

文件:run_search_agent.py:193

2.1 策略加载

use_db_policy=True?
  ├─ Yes → core.load_policy(demand_id)
  │         ├─ 成功 → policy_source = "db"
  │         └─ 失败 → 降级为 SearchAgentPolicy.defaults(), policy_source = "default(fallback)"
  └─ No  → SearchAgentPolicy.defaults(), policy_source = "default"

策略加载链路:

  • SearchAgentCore.load_policy()SearchAgentPolicyRepository.load_policy() (src/domain/search/repository.py:20)
  • 查询 search_agent 库的 search_agent_strategy
  • 优先按 demand_id 查找,找不到则查 strategy_code='default'
  • 解析 config_json 字段,合并默认值,返回 SearchAgentPolicy

SearchAgentPolicy 关键字段 (src/domain/search/policy.py):

字段 默认值 含义
max_keywords 6 最大搜索关键词数
min_candidate_multiplier 2.0 候选目标 = target_count × 此值
near_enough_candidate_multiplier 1.2 "接近足够"的阈值倍数
filter_near_ratio 0.8 过滤后"接近足够"的比例
max_detail_fetch 30 最多获取详情的文章数
enable_llm_review True 是否启用 LLM 复评

2.2 预算注入

effective_target = min(runtime.target_count, budget.max_target_count)

RuntimePipelineConfig.from_env() (src/pipeline/config/pipeline_config.py) 读取:

环境变量 默认值
MODEL anthropic/claude-sonnet-4-5
PIPELINE_TEMPERATURE 0.2
PIPELINE_MAX_ITERATIONS 12
PIPELINE_TARGET_COUNT 10

Budget Harness 将 target_count 限制在 max_target_count 以内,防止无限扩张。

2.3 超时包裹执行

ctx = await asyncio.wait_for(
    core.run(query, demand_id, effective_target, ..., trace_id=trace_id),
    timeout=budget.timeout_seconds,
)

三种结果:

  • 正常返回 → 采集摘要,success=True
  • asyncio.TimeoutError → 记录超时错误,success=False
  • 其他异常 → 记录异常信息,success=False

3. 服务层:SearchAgentCore.run()

文件:src/domain/search/core.py:41

trace_id = trace_id or str(uuid4())   # 优先使用外部传入的 trace_id
ctx = PipelineContext(
    task_id=str(uuid4()),
    trace_id=trace_id,
    query=query,
    demand_id=demand_id,
    target_count=target_count,
    model=runtime.model,
    output_dir="tests/output",
    knowledge_sources=default_knowledge_sources(),
)
apply_search_agent_policy(ctx, policy)   # 策略写入 ctx.metadata
return await run_content_finder_pipeline(ctx)

default_knowledge_sources() (src/pipeline/runner.py) 加载静态知识:平台规则、受众画像等。


4. Pipeline 组装:build_default_pipeline()

文件:src/pipeline/runner.py:39

组装三大组件:

Stages(按顺序执行):

# Stage 职责
1 DemandAnalysisStage LLM 理解需求,产出搜索策略
2 ContentSearchStage 按关键词召回候选文章
3 HardFilterStage 去重 + URL/时间基础校验
4 CoarseFilterStage LLM 批量标题语义粗筛
5 QualityFilterStage 数据指标评分 + LLM 正文精排
6 AccountPrecipitateStage 账号信息聚合沉淀
7 OutputPersistStage 输出结构化 JSON

Gates(阶段后置检查):

Gate 挂载在 动作
SearchCompletenessGate Stage 2 之后 候选不足 → abort
FilterSufficiencyGate Stage 5 之后 不足 → fallback 到 Stage 2
OutputSchemaGate Stage 7 之后 结构校验

Hooks(观测层):

Hook 职责
TraceHook JSON 快照写入 logger
PipelineTraceHook JSONL 事件写入 tests/traces/{trace_id}/pipeline.jsonl
LiveProgressHook 终端实时进度展示
DatabasePersistHook MySQL 持久化(可选,失败不阻塞)

5. Pipeline 执行:PipelineOrchestrator.run()

文件:src/pipeline/orchestrator.py:43

on_pipeline_start(ctx)                    ← 所有 hooks 触发

for stage in stages:
    on_stage_start(ctx, stage)            ← hooks
    validate_input(ctx)                   ← 前置校验
    _execute_stage(ctx)                   ← 执行(带重试,max_stage_retries=1)
    checkpoint(ctx)                       ← 快照保存
    on_stage_complete(ctx, stage)         ← hooks

    if gate exists for this stage:
        result = gate.check(ctx)
        ├─ passed / proceed  → 继续下一个 stage
        ├─ retry_stage       → 重新执行当前 stage
        ├─ fallback          → 跳转到 fallback_stage(最多 1 轮)
        └─ abort             → 抛出异常,终止 pipeline

on_pipeline_complete(ctx)                 ← 所有 hooks 触发
return ctx

6. 各 Stage 详细逻辑

Stage 1: DemandAnalysisStage

文件:src/pipeline/stages/demand_analysis.py:19

  • 输入:ctx.query + ctx.knowledge_sources
  • 调用 StageAgentExecutor.run_json_stage(),加载 skill demand_analysis.md
  • LLM 输出 JSON,解析为 DemandAnalysisResult
    • substantive_features / formal_features — 内容特征
    • search_strategy.precise_keywords / topic_keywords — 搜索关键词
    • filter_focus.format_rules / relevance_focus — 过滤指导
  • 输出:写入 ctx.demand_analysis

Stage 2: ContentSearchStage

文件:src/pipeline/stages/content_search.py:42

两种模式:

  • Agent 模式(有 agent_executor):LLM 自主调用 weixin_search 工具,迭代搜索
  • Code 模式(无 agent_executor):按关键词列表逐个调用 WeixinToolAdapter.search()

流程:

  1. ctx.demand_analysis 提取关键词列表
  2. 逐关键词搜索,结果按 URL 去重
  3. 达到 target_count × min_candidate_multiplier 时停止
  4. 输出:ctx.candidate_articles + ctx.metadata["_search_keyword_stats"]

→ Gate: SearchCompletenessGate (src/pipeline/gates/search_completeness.py:18)

候选数 ≥ target × 2.0        → PASS
候选数 ≥ target × 1.2        → PASS (warning)
候选数 < target × 1.2        → ABORT

Stage 3: HardFilterStage

文件:src/pipeline/stages/content_filter.py:96

  • 按 URL 去重(保留首次出现)
  • 过滤掉:缺少 title/URL、非 HTTP URL、publish_time ≤ 0
  • 输出:更新 ctx.candidate_articles

Stage 4: CoarseFilterStage

文件:src/pipeline/stages/coarse_filter.py

  • 输入:ctx.candidate_articles(硬过滤后)
  • 将候选文章标题按批(每批 ~20 篇)+ query + demand_analysis 特征打包成 LLM prompt
  • 调用 StageAgentExecutor.run_simple_llm_json() 做语义相关性判断
  • 宽松原则:只淘汰明显不相关的标题,不确定时放行
  • LLM 调用失败时该批全部放行(fail-open)
  • 输出:更新 ctx.candidate_articles + ctx.metadata["_coarse_filter_log"]

Stage 5: QualityFilterStage

文件:src/pipeline/stages/content_filter.py:134

  1. 对每篇文章(上限 max_detail_fetch=30):
    • 调用 adapter.get_article_detail(url) 获取正文
    • 数据指标评分 _score_article():正文长度、阅读量、互动率 → interest 等级
    • 关键词匹配结果仅作为参考信息传给 LLM
    • LLM 复评决定最终 relevance(所有非 spam 文章都进 LLM 审核)
    • relevance ≠ "low" 的文章入选
  2. 按 (relevance, interest, view_count, publish_time) 降序排序
  3. 截断到 target_count
  4. 输出:ctx.filtered_articles + ctx.metadata["_quality_review_log"]

→ Gate: FilterSufficiencyGate (src/pipeline/gates/filter_sufficiency.py:21)

入选数 ≥ target_count              → PASS
入选数 ≥ target_count × 0.8        → PASS (warning)
已 fallback 过 1 次                → PASS (接受现有结果)
否则                               → FALLBACK 到 Stage 2 (content_search)

Fallback 时:跳回 Stage 2 重新搜索,跳过已用关键词,尝试新组合。最多 1 轮。

Stage 6: AccountPrecipitateStage

文件:src/pipeline/stages/account_precipitate.py:24

  • 遍历 ctx.filtered_articles
  • 调用 adapter.get_account(url) 获取公众号信息
  • wx_gh / account_name 去重聚合:
    • article_count(累加)
    • sample_articles(最多 5 篇标题)
    • source_urls(去重)
  • 建立 ArticleAccountRelation 映射
  • 输出:ctx.accounts + ctx.article_account_relations

Stage 7: OutputPersistStage

文件:src/pipeline/stages/output_persist.py:28

  • 构建 PipelineOutput
    • summary:candidate_count, filtered_in_count, account_count
    • contents:[{title, url, statistics, reason}, ...]
    • accounts:[{wx_gh, account_name, article_count, sample_articles}, ...]
    • article_account_relations:[{article_url, wx_gh}, ...]
  • 写入 tests/output/{trace_id}/output.json
  • 输出:ctx.output + ctx.metadata["output_file"]

→ Gate: OutputSchemaGate (src/pipeline/gates/output_schema.py:18)

校验:

  • candidate_count ≥ filtered_in_count
  • account_count == len(accounts)
  • filtered_in_count == len(contents)
  • 所有 relation 引用的 URL 和 wx_gh 均存在

7. 结果回传

Pipeline 执行完毕后,PipelineContext 沿调用链返回:

PipelineOrchestrator.run(ctx)
  → run_content_finder_pipeline(ctx) 返回 ctx
    → SearchAgentCore.run() 返回 ctx
      → run_with_harness() 采集 RunSummary
        → main() 打印摘要 + 设置退出码

RunSummary 采集的指标:

字段 来源
trace_id harness 层生成,贯穿全程
policy_source "db" / "default" / "default(fallback)"
candidate_count len(ctx.candidate_articles)
filtered_count len(ctx.filtered_articles)
account_count len(ctx.accounts)
elapsed_seconds time.monotonic() 计时
stage_history 各 stage 的 name/status/attempt
output_file ctx.metadata["output_file"]

8. 产出物

产出 路径 内容
结构化输出 tests/output/{trace_id}/output.json 文章列表 + 账号列表 + 关联关系
Pipeline 事件流 tests/traces/{trace_id}/pipeline.jsonl 每个 stage/gate 的 JSONL 事件
DB 记录 search_agent 库多张表 任务、阶段、候选、评分、账号(可选)
终端摘要 stdout RunSummary 结构化日志