
# Search Agent System Architecture

This document gives a global architectural view of LongArticleSearchAgent, covering every layer, the data flow, and the key components from the CLI entry points down to external API calls.


## 1. System Layering Overview

```mermaid
flowchart TB
    subgraph Entry["Entry layer"]
        RSA["run_search_agent.py<br/>(harness mode)"]
        RP["run_pipeline.py<br/>(simple CLI)"]
        APP["app.py<br/>(Web API / Quart)"]
    end

    subgraph Domain["Domain layer src/domain"]
        SAC["SearchAgentCore"]
        SAP["SearchAgentPolicy"]
        SAPR["SearchAgentPolicyRepository"]
    end

    subgraph Pipeline["Orchestration layer src/pipeline"]
        PO["PipelineOrchestrator"]
        CTX["PipelineContext"]
        STAGES["Stages × 7"]
        GATES["QualityGates × 3"]
        HOOKS["Hooks × 4"]
    end

    subgraph AgentCore["Agent kernel agent/"]
        AR["AgentRunner"]
        LLM["OpenRouter LLM"]
        TR["Trace / Message"]
        TOOLS["Tool Registry"]
        SK["Skill System"]
    end

    subgraph Adapters["Adapter layer"]
        WA["WeixinToolAdapter"]
        KS["KnowledgeSource"]
    end

    subgraph Infra["Infrastructure layer src/infra"]
        HTTP["AsyncHttpClient"]
        MYSQL["AsyncMySQLPool"]
        LOG["Logging / Trace"]
    end

    subgraph External["External services"]
        WXAPI["WeChat search API<br/>crawler-cn.aiddit.com"]
        ORAPI["OpenRouter API"]
        DB[("MySQL<br/>search_agent / supply_*")]
    end

    RSA --> SAC
    RP --> PO
    APP --> SAC
    SAC --> SAPR
    SAPR --> MYSQL
    SAC --> PO
    PO --> CTX
    PO --> STAGES
    PO --> GATES
    PO --> HOOKS
    STAGES -->|"DemandAnalysis<br/>CoarseFilter<br/>QualityFilter"| AR
    AR --> LLM
    AR --> TR
    AR --> TOOLS
    AR --> SK
    STAGES --> WA
    STAGES --> KS
    WA --> HTTP
    HTTP --> WXAPI
    LLM --> ORAPI
    HOOKS -->|"DatabasePersistHook"| MYSQL
    HOOKS -->|"PipelineTraceHook"| LOG
    MYSQL --> DB
```

## 2. Pipeline Stages and Gate Flow

The pipeline uses a two-stage coarse-filter + fine-ranking architecture: an LLM first performs a semantic coarse filter over titles only, then the full body text of the surviving articles is fetched for LLM fine ranking.

```mermaid
flowchart LR
    S1["1. demand_analysis<br/>🔍 demand understanding & feature layering"]
    S2["2. content_search<br/>📡 search candidate articles per strategy"]
    G1{"SearchCompleteness<br/>Gate 🚦"}
    S3["3. hard_filter<br/>🧹 dedup & basic rule filtering"]
    S4["4. coarse_filter<br/>🏷️ LLM title-based semantic coarse filter"]
    S5["5. quality_filter<br/>⭐ body-based fine ranking (data + LLM)"]
    G2{"FilterSufficiency<br/>Gate 🚦"}
    S6["6. account_precipitate<br/>👤 aggregate official accounts from articles"]
    S7["7. output_persist<br/>💾 produce standard output & persist"]
    G3{"OutputSchema<br/>Gate 🚦"}

    S1 --> S2 --> G1 --> S3 --> S4 --> S5 --> G2 --> S6 --> S7 --> G3
    G2 -->|"fallback: not enough candidates"| S2
    G1 -->|"abort: severely insufficient"| ABORT["Pipeline aborted"]
    G3 -->|"abort: schema validation failed"| ABORT
```

### Coarse filter → fine ranking: design rationale

| Dimension | Coarse filter (CoarseFilter) | Fine ranking (QualityFilter) |
|---|---|---|
| Input | article title + source keyword | article body (fetched via the detail API) |
| Method | batched LLM semantic judgment (~20 articles per batch) | data-metric scoring + per-article LLM review |
| Goal | quickly discard clearly irrelevant articles | precisely judge relevance + interest |
| Principle | permissive: pass when uncertain | accurate: the LLM makes the final call from the body |
| Performance benefit | reduces downstream detail API call volume | — |
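The funnel above can be sketched in a few lines of Python. Everything here is hypothetical scaffolding: in the real pipeline both judgments are LLM calls, while this sketch substitutes trivial string predicates so the control flow is runnable:

```python
# Hypothetical sketch of the coarse-filter -> fine-ranking funnel.
# In the real pipeline both predicates are LLM calls; stand-in string
# checks are used here so the flow is runnable.

def coarse_pass(title: str, query: str) -> bool:
    # Cheap title-only check; deliberately permissive.
    return any(word in title for word in query.split())

def fine_pass(body: str, query: str) -> bool:
    # Expensive body-level check, run only on coarse survivors.
    return query in body

def two_stage_filter(articles: list[dict], query: str) -> list[dict]:
    survivors = [a for a in articles if coarse_pass(a["title"], query)]
    # Only survivors would pay the cost of a detail API fetch.
    return [a for a in survivors if fine_pass(a["body"], query)]

articles = [
    {"title": "AI tools roundup", "body": "the best AI tools of the year"},
    {"title": "travel diary", "body": "my trip to the coast"},
]
kept = two_stage_filter(articles, "AI tools")
```

The cost asymmetry is the point: the title check is cheap and permissive, so the expensive body fetch and LLM review run on a much smaller set.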

## 3. Data Flow and Context Evolution

```mermaid
flowchart TD
    subgraph Input["Input"]
        Q["query + demand_id + target_count"]
    end

    subgraph S1["demand_analysis"]
        DA["DemandAnalysisResult<br/>─ substantive / formal / upper / lower features<br/>─ precise terms + topic terms<br/>─ filtering focus points"]
    end

    subgraph S2["content_search"]
        CA["CandidateArticle[]<br/>─ title, url, publish_time<br/>─ view/like/share count<br/>─ source_keyword"]
    end

    subgraph S3["hard_filter"]
        CA2["CandidateArticle[]<br/>(after dedup + rule filtering)"]
    end

    subgraph S4["coarse_filter"]
        CA3["CandidateArticle[]<br/>(after LLM title coarse filter)<br/>+ metadata._coarse_filter_log"]
    end

    subgraph S5["quality_filter"]
        FA["FilteredArticle[]<br/>─ relevance/interest level<br/>─ body_text, reason<br/>─ phase (heuristic/LLM)"]
    end

    subgraph S6["account_precipitate"]
        ACC["AccountInfo[]<br/>─ account_name, wx_gh<br/>─ article_count<br/>─ sample_articles"]
        REL["article_account_relations"]
    end

    subgraph S7["output_persist"]
        OUT["PipelineOutput → JSON file<br/>tests/output/{trace_id}/output.json"]
    end

    Q --> S1
    S1 -->|"ctx.demand_analysis"| S2
    S2 -->|"ctx.candidate_articles"| S3
    S3 -->|"ctx.candidate_articles"| S4
    S4 -->|"ctx.candidate_articles (reduced)"| S5
    S5 -->|"ctx.filtered_articles"| S6
    S6 -->|"ctx.accounts<br/>ctx.article_account_relations"| S7
```

## 4. Core Components in Detail

### 4.1 Entry layer

| Entry | File | Description |
|---|---|---|
| Harness CLI | run_search_agent.py | Production entry with budget control (AgentBudget), observability (Observer), and fallback strategies |
| Simple CLI | run_pipeline.py | No DB policy, hard-coded query; for development and debugging |
| Web API | app.py | Quart service; REST endpoints call SearchAgentCore |

### 4.2 Domain layer (src/domain/search/)

```mermaid
classDiagram
    class SearchAgentCore {
        +run(query, demand_id, ...) RunSummary
        -_load_policy(demand_id) SearchAgentPolicy
        -_build_pipeline(ctx) PipelineOrchestrator
    }
    class SearchAgentPolicy {
        +max_keywords: int
        +keyword_priority: str
        +min_candidate_multiplier: float
        +enable_llm_review: bool
        +extra_keywords: list
    }
    class SearchAgentPolicyRepository {
        +load(demand_id) SearchAgentPolicy
    }

    SearchAgentCore --> SearchAgentPolicyRepository : loads policy
    SearchAgentCore --> SearchAgentPolicy : applies to ctx
    SearchAgentPolicyRepository --> SearchAgentPolicy : returns
```

### 4.3 Orchestration layer (src/pipeline/)

```mermaid
classDiagram
    class PipelineOrchestrator {
        +run(ctx) PipelineContext
        -_run_stage(stage, ctx)
        -_resolve_gate(gate, result, ctx)
    }
    class PipelineContext {
        +task_id: str
        +trace_id: str
        +query: str
        +demand_analysis: DemandAnalysisResult
        +candidate_articles: list
        +filtered_articles: list
        +accounts: list
        +stage_history: list
        +metadata: dict
    }
    class Stage {
        <<abstract>>
        +execute(ctx)*
        +validate_input(ctx)*
        +on_retry(ctx, attempt)
    }
    class QualityGate {
        <<abstract>>
        +check(ctx)* GateResult
    }
    class PipelineHook {
        <<abstract>>
        +on_pipeline_start(ctx)
        +on_stage_start(name, ctx)
        +on_stage_complete(name, ctx)
        +on_gate_check(name, result, ctx)
        +on_error(name, error, ctx)
        +on_pipeline_complete(ctx)
    }

    PipelineOrchestrator --> PipelineContext
    PipelineOrchestrator --> Stage
    PipelineOrchestrator --> QualityGate
    PipelineOrchestrator --> PipelineHook
```
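The orchestrator's stage/gate/hook loop can be sketched as follows. This is a deliberately simplified, synchronous stand-in (single callable hooks instead of the named callback methods above, no retries, no async); the real PipelineOrchestrator is richer:

```python
# Hypothetical, simplified orchestrator loop: run each stage, then check
# any gate attached after it; hooks observe every step.

class GateResult:
    def __init__(self, passed: bool, action: str = "abort"):
        self.passed = passed
        self.action = action  # "abort" or "fallback" when not passed

def run_pipeline(ctx, stages, gates, hooks):
    # stages: list of (name, stage_fn); gates: {stage_name: gate_fn}
    for hook in hooks:
        hook("pipeline_start", ctx)
    for name, stage_fn in stages:
        for hook in hooks:
            hook("stage_start", ctx)
        stage_fn(ctx)
        ctx["stage_history"].append(name)
        gate_fn = gates.get(name)
        if gate_fn is not None:
            result = gate_fn(ctx)
            if not result.passed and result.action == "abort":
                for hook in hooks:
                    hook("pipeline_abort", ctx)
                return ctx
    for hook in hooks:
        hook("pipeline_complete", ctx)
    return ctx

events = []
ctx = {"stage_history": [], "candidates": 0}
stages = [("search", lambda c: c.__setitem__("candidates", 5))]
gates = {"search": lambda c: GateResult(c["candidates"] >= 3)}
run_pipeline(ctx, stages, gates, [lambda e, c: events.append(e)])
```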

### 4.4 The seven stage implementations

| # | Stage | Type | Key dependency | Writes to ctx |
|---|---|---|---|---|
| 1 | DemandAnalysisStage | LLM (AgentRunner) | StageAgentExecutor | demand_analysis |
| 2 | ContentSearchStage | API call | WeixinToolAdapter.search | candidate_articles |
| 3 | HardFilterStage | pure code | — | candidate_articles (cleaned) |
| 4 | CoarseFilterStage | batched LLM judgment | StageAgentExecutor.run_simple_llm_json | candidate_articles (coarse-filtered), metadata["_coarse_filter_log"] |
| 5 | QualityFilterStage | API + LLM | WeixinToolAdapter.get_article_detail | filtered_articles |
| 6 | AccountPrecipitateStage | API call | WeixinToolAdapter.get_account | accounts, article_account_relations |
| 7 | OutputPersistStage | pure code | — | output, metadata["output_file"] |

### Stage 4: CoarseFilterStage (coarse filter)

  • Packs the candidate article titles + the query + demand_analysis features into an LLM prompt
  • Batches of ~20 articles; each batch is judged for semantic relevance via run_simple_llm_json()
  • Only decides pass or reject per article, with a short reason attached
  • Permissive principle: lean toward pass when uncertain; if the LLM call fails, let everything through
  • The coarse-filter log is written to ctx.metadata["_coarse_filter_log"]
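The batching step above can be sketched like this. The batch size and prompt shape are assumptions for illustration; the real stage delegates the call itself to run_simple_llm_json:

```python
# Hypothetical sketch: split candidate titles into batches of ~20 and
# build one prompt payload per batch for the LLM.

def batch_titles(titles: list[str], batch_size: int = 20) -> list[list[str]]:
    return [titles[i:i + batch_size] for i in range(0, len(titles), batch_size)]

def build_prompt(batch: list[str], query: str) -> str:
    numbered = "\n".join(f"{i + 1}. {t}" for i, t in enumerate(batch))
    return (
        f"Query: {query}\n"
        f"For each title below answer pass or reject with a short reason:\n"
        f"{numbered}"
    )

titles = [f"article {n}" for n in range(45)]
batches = batch_titles(titles)
sizes = [len(b) for b in batches]  # 45 titles -> batches of 20, 20, 5
```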

### Stage 5: QualityFilterStage (fine ranking)

  • Calls the detail API to fetch the body of each article that passed the coarse filter
  • interest is purely data-driven: body length, view count, engagement rate
  • relevance is decided by LLM semantic analysis (keyword-match results are passed to the LLM as reference information only)
  • Every non-spam article goes through the LLM review; there is no heuristic early elimination
  • Articles are sorted by relevance / interest / view count / publish time, then truncated to the target count
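The final ordering step can be sketched as a composite sort key. Level names and the tuple ordering are assumptions mirroring the bullet above, not the exact implementation:

```python
# Hypothetical sketch of the final ordering: sort by relevance level,
# then interest level, then view count, then recency, and truncate.

LEVELS = {"high": 2, "medium": 1, "low": 0}

def rank_and_truncate(articles: list[dict], target_count: int) -> list[dict]:
    ordered = sorted(
        articles,
        key=lambda a: (
            LEVELS[a["relevance"]],
            LEVELS[a["interest"]],
            a["views"],
            a["publish_time"],  # larger timestamp = more recent
        ),
        reverse=True,
    )
    return ordered[:target_count]

articles = [
    {"relevance": "low", "interest": "high", "views": 900, "publish_time": 3},
    {"relevance": "high", "interest": "low", "views": 100, "publish_time": 1},
    {"relevance": "high", "interest": "high", "views": 50, "publish_time": 2},
]
top = rank_and_truncate(articles, 2)
```

Note that a highly relevant article with few views outranks a popular but irrelevant one, matching the "relevance first" ordering in the bullet list.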

### 4.5 The three quality gates

| Gate | Attached after stage | Pass condition | Failure action |
|---|---|---|---|
| SearchCompletenessGate | content_search | candidates >= target * multiplier | abort |
| FilterSufficiencyGate | quality_filter | selected >= target | fallback → content_search |
| OutputSchemaGate | output_persist | JSON Schema valid | abort |
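A gate in the spirit of FilterSufficiencyGate can be sketched like this (field names on GateResult are assumptions; the real class lives in src/pipeline and may differ):

```python
from dataclasses import dataclass

# Hypothetical sketch: pass when enough articles survived filtering,
# otherwise request a fallback round of content_search.

@dataclass
class GateResult:
    passed: bool
    action: str = ""          # "fallback" or "abort" when not passed
    fallback_stage: str = ""

def filter_sufficiency_gate(ctx: dict, target: int) -> GateResult:
    if len(ctx["filtered_articles"]) >= target:
        return GateResult(passed=True)
    return GateResult(passed=False, action="fallback",
                      fallback_stage="content_search")

result = filter_sufficiency_gate({"filtered_articles": [1, 2]}, target=5)
```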

### 4.6 The four hooks

| Hook | Path | Responsibility |
|---|---|---|
| PipelineTraceHook | hooks/pipeline_trace_hook.py | Persists a JSONL event stream (including decisions data) for the HTML visualization |
| LiveProgressHook | hooks/live_progress_hook.py | Live progress printed to the terminal |
| DatabasePersistHook | hooks/db_hook.py | Writes to the MySQL supply_* tables (optional) |
| TraceHook | hooks/trace_hook.py | Structured logging |
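The JSONL persistence idea behind PipelineTraceHook can be sketched in a few lines (a minimal stand-in, not the real hook interface): append one JSON object per event, which a visualizer can later replay line by line.

```python
import json
import os
import tempfile

# Hypothetical sketch of a JSONL trace hook: one JSON object per line,
# appended as events occur.
class JsonlTraceHook:
    def __init__(self, path: str):
        self.path = path

    def emit(self, event_type: str, payload: dict) -> None:
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps({"event": event_type, **payload}) + "\n")

path = os.path.join(tempfile.mkdtemp(), "pipeline.jsonl")
hook = JsonlTraceHook(path)
hook.emit("stage_start", {"stage": "coarse_filter"})
hook.emit("stage_complete", {"stage": "coarse_filter", "kept": 18})

# A consumer (e.g. a visualizer) replays the stream line by line.
with open(path, encoding="utf-8") as f:
    events = [json.loads(line) for line in f]
```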

## 5. Agent Kernel Integration

Stages that need LLM capabilities bridge into the agent/ framework through StageAgentExecutor:

```mermaid
sequenceDiagram
    participant Stage as Pipeline Stage
    participant SAE as StageAgentExecutor
    participant AR as AgentRunner
    participant LLM as OpenRouter API
    participant TS as FileSystemTraceStore

    Stage->>SAE: run_json_stage / run_simple_llm_json
    SAE->>AR: run(system_prompt, tools, config)
    loop Agent Loop (run_json_stage only)
        AR->>LLM: chat completion
        LLM-->>AR: response (text / tool_call)
        AR->>AR: execute tool if needed
    end
    AR->>TS: persist trace
    AR-->>SAE: final assistant message
    SAE->>SAE: extract JSON from response
    SAE-->>Stage: parsed result
```

Stages that involve the LLM:

  • DemandAnalysisStage: decomposes the query into layered features + a search strategy (run_json_stage, with tool calls)
  • CoarseFilterStage: batched title-level semantic relevance judgment (run_simple_llm_json, single turn, no tools)
  • QualityFilterStage (when enable_llm_review=True): LLM fine-ranking review based on the article body (run_simple_llm_json)
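The "extract JSON from response" step in the sequence diagram can be sketched like this. The fence-stripping heuristic is an assumption, not necessarily what StageAgentExecutor actually does:

```python
import json

def extract_json(reply: str) -> dict:
    # LLMs often wrap JSON in a ```json fenced block; strip it if present.
    text = reply.strip()
    if text.startswith("```"):
        text = text.split("\n", 1)[1]    # drop the opening fence line
        text = text.rsplit("```", 1)[0]  # drop the closing fence
    return json.loads(text)

reply = '```json\n{"decision": "pass", "reason": "title matches query"}\n```'
parsed = extract_json(reply)
```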

## 6. Adapters and External APIs

```mermaid
flowchart LR
    subgraph Adapter["WeixinToolAdapter"]
        A1["search(keyword)"]
        A2["get_article_detail(url)"]
        A3["get_account(url)"]
    end

    subgraph Tools["tests/tools/weixin_tools.py"]
        T1["weixin_search()"]
        T2["fetch_article_detail()"]
        T3["fetch_weixin_account()"]
    end

    subgraph API["crawler-cn.aiddit.com"]
        E1["/crawler/wei_xin/keyword"]
        E2["/crawler/wei_xin/detail"]
        E3["/crawler/wei_xin/account"]
    end

    A1 --> T1 --> E1
    A2 --> T2 --> E2
    A3 --> T3 --> E3
```
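The three-hop mapping above amounts to a fixed routing table: each adapter method delegates to one tool function bound to one endpoint. A hypothetical sketch with the HTTP layer stubbed out (the https scheme and parameter names are assumptions; endpoint paths are taken from the diagram):

```python
# Hypothetical sketch: adapter methods route to fixed endpoints via an
# injected HTTP callable, so the network layer can be stubbed in tests.

BASE = "https://crawler-cn.aiddit.com"  # scheme is an assumption

ENDPOINTS = {
    "search": "/crawler/wei_xin/keyword",
    "get_article_detail": "/crawler/wei_xin/detail",
    "get_account": "/crawler/wei_xin/account",
}

class WeixinToolAdapterSketch:
    def __init__(self, http_get):
        self._get = http_get  # injected client, stubbed below

    def search(self, keyword: str):
        return self._get(BASE + ENDPOINTS["search"], {"keyword": keyword})

    def get_article_detail(self, url: str):
        return self._get(BASE + ENDPOINTS["get_article_detail"], {"url": url})

calls = []
adapter = WeixinToolAdapterSketch(lambda url, params: calls.append((url, params)))
adapter.search("AI tools")
```

Injecting the HTTP callable mirrors the layering in the diagram: the adapter owns routing, while AsyncHttpClient owns transport.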

## 7. Observability and Tracing

```mermaid
flowchart LR
    PO["PipelineOrchestrator"]

    subgraph Hooks["Hook callbacks"]
        H1["PipelineTraceHook"]
        H2["LiveProgressHook"]
        H3["DatabasePersistHook"]
    end

    subgraph Output["Artifacts"]
        JSONL["tests/traces/{trace_id}/pipeline.jsonl"]
        HTML["pipeline_trace.html"]
        TERM["terminal stdout"]
        DBOUT[("MySQL supply_* tables")]
    end

    PO --> H1 --> JSONL
    JSONL -->|"pipeline_visualize.py"| HTML
    PO --> H2 --> TERM
    PO --> H3 --> DBOUT
```

JSONL event types: `init` → `stage_start` → `stage_complete` (with per-decision details) → `gate_check` → `error` → `complete`

HTML visualization: `pipeline_visualize.py` reads the JSONL and generates a dark-themed timeline page; each stage's decision data is rendered as collapsible `<details>` cards. The visualization includes:

  • Coarse-filter stage: a title-level pass/reject table, including the judgment reasons
  • Fine-ranking stage: relevance and interest shown in separate columns, including the LLM review reasons

## 8. Configuration System

```mermaid
flowchart TD
    ENV[".env environment variables"]

    subgraph Configs["Configuration classes"]
        MC["LongArticlesSearchAgentConfig<br/>(Pydantic BaseSettings)"]
        RPC["RuntimePipelineConfig"]
        AB["AgentBudget"]
        SAMC["SearchAgentMySQLConfig"]
    end

    ENV --> MC
    ENV --> RPC
    ENV --> AB

    MC -->|"search_agent_db"| SAMC
    RPC -->|"model / temperature<br/>target_count / max_iterations"| PO["PipelineOrchestrator"]
    AB -->|"timeout / max_fallback_rounds"| RSA["run_search_agent.py"]
    SAMC -->|"host/port/db"| MYSQL["AsyncMySQLPool"]
```
| Variable | Default | Description |
|---|---|---|
| OPEN_ROUTER_API_KEY | (required) | OpenRouter API key |
| MODEL | anthropic/claude-sonnet-4-5 | LLM model |
| PIPELINE_TARGET_COUNT | 10 | Target article count |
| PIPELINE_TEMPERATURE | 0.2 | LLM temperature |
| PIPELINE_MAX_ITERATIONS | 12 | Max agent turns per stage |
| SEARCH_AGENT_DB_HOST / PORT / DB | (optional) | MySQL connection (policy loading + result persistence) |
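Reading these variables can be sketched with the standard library alone. The real config class is a Pydantic BaseSettings; this stdlib-only stand-in just mirrors the defaults from the table:

```python
import os
from dataclasses import dataclass

# Hypothetical stdlib-only sketch of the env-driven config; the real
# class is a Pydantic BaseSettings with the same defaults.
@dataclass
class PipelineEnvConfig:
    open_router_api_key: str
    model: str = "anthropic/claude-sonnet-4-5"
    target_count: int = 10
    temperature: float = 0.2
    max_iterations: int = 12

    @classmethod
    def from_env(cls, env=os.environ):
        key = env.get("OPEN_ROUTER_API_KEY")
        if not key:
            raise ValueError("OPEN_ROUTER_API_KEY is required")
        return cls(
            open_router_api_key=key,
            model=env.get("MODEL", cls.model),
            target_count=int(env.get("PIPELINE_TARGET_COUNT", cls.target_count)),
            temperature=float(env.get("PIPELINE_TEMPERATURE", cls.temperature)),
            max_iterations=int(env.get("PIPELINE_MAX_ITERATIONS", cls.max_iterations)),
        )

cfg = PipelineEnvConfig.from_env({"OPEN_ROUTER_API_KEY": "sk-test"})
```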

## 9. Database Schema

```mermaid
erDiagram
    search_agent_strategy {
        int id PK
        varchar demand_id
        varchar strategy_code
        json config_json
        int version
        boolean enabled
    }

    supply_task {
        varchar task_id PK
        varchar query
        varchar demand_id
        varchar status
        timestamp created_at
    }

    supply_candidate_content {
        int id PK
        varchar task_id FK
        varchar title
        varchar url
        varchar source_keyword
    }

    supply_content_score {
        int id PK
        int candidate_id FK
        varchar relevance_level
        varchar interest_level
        text reason
    }

    supply_account_profile {
        int id PK
        varchar task_id FK
        varchar account_name
        varchar wx_gh
        int article_count
    }

    search_agent_strategy ||--o{ supply_task : "drives"
    supply_task ||--o{ supply_candidate_content : "candidates"
    supply_candidate_content ||--o| supply_content_score : "scored as"
    supply_task ||--o{ supply_account_profile : "accounts"
```

## 10. Key File Index

| Layer | File | Description |
|---|---|---|
| Entry | run_search_agent.py | Harness production entry |
| | run_pipeline.py | Simple CLI |
| | app.py | Web API |
| Domain | src/domain/search/core.py | SearchAgentCore |
| | src/domain/search/policy.py | SearchAgentPolicy policy model |
| | src/domain/search/repository.py | Policy loading from DB |
| Orchestration | src/pipeline/orchestrator.py | PipelineOrchestrator |
| | src/pipeline/base.py | Stage / Gate / Hook abstract base classes |
| | src/pipeline/context.py | PipelineContext and data structures |
| | src/pipeline/runner.py | Pipeline builder |
| | src/pipeline/config/pipeline_config.py | RuntimePipelineConfig |
| Stages | src/pipeline/stages/demand_analysis.py | Demand analysis |
| | src/pipeline/stages/content_search.py | Content recall |
| | src/pipeline/stages/content_filter.py | Hard rules + quality fine ranking |
| | src/pipeline/stages/coarse_filter.py | LLM title coarse filter |
| | src/pipeline/stages/account_precipitate.py | Account precipitation |
| | src/pipeline/stages/output_persist.py | Output persistence |
| | src/pipeline/stages/common.py | StageAgentExecutor (LLM bridge) |
| Gates | src/pipeline/gates/search_completeness.py | Recall completeness |
| | src/pipeline/gates/filter_sufficiency.py | Filter sufficiency |
| | src/pipeline/gates/output_schema.py | Output schema validation |
| Hooks | src/pipeline/hooks/pipeline_trace_hook.py | JSONL tracing + decision persistence |
| | src/pipeline/hooks/live_progress_hook.py | Live terminal progress |
| | src/pipeline/hooks/db_hook.py | MySQL persistence |
| Adapters | src/pipeline/adapters/weixin.py | WeChat platform adapter |
| | src/pipeline/adapters/knowledge/ | Knowledge-source extension point |
| Tools | tests/tools/weixin_tools.py | WeChat API wrappers |
| Skills | tests/skills/demand_analysis.md | Demand-analysis LLM prompt |
| | tests/skills/article_finding_strategy.md | Content search strategy |
| | tests/skills/article_filter_strategy.md | Filtering / review strategy |
| | tests/skills/account_precipitation.md | Account precipitation strategy |
| Visualization | pipeline_visualize.py | JSONL → HTML rendering |
| Agent kernel | agent/core/runner.py | AgentRunner |
| | agent/llm/openrouter.py | LLM calls |
| | agent/trace/models.py | Trace / Message |
| | agent/tools/registry.py | Tool registration |
| Config | src/config/agent_config.py | Main configuration class |
| | .env | Environment variables |

Document version: in sync with the current repository's src/pipeline + src/domain/search implementation (7 stages, including the CoarseFilterStage coarse filter).