main()
├─ ① validate_prerequisites() — 前置检查
├─ ② 读取环境变量 (query, demand_id)
├─ ③ AgentBudget.from_env() + validate — 预算约束
├─ ④ uuid4() 生成全局 trace_id
├─ ⑤ print_run_plan() — 打印运行计划
├─ ⑥ run_with_harness() — 核心执行(带超时)
│ ├─ 策略加载 (DB / default fallback)
│ ├─ 预算注入 (cap target_count)
│ └─ asyncio.wait_for(core.run(), timeout)
│ ├─ SearchAgentCore.run()
│ │ ├─ 解析策略 (SearchAgentPolicy)
│ │ ├─ 构建 PipelineContext
│ │ └─ run_content_finder_pipeline(ctx)
│ │ ├─ build_default_pipeline() — 组装 stages/gates/hooks
│ │ └─ PipelineOrchestrator.run(ctx)
│ │ ├─ Stage 1: DemandAnalysisStage
│ │ ├─ Stage 2: ContentSearchStage
│ │ │ └─ Gate: SearchCompletenessGate
│ │ ├─ Stage 3: HardFilterStage
│ │ ├─ Stage 4: CoarseFilterStage (LLM 标题粗筛)
│ │ ├─ Stage 5: QualityFilterStage (正文精排)
│ │ │ └─ Gate: FilterSufficiencyGate (可 fallback 到 Stage 2)
│ │ ├─ Stage 6: AccountPrecipitateStage
│ │ └─ Stage 7: OutputPersistStage
│ │ └─ Gate: OutputSchemaGate
│ └─ 返回 PipelineContext
├─ ⑦ summary.log() — 结构化摘要
└─ ⑧ exit code (0=成功, 1=失败)
main()文件:
run_search_agent.py:284
asyncio.run(main())
validate_prerequisites() (line 173) 检查 OPEN_ROUTER_API_KEY 是否存在,缺失则立即抛出 EnvironmentError,快速失败。
| 环境变量 | 默认值 | 用途 |
|---|---|---|
PIPELINE_QUERY |
"伊朗以色列冲突、中老年人会关注什么?" |
搜索查询 |
PIPELINE_DEMAND_ID |
"1" |
需求 ID(关联 DB 策略) |
AgentBudget.from_env() (line 61) 从环境变量构建预算:
| 参数 | 环境变量 | 默认值 | 约束范围 |
|---|---|---|---|
timeout_seconds |
PIPELINE_TIMEOUT |
1800 (30min) | ≥ 30 |
max_target_count |
PIPELINE_MAX_TARGET_COUNT |
10 | [1, 200] |
max_fallback_rounds |
PIPELINE_MAX_FALLBACK_ROUNDS |
1 | [0, 5] |
budget.validate() 校验参数范围,不合法直接抛异常。
trace_id = str(uuid4())
此 trace_id 贯穿整个运行周期:运行计划 → harness → core → pipeline context → 所有 stages/hooks/gates。
print_run_plan() (line 138) 在执行前打印结构化计划,包含 trace_id、query、各阶段目标与约束,使运行意图可审计。
run_with_harness()文件:
run_search_agent.py:193
use_db_policy=True?
├─ Yes → core.load_policy(demand_id)
│ ├─ 成功 → policy_source = "db"
│ └─ 失败 → 降级为 SearchAgentPolicy.defaults(), policy_source = "default(fallback)"
└─ No → SearchAgentPolicy.defaults(), policy_source = "default"
策略加载链路:
SearchAgentCore.load_policy() → SearchAgentPolicyRepository.load_policy() (src/domain/search/repository.py:20)search_agent 库的 search_agent_strategy 表demand_id 查找,找不到则查 strategy_code='default'config_json 字段,合并默认值,返回 SearchAgentPolicySearchAgentPolicy 关键字段 (src/domain/search/policy.py):
| 字段 | 默认值 | 含义 |
|---|---|---|
max_keywords |
6 | 最大搜索关键词数 |
min_candidate_multiplier |
2.0 | 候选目标 = target_count × 此值 |
near_enough_candidate_multiplier |
1.2 | "接近足够"的阈值倍数 |
filter_near_ratio |
0.8 | 过滤后"接近足够"的比例 |
max_detail_fetch |
30 | 最多获取详情的文章数 |
enable_llm_review |
True | 是否启用 LLM 复评 |
effective_target = min(runtime.target_count, budget.max_target_count)
RuntimePipelineConfig.from_env() (src/pipeline/config/pipeline_config.py) 读取:
| 环境变量 | 默认值 |
|---|---|
MODEL |
anthropic/claude-sonnet-4-5 |
PIPELINE_TEMPERATURE |
0.2 |
PIPELINE_MAX_ITERATIONS |
12 |
PIPELINE_TARGET_COUNT |
10 |
Budget Harness 将 target_count 限制在 max_target_count 以内,防止无限扩张。
ctx = await asyncio.wait_for(
core.run(query, demand_id, effective_target, ..., trace_id=trace_id),
timeout=budget.timeout_seconds,
)
三种结果:
success=Trueasyncio.TimeoutError → 记录超时错误,success=Falsesuccess=FalseSearchAgentCore.run()文件:
src/domain/search/core.py:41
trace_id = trace_id or str(uuid4()) # 优先使用外部传入的 trace_id
ctx = PipelineContext(
task_id=str(uuid4()),
trace_id=trace_id,
query=query,
demand_id=demand_id,
target_count=target_count,
model=runtime.model,
output_dir="tests/output",
knowledge_sources=default_knowledge_sources(),
)
apply_search_agent_policy(ctx, policy) # 策略写入 ctx.metadata
return await run_content_finder_pipeline(ctx)
default_knowledge_sources() (src/pipeline/runner.py) 加载静态知识:平台规则、受众画像等。
build_default_pipeline()文件:
src/pipeline/runner.py:39
组装三大组件:
Stages(按顺序执行):
| # | Stage | 职责 |
|---|---|---|
| 1 | DemandAnalysisStage |
LLM 理解需求,产出搜索策略 |
| 2 | ContentSearchStage |
按关键词召回候选文章 |
| 3 | HardFilterStage |
去重 + URL/时间基础校验 |
| 4 | CoarseFilterStage |
LLM 批量标题语义粗筛 |
| 5 | QualityFilterStage |
数据指标评分 + LLM 正文精排 |
| 6 | AccountPrecipitateStage |
账号信息聚合沉淀 |
| 7 | OutputPersistStage |
输出结构化 JSON |
Gates(阶段后置检查):
| Gate | 挂载在 | 动作 |
|---|---|---|
SearchCompletenessGate |
Stage 2 之后 | 候选不足 → abort |
FilterSufficiencyGate |
Stage 5 之后 | 不足 → fallback 到 Stage 2 |
OutputSchemaGate |
Stage 7 之后 | 结构校验 |
Hooks(观测层):
| Hook | 职责 |
|---|---|
TraceHook |
JSON 快照写入 logger |
PipelineTraceHook |
JSONL 事件写入 tests/traces/{trace_id}/pipeline.jsonl |
LiveProgressHook |
终端实时进度展示 |
DatabasePersistHook |
MySQL 持久化(可选,失败不阻塞) |
PipelineOrchestrator.run()文件:
src/pipeline/orchestrator.py:43
on_pipeline_start(ctx) ← 所有 hooks 触发
for stage in stages:
on_stage_start(ctx, stage) ← hooks
validate_input(ctx) ← 前置校验
_execute_stage(ctx) ← 执行(带重试,max_stage_retries=1)
checkpoint(ctx) ← 快照保存
on_stage_complete(ctx, stage) ← hooks
if gate exists for this stage:
result = gate.check(ctx)
├─ passed / proceed → 继续下一个 stage
├─ retry_stage → 重新执行当前 stage
├─ fallback → 跳转到 fallback_stage(最多 1 轮)
└─ abort → 抛出异常,终止 pipeline
on_pipeline_complete(ctx) ← 所有 hooks 触发
return ctx
文件:
src/pipeline/stages/demand_analysis.py:19
ctx.query + ctx.knowledge_sourcesStageAgentExecutor.run_json_stage(),加载 skill demand_analysis.mdDemandAnalysisResult:
substantive_features / formal_features — 内容特征search_strategy.precise_keywords / topic_keywords — 搜索关键词filter_focus.format_rules / relevance_focus — 过滤指导ctx.demand_analysis文件:
src/pipeline/stages/content_search.py:42
两种模式:
agent_executor):LLM 自主调用 weixin_search 工具,迭代搜索agent_executor):按关键词列表逐个调用 WeixinToolAdapter.search()流程:
ctx.demand_analysis 提取关键词列表target_count × min_candidate_multiplier 时停止ctx.candidate_articles + ctx.metadata["_search_keyword_stats"]→ Gate: SearchCompletenessGate (src/pipeline/gates/search_completeness.py:18)
候选数 ≥ target × 2.0 → PASS
候选数 ≥ target × 1.2 → PASS (warning)
候选数 < target × 1.2 → ABORT
文件:
src/pipeline/stages/content_filter.py:96
ctx.candidate_articles文件:
src/pipeline/stages/coarse_filter.py
ctx.candidate_articles(硬过滤后)StageAgentExecutor.run_simple_llm_json() 做语义相关性判断ctx.candidate_articles + ctx.metadata["_coarse_filter_log"]文件:
src/pipeline/stages/content_filter.py:134
max_detail_fetch=30):
adapter.get_article_detail(url) 获取正文_score_article():正文长度、阅读量、互动率 → interest 等级target_countctx.filtered_articles + ctx.metadata["_quality_review_log"]→ Gate: FilterSufficiencyGate (src/pipeline/gates/filter_sufficiency.py:21)
入选数 ≥ target_count → PASS
入选数 ≥ target_count × 0.8 → PASS (warning)
已 fallback 过 1 次 → PASS (接受现有结果)
否则 → FALLBACK 到 Stage 2 (content_search)
Fallback 时:跳回 Stage 2 重新搜索,跳过已用关键词,尝试新组合。最多 1 轮。
文件:
src/pipeline/stages/account_precipitate.py:24
ctx.filtered_articlesadapter.get_account(url) 获取公众号信息wx_gh / account_name 去重聚合:
article_count(累加)sample_articles(最多 5 篇标题)source_urls(去重)ArticleAccountRelation 映射ctx.accounts + ctx.article_account_relations文件:
src/pipeline/stages/output_persist.py:28
PipelineOutput:
summary:candidate_count, filtered_in_count, account_countcontents:[{title, url, statistics, reason}, ...]accounts:[{wx_gh, account_name, article_count, sample_articles}, ...]article_account_relations:[{article_url, wx_gh}, ...]tests/output/{trace_id}/output.jsonctx.output + ctx.metadata["output_file"]→ Gate: OutputSchemaGate (src/pipeline/gates/output_schema.py:18)
校验:
candidate_count ≥ filtered_in_countaccount_count == len(accounts)filtered_in_count == len(contents)Pipeline 执行完毕后,PipelineContext 沿调用链返回:
PipelineOrchestrator.run(ctx)
→ run_content_finder_pipeline(ctx) 返回 ctx
→ SearchAgentCore.run() 返回 ctx
→ run_with_harness() 采集 RunSummary
→ main() 打印摘要 + 设置退出码
RunSummary 采集的指标:
| 字段 | 来源 |
|---|---|
trace_id |
harness 层生成,贯穿全程 |
policy_source |
"db" / "default" / "default(fallback)" |
candidate_count |
len(ctx.candidate_articles) |
filtered_count |
len(ctx.filtered_articles) |
account_count |
len(ctx.accounts) |
elapsed_seconds |
time.monotonic() 计时 |
stage_history |
各 stage 的 name/status/attempt |
output_file |
ctx.metadata["output_file"] |
| 产出 | 路径 | 内容 |
|---|---|---|
| 结构化输出 | tests/output/{trace_id}/output.json |
文章列表 + 账号列表 + 关联关系 |
| Pipeline 事件流 | tests/traces/{trace_id}/pipeline.jsonl |
每个 stage/gate 的 JSONL 事件 |
| DB 记录 | search_agent 库多张表 |
任务、阶段、候选、评分、账号(可选) |
| 终端摘要 | stdout | RunSummary 结构化日志 |