This document gives a global architecture view of LongArticleSearchAgent, covering every layer from the CLI entry points down to external API calls, along with the data flow and key components.
```mermaid
flowchart TB
subgraph Entry["Entry Layer"]
RSA["run_search_agent.py<br/>(Harness mode)"]
RP["run_pipeline.py<br/>(minimal CLI)"]
APP["app.py<br/>(Web API / Quart)"]
end
subgraph Domain["Domain Layer src/domain"]
SAC["SearchAgentCore"]
SAP["SearchAgentPolicy"]
SAPR["SearchAgentPolicyRepository"]
end
subgraph Pipeline["Orchestration Layer src/pipeline"]
PO["PipelineOrchestrator"]
CTX["PipelineContext"]
STAGES["Stages × 7"]
GATES["QualityGates × 3"]
HOOKS["Hooks × 4"]
end
subgraph AgentCore["Agent Core agent/"]
AR["AgentRunner"]
LLM["OpenRouter LLM"]
TR["Trace / Message"]
TOOLS["Tool Registry"]
SK["Skill System"]
end
subgraph Adapters["Adapter Layer"]
WA["WeixinToolAdapter"]
KS["KnowledgeSource"]
end
subgraph Infra["Infrastructure Layer src/infra"]
HTTP["AsyncHttpClient"]
MYSQL["AsyncMySQLPool"]
LOG["Logging / Trace"]
end
subgraph External["External Services"]
WXAPI["Weixin Search API<br/>crawler-cn.aiddit.com"]
ORAPI["OpenRouter API"]
DB[("MySQL<br/>search_agent / supply_*")]
end
RSA --> SAC
RP --> PO
APP --> SAC
SAC --> SAPR
SAPR --> MYSQL
SAC --> PO
PO --> CTX
PO --> STAGES
PO --> GATES
PO --> HOOKS
STAGES -->|"DemandAnalysis<br/>CoarseFilter<br/>QualityFilter"| AR
AR --> LLM
AR --> TR
AR --> TOOLS
AR --> SK
STAGES --> WA
STAGES --> KS
WA --> HTTP
HTTP --> WXAPI
LLM --> ORAPI
HOOKS -->|"DatabasePersistHook"| MYSQL
HOOKS -->|"PipelineTraceHook"| LOG
MYSQL --> DB
```
The pipeline uses a two-stage coarse-filter + fine-ranking architecture: an LLM first performs a semantic coarse filter on titles alone, then fetches the full text of the surviving articles for LLM fine-ranking.
```mermaid
flowchart LR
S1["1. demand_analysis<br/>🔍 demand understanding & feature layering"]
S2["2. content_search<br/>📡 search candidates by strategy"]
G1{"SearchCompleteness<br/>Gate 🚦"}
S3["3. hard_filter<br/>🧹 dedup & basic rule filtering"]
S4["4. coarse_filter<br/>🏷️ LLM title coarse filter"]
S5["5. quality_filter<br/>⭐ full-text fine ranking (metrics + LLM)"]
G2{"FilterSufficiency<br/>Gate 🚦"}
S6["6. account_precipitate<br/>👤 aggregate accounts from articles"]
S7["7. output_persist<br/>💾 emit standard output & persist"]
G3{"OutputSchema<br/>Gate 🚦"}
S1 --> S2 --> G1 --> S3 --> S4 --> S5 --> G2 --> S6 --> S7 --> G3
G2 -->|"fallback: not enough candidates"| S2
G1 -->|"abort: severely insufficient"| ABORT["Pipeline aborted"]
G3 -->|"abort: schema validation failed"| ABORT
```
| Dimension | Coarse filter (CoarseFilter) | Fine ranking (QualityFilter) |
|---|---|---|
| Input | Article title + source keyword | Article body (fetched via detail API) |
| Method | Batched LLM semantic judgment (~20 titles per batch) | Metric-based scoring + per-article LLM review |
| Goal | Quickly discard clearly irrelevant articles | Precise relevance + interest assessment |
| Principle | Permissive: pass when uncertain | Accurate: LLM gives the final verdict on the full text |
| Performance benefit | Reduces downstream detail API calls | — |
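The two-stage contract above can be sketched as two small functions. `llm_judge_titles`, `fetch_body`, and `llm_score_body` are hypothetical stand-ins for the real LLM call and detail-API fetch, not the project's actual API:

```python
from typing import Callable

BATCH_SIZE = 20  # titles are judged in batches of ~20, per the table above

def coarse_filter(articles: list[dict],
                  llm_judge_titles: Callable[[list[str]], list[bool]]) -> list[dict]:
    """Keep articles whose titles the LLM marks relevant (permissive on doubt)."""
    kept = []
    for i in range(0, len(articles), BATCH_SIZE):
        batch = articles[i:i + BATCH_SIZE]
        verdicts = llm_judge_titles([a["title"] for a in batch])
        kept.extend(a for a, ok in zip(batch, verdicts) if ok)
    return kept

def quality_filter(articles: list[dict],
                   fetch_body: Callable[[str], str],
                   llm_score_body: Callable[[str], dict]) -> list[dict]:
    """Fetch full text only for coarse-filter survivors, then score precisely."""
    results = []
    for a in articles:
        body = fetch_body(a["url"])  # detail API call happens only here
        results.append({**a, "body_text": body, **llm_score_body(body)})
    return results
```

The performance benefit in the table follows directly: only coarse-filter survivors ever reach `fetch_body`, so detail API volume shrinks with the coarse-filter rejection rate.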
```mermaid
flowchart TD
subgraph Input["Input"]
Q["query + demand_id + target_count"]
end
subgraph S1["demand_analysis"]
DA["DemandAnalysisResult<br/>─ substantive/formal/upper/lower features<br/>─ precise + topic keywords<br/>─ filtering focus points"]
end
subgraph S2["content_search"]
CA["CandidateArticle[]<br/>─ title, url, publish_time<br/>─ view/like/share count<br/>─ source_keyword"]
end
subgraph S3["hard_filter"]
CA2["CandidateArticle[]<br/>(after dedup + rule filtering)"]
end
subgraph S4["coarse_filter"]
CA3["CandidateArticle[]<br/>(after LLM title coarse filter)<br/>+ metadata._coarse_filter_log"]
end
subgraph S5["quality_filter"]
FA["FilteredArticle[]<br/>─ relevance/interest level<br/>─ body_text, reason<br/>─ phase(heuristic/LLM)"]
end
subgraph S6["account_precipitate"]
ACC["AccountInfo[]<br/>─ account_name, wx_gh<br/>─ article_count<br/>─ sample_articles"]
REL["article_account_relations"]
end
subgraph S7["output_persist"]
OUT["PipelineOutput → JSON file<br/>tests/output/{trace_id}/output.json"]
end
Q --> S1
S1 -->|"ctx.demand_analysis"| S2
S2 -->|"ctx.candidate_articles"| S3
S3 -->|"ctx.candidate_articles"| S4
S4 -->|"ctx.candidate_articles (pruned)"| S5
S5 -->|"ctx.filtered_articles"| S6
S6 -->|"ctx.accounts<br/>ctx.article_account_relations"| S7
```
| Entry point | File | Description |
|---|---|---|
| Harness CLI | run_search_agent.py | Production entry with budget control (AgentBudget), observability (Observer), and fallback strategies |
| Minimal CLI | run_pipeline.py | No DB policy, hardcoded query; for development and debugging |
| Web API | app.py | Quart service; REST endpoints call SearchAgentCore |
Domain layer (src/domain/search/):

```mermaid
classDiagram
class SearchAgentCore {
+run(query, demand_id, ...) RunSummary
-_load_policy(demand_id) SearchAgentPolicy
-_build_pipeline(ctx) PipelineOrchestrator
}
class SearchAgentPolicy {
+max_keywords: int
+keyword_priority: str
+min_candidate_multiplier: float
+enable_llm_review: bool
+extra_keywords: list
}
class SearchAgentPolicyRepository {
+load(demand_id) SearchAgentPolicy
}
SearchAgentCore --> SearchAgentPolicyRepository : loads policy
SearchAgentCore --> SearchAgentPolicy : applies to ctx
SearchAgentPolicyRepository --> SearchAgentPolicy : returns
```
Orchestration layer (src/pipeline/):

```mermaid
classDiagram
class PipelineOrchestrator {
+run(ctx) PipelineContext
-_run_stage(stage, ctx)
-_resolve_gate(gate, result, ctx)
}
class PipelineContext {
+task_id: str
+trace_id: str
+query: str
+demand_analysis: DemandAnalysisResult
+candidate_articles: list
+filtered_articles: list
+accounts: list
+stage_history: list
+metadata: dict
}
class Stage {
<<abstract>>
+execute(ctx)*
+validate_input(ctx)*
+on_retry(ctx, attempt)
}
class QualityGate {
<<abstract>>
+check(ctx)* GateResult
}
class PipelineHook {
<<abstract>>
+on_pipeline_start(ctx)
+on_stage_start(name, ctx)
+on_stage_complete(name, ctx)
+on_gate_check(name, result, ctx)
+on_error(name, error, ctx)
+on_pipeline_complete(ctx)
}
PipelineOrchestrator --> PipelineContext
PipelineOrchestrator --> Stage
PipelineOrchestrator --> QualityGate
PipelineOrchestrator --> PipelineHook
```
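The contracts in the class diagram can be sketched as follows; method names follow the diagram, but the control flow (a plain loop with string gate results) is a simplified assumption about the real `_run_stage`/`_resolve_gate` logic:

```python
from abc import ABC, abstractmethod

class Stage(ABC):
    name = "stage"

    @abstractmethod
    def execute(self, ctx: dict) -> None: ...

class QualityGate(ABC):
    @abstractmethod
    def check(self, ctx: dict) -> str:
        """Return "pass", "fallback", or "abort"."""

class PipelineHook:
    # No-op defaults so hooks override only what they need.
    def on_stage_start(self, name, ctx): pass
    def on_stage_complete(self, name, ctx): pass

class PipelineOrchestrator:
    def __init__(self, stages, gates=None, hooks=()):
        self.stages, self.gates, self.hooks = stages, gates or {}, hooks

    def run(self, ctx: dict) -> dict:
        for stage in self.stages:
            for h in self.hooks:
                h.on_stage_start(stage.name, ctx)
            stage.execute(ctx)
            for h in self.hooks:
                h.on_stage_complete(stage.name, ctx)
            gate = self.gates.get(stage.name)
            if gate and gate.check(ctx) == "abort":
                ctx["aborted_at"] = stage.name
                break
        return ctx
```

The point of the split: stages only mutate the context, gates only inspect it, and hooks only observe it, so each can be tested in isolation.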
| # | Stage | Type | Key dependency | Writes to ctx |
|---|---|---|---|---|
| 1 | DemandAnalysisStage | LLM (AgentRunner) | StageAgentExecutor | demand_analysis |
| 2 | ContentSearchStage | API call | WeixinToolAdapter.search | candidate_articles |
| 3 | HardFilterStage | Pure code | — | candidate_articles (cleaned) |
| 4 | CoarseFilterStage | Batched LLM judgment | StageAgentExecutor.run_simple_llm_json | candidate_articles (coarse-filtered), metadata["_coarse_filter_log"] |
| 5 | QualityFilterStage | API + LLM | WeixinToolAdapter.get_article_detail | filtered_articles |
| 6 | AccountPrecipitateStage | API call | WeixinToolAdapter.get_account | accounts, article_account_relations |
| 7 | OutputPersistStage | Pure code | — | output, metadata["output_file"] |
The coarse filter calls run_simple_llm_json() for semantic relevance judgments: each title gets a pass or reject verdict with a short reason, which is recorded in ctx.metadata["_coarse_filter_log"].

| Gate | Mounted after | Pass condition | On failure |
|---|---|---|---|
| SearchCompletenessGate | content_search | candidates >= target * multiplier | abort |
| FilterSufficiencyGate | quality_filter | selected >= target | fallback → content_search |
| OutputSchemaGate | output_persist | JSON Schema valid | abort |
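The two threshold gates in the table reduce to simple comparisons. A sketch, assuming the ctx field names from the data-flow diagram (`candidate_articles`, `filtered_articles`):

```python
class SearchCompletenessGate:
    """Abort when recall is severely insufficient after content_search."""

    def __init__(self, target: int, multiplier: float = 3.0):
        # multiplier corresponds to the policy's min_candidate_multiplier
        self.threshold = target * multiplier

    def check(self, ctx: dict) -> str:
        return "pass" if len(ctx["candidate_articles"]) >= self.threshold else "abort"

class FilterSufficiencyGate:
    """Fall back to content_search when too few articles survive filtering."""

    def __init__(self, target: int):
        self.target = target

    def check(self, ctx: dict) -> str:
        return "pass" if len(ctx["filtered_articles"]) >= self.target else "fallback"
```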
| Hook | Path | Responsibility |
|---|---|---|
| PipelineTraceHook | hooks/pipeline_trace_hook.py | Persists a JSONL event stream (including decisions data) for HTML visualization |
| LiveProgressHook | hooks/live_progress_hook.py | Real-time progress printing in the terminal |
| DatabasePersistHook | hooks/db_hook.py | Writes to MySQL supply_* tables (optional) |
| TraceHook | hooks/trace_hook.py | Structured logging |
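A JSONL trace hook along the lines of PipelineTraceHook can be sketched as below; the exact event shape (`type`/`stage`/`ts` fields) is an assumption, not the project's real schema:

```python
import json
import time
from pathlib import Path

class JsonlTraceHook:
    """Append one JSON object per pipeline event to a .jsonl file."""

    def __init__(self, path: Path):
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def _emit(self, event: dict) -> None:
        event["ts"] = time.time()
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

    def on_stage_start(self, name, ctx):
        self._emit({"type": "stage_start", "stage": name})

    def on_stage_complete(self, name, ctx):
        # Attach per-stage decision data so the HTML view can render it.
        self._emit({"type": "stage_complete", "stage": name,
                    "decisions": ctx.get("metadata", {}).get("_coarse_filter_log")})
```

Append-only JSONL keeps the hook crash-safe: even if the pipeline aborts mid-run, every event emitted so far is already on disk.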
Pipeline stages that need LLM capabilities bridge into the agent/ framework through StageAgentExecutor:
```mermaid
sequenceDiagram
participant Stage as Pipeline Stage
participant SAE as StageAgentExecutor
participant AR as AgentRunner
participant LLM as OpenRouter API
participant TS as FileSystemTraceStore
Stage->>SAE: run_json_stage / run_simple_llm_json
SAE->>AR: run(system_prompt, tools, config)
loop Agent Loop (run_json_stage only)
AR->>LLM: chat completion
LLM-->>AR: response (text / tool_call)
AR->>AR: execute tool if needed
end
AR->>TS: persist trace
AR-->>SAE: final assistant message
SAE->>SAE: extract JSON from response
SAE-->>Stage: parsed result
```
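The "extract JSON from response" step exists because LLM replies often wrap the payload in a code fence or surround it with prose. A sketch of such a parser (this exact implementation is an illustrative assumption, not the executor's real code):

```python
import json
import re

def extract_json(text: str):
    """Parse the first JSON object found in an LLM reply."""
    fence = "`" * 3
    # Prefer an explicit ```json ... ``` fence if present.
    m = re.search(re.escape(fence) + r"(?:json)?\s*(.*?)" + re.escape(fence),
                  text, re.DOTALL)
    if m:
        text = m.group(1)
    # Fall back to the outermost {...} span.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object in response")
    return json.loads(text[start:end + 1])
```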
Stages that involve the LLM:

- DemandAnalysisStage: decomposes the query into feature layers + a search strategy (run_json_stage, with tool calls)
- CoarseFilterStage: batched title relevance judgment (run_simple_llm_json, single turn, no tools)
- QualityFilterStage (when enable_llm_review=True): LLM fine-ranking review based on full text (run_simple_llm_json)

```mermaid
flowchart LR
subgraph Adapter["WeixinToolAdapter"]
A1["search(keyword)"]
A2["get_article_detail(url)"]
A3["get_account(url)"]
end
subgraph Tools["tests/tools/weixin_tools.py"]
T1["weixin_search()"]
T2["fetch_article_detail()"]
T3["fetch_weixin_account()"]
end
subgraph API["crawler-cn.aiddit.com"]
E1["/crawler/wei_xin/keyword"]
E2["/crawler/wei_xin/detail"]
E3["/crawler/wei_xin/account"]
end
A1 --> T1 --> E1
A2 --> T2 --> E2
A3 --> T3 --> E3
```
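The adapter layering above can be sketched with an injected transport; in production the calls would go through AsyncHttpClient to crawler-cn.aiddit.com, but the constructor-injection shape here is an assumption made so the adapter stays testable:

```python
from typing import Callable

class WeixinToolAdapter:
    """Maps adapter methods to the crawler endpoints shown in the diagram."""

    def __init__(self, http_get: Callable[[str, dict], dict]):
        self._get = http_get  # e.g. an AsyncHttpClient wrapper in production

    def search(self, keyword: str) -> dict:
        return self._get("/crawler/wei_xin/keyword", {"keyword": keyword})

    def get_article_detail(self, url: str) -> dict:
        return self._get("/crawler/wei_xin/detail", {"url": url})

    def get_account(self, url: str) -> dict:
        return self._get("/crawler/wei_xin/account", {"url": url})
```

With the transport injected, stages can be unit-tested against a fake `http_get` without touching the crawler service.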
```mermaid
flowchart LR
PO["PipelineOrchestrator"]
subgraph Hooks["Hook callbacks"]
H1["PipelineTraceHook"]
H2["LiveProgressHook"]
H3["DatabasePersistHook"]
end
subgraph Output["Artifacts"]
JSONL["tests/traces/{trace_id}/pipeline.jsonl"]
HTML["pipeline_trace.html"]
TERM["terminal stdout"]
DBOUT[("MySQL supply_* tables")]
end
PO --> H1 --> JSONL
JSONL -->|"pipeline_visualize.py"| HTML
PO --> H2 --> TERM
PO --> H3 --> DBOUT
```
JSONL event types: init → stage_start → stage_complete (with decisions detail) → gate_check → error → complete

HTML visualization: pipeline_visualize.py reads the JSONL and renders a dark-theme timeline page; each stage's decision data is rendered as a collapsible `<details>` card. The visualized content includes:
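The first step of such a visualizer can be sketched as: read the JSONL stream and pair stage_start/stage_complete events into per-stage timeline entries. The event field names follow the hook conventions above and are assumptions, not pipeline_visualize.py's actual parser:

```python
import json

def load_timeline(jsonl_text: str) -> list[dict]:
    """Pair start/complete events into {stage, duration, decisions} entries."""
    starts, timeline = {}, []
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        ev = json.loads(line)
        if ev["type"] == "stage_start":
            starts[ev["stage"]] = ev["ts"]
        elif ev["type"] == "stage_complete":
            timeline.append({
                "stage": ev["stage"],
                "duration": ev["ts"] - starts.pop(ev["stage"], ev["ts"]),
                "decisions": ev.get("decisions"),
            })
    return timeline
```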
```mermaid
flowchart TD
ENV[".env environment variables"]
subgraph Configs["Config classes"]
MC["LongArticlesSearchAgentConfig<br/>(Pydantic BaseSettings)"]
RPC["RuntimePipelineConfig"]
AB["AgentBudget"]
SAMC["SearchAgentMySQLConfig"]
end
ENV --> MC
ENV --> RPC
ENV --> AB
MC -->|"search_agent_db"| SAMC
RPC -->|"model / temperature<br/>target_count / max_iterations"| PO["PipelineOrchestrator"]
AB -->|"timeout / max_fallback_rounds"| RSA["run_search_agent.py"]
SAMC -->|"host/port/db"| MYSQL["AsyncMySQLPool"]
```
| Variable | Default | Description |
|---|---|---|
| OPEN_ROUTER_API_KEY | required | OpenRouter API key |
| MODEL | anthropic/claude-sonnet-4-5 | LLM model |
| PIPELINE_TARGET_COUNT | 10 | Target article count |
| PIPELINE_TEMPERATURE | 0.2 | LLM temperature |
| PIPELINE_MAX_ITERATIONS | 12 | Max agent turns per stage |
| SEARCH_AGENT_DB_HOST / PORT / DB | optional | MySQL connection (policy loading + result persistence) |
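These variables map onto a runtime config object roughly as follows. The real project uses Pydantic BaseSettings; this dependency-free sketch only mirrors the defaults from the table above:

```python
import os
from dataclasses import dataclass

@dataclass
class RuntimePipelineConfig:
    # Defaults taken from the environment-variable table above.
    model: str = "anthropic/claude-sonnet-4-5"
    target_count: int = 10
    temperature: float = 0.2
    max_iterations: int = 12

    @classmethod
    def from_env(cls, env=os.environ) -> "RuntimePipelineConfig":
        return cls(
            model=env.get("MODEL", cls.model),
            target_count=int(env.get("PIPELINE_TARGET_COUNT", cls.target_count)),
            temperature=float(env.get("PIPELINE_TEMPERATURE", cls.temperature)),
            max_iterations=int(env.get("PIPELINE_MAX_ITERATIONS", cls.max_iterations)),
        )
```

Passing `env` explicitly (instead of always reading `os.environ`) makes the mapping testable; BaseSettings gives the same behavior plus validation for free.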
```mermaid
erDiagram
search_agent_strategy {
int id PK
varchar demand_id
varchar strategy_code
json config_json
int version
boolean enabled
}
supply_task {
varchar task_id PK
varchar query
varchar demand_id
varchar status
timestamp created_at
}
supply_candidate_content {
int id PK
varchar task_id FK
varchar title
varchar url
varchar source_keyword
}
supply_content_score {
int id PK
int candidate_id FK
varchar relevance_level
varchar interest_level
text reason
}
supply_account_profile {
int id PK
varchar task_id FK
varchar account_name
varchar wx_gh
int article_count
}
search_agent_strategy ||--o{ supply_task : "drives"
supply_task ||--o{ supply_candidate_content : "candidates"
supply_candidate_content ||--o| supply_content_score : "scored by"
supply_task ||--o{ supply_account_profile : "accounts"
```
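Mapping one filtered article onto the supply_* rows of the ER diagram is pure dict building (the DatabasePersistHook would then execute the corresponding INSERTs). The helper name `to_supply_rows` and the input article shape are hypothetical; column names follow the diagram:

```python
def to_supply_rows(task_id: str, article: dict) -> dict:
    """Split a FilteredArticle into supply_candidate_content + supply_content_score rows."""
    candidate = {
        "task_id": task_id,
        "title": article["title"],
        "url": article["url"],
        "source_keyword": article.get("source_keyword"),
    }
    score = {
        "relevance_level": article["relevance_level"],
        "interest_level": article["interest_level"],
        "reason": article.get("reason", ""),
    }
    return {"supply_candidate_content": candidate, "supply_content_score": score}
```

Note the 1-to-0..1 relation in the diagram: a candidate row always exists, while its score row is written only once the article has been through quality_filter.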
| Layer | File | Description |
|---|---|---|
| Entry | run_search_agent.py | Harness production entry |
| | run_pipeline.py | Minimal CLI |
| | app.py | Web API |
| Domain | src/domain/search/core.py | SearchAgentCore |
| | src/domain/search/policy.py | SearchAgentPolicy policy model |
| | src/domain/search/repository.py | Policy loading from DB |
| Orchestration | src/pipeline/orchestrator.py | PipelineOrchestrator |
| | src/pipeline/base.py | Stage / Gate / Hook abstract base classes |
| | src/pipeline/context.py | PipelineContext and data structures |
| | src/pipeline/runner.py | Pipeline builder |
| | src/pipeline/config/pipeline_config.py | RuntimePipelineConfig |
| Stages | src/pipeline/stages/demand_analysis.py | Demand analysis |
| | src/pipeline/stages/content_search.py | Content recall |
| | src/pipeline/stages/content_filter.py | Hard rules + quality fine ranking |
| | src/pipeline/stages/coarse_filter.py | LLM title coarse filter |
| | src/pipeline/stages/account_precipitate.py | Account precipitation |
| | src/pipeline/stages/output_persist.py | Output persistence |
| | src/pipeline/stages/common.py | StageAgentExecutor (LLM bridge) |
| Gates | src/pipeline/gates/search_completeness.py | Recall sufficiency |
| | src/pipeline/gates/filter_sufficiency.py | Filter sufficiency |
| | src/pipeline/gates/output_schema.py | Output schema validation |
| Hooks | src/pipeline/hooks/pipeline_trace_hook.py | JSONL tracing + decision persistence |
| | src/pipeline/hooks/live_progress_hook.py | Live terminal progress |
| | src/pipeline/hooks/db_hook.py | MySQL persistence |
| Adapters | src/pipeline/adapters/weixin.py | Weixin platform adapter |
| | src/pipeline/adapters/knowledge/ | Knowledge-source extension point |
| Tools | tests/tools/weixin_tools.py | Weixin API wrappers |
| Skills | tests/skills/demand_analysis.md | Demand-analysis LLM prompt |
| | tests/skills/article_finding_strategy.md | Content-search strategy |
| | tests/skills/article_filter_strategy.md | Filter-review strategy |
| | tests/skills/account_precipitation.md | Account-precipitation strategy |
| Visualization | pipeline_visualize.py | JSONL → HTML rendering |
| Agent core | agent/core/runner.py | AgentRunner |
| | agent/llm/openrouter.py | LLM calls |
| | agent/trace/models.py | Trace / Message |
| | agent/tools/registry.py | Tool registration |
| Config | src/config/agent_config.py | Main config class |
| | .env | Environment variables |
Document version: in sync with the current repository implementation of src/pipeline + src/domain/search (7 stages, including the CoarseFilterStage coarse filter).