| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- """
- Search Agent Harness — 约束驱动的搜索 Agent 入口。
- Harness Engineering 分层:
- 1. Budget Harness — 显式限定运行预算(超时、迭代上限、召回上限)
- 2. Planner Harness — 启动前打印运行计划,明确每阶段目标与约束
- 3. Observer Harness — 结构化进度回调,暴露关键检查点状态
- 4. Fallback Harness — DB 策略失败 / API Key 缺失的显式降级路径
- 前置:
- - OPEN_ROUTER_API_KEY
- - 可选:SEARCH_AGENT_DB_* 与表 search_agent_strategy(见 docs/search_agent_strategy.sql)
- 环境变量:
- - PIPELINE_QUERY / 默认 "伊朗、以色列、和平是永恒的主题"
- - PIPELINE_DEMAND_ID / 默认 "1"
- - PIPELINE_TIMEOUT / 整个 Agent 超时秒数,默认 1800(30 分钟)
- - PIPELINE_TARGET_COUNT / 目标文章数,默认取 RuntimePipelineConfig
- """
- from __future__ import annotations
- import asyncio
- import logging
- import os
- import shutil
- import sys
- import tempfile
- import time
- from dataclasses import dataclass, field
- from typing import Optional
- from uuid import uuid4
- from dotenv import load_dotenv
- from src.domain.search.core import SearchAgentCore
- from src.domain.search.policy import SearchAgentPolicy
- load_dotenv()
# ── Log levels are controlled via environment variables ────────────
_LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
_CONSOLE_LEVEL = os.getenv("CONSOLE_LOG_LEVEL", "INFO").upper()
_LOG_FMT = "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
# Global file-handler references; main() uses them to relocate the full log
# file into the trace directory after the pipeline completes.
_file_handler: Optional[logging.FileHandler] = None
_tmp_log_path: Optional[str] = None
def _setup_logging() -> None:
    """
    Configure dual-channel logging: console (INFO) + file (DEBUG).

    The full DEBUG-level log goes to a temporary file; after the pipeline
    finishes, main() moves that file into the trace directory.
    """
    global _file_handler, _tmp_log_path

    formatter = logging.Formatter(fmt=_LOG_FMT, datefmt=_LOG_DATEFMT)

    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, _LOG_LEVEL, logging.DEBUG))

    stream_handler = logging.StreamHandler(sys.__stdout__)
    stream_handler.setLevel(getattr(logging, _CONSOLE_LEVEL, logging.INFO))
    stream_handler.setFormatter(formatter)
    root_logger.addHandler(stream_handler)

    # Reserve a temp file path; the FileHandler below reopens it for writing.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".log", prefix="search_agent_", mode="w", encoding="utf-8",
    ) as tmp:
        _tmp_log_path = tmp.name

    _file_handler = logging.FileHandler(_tmp_log_path, mode="w", encoding="utf-8")
    _file_handler.setLevel(logging.DEBUG)
    _file_handler.setFormatter(formatter)
    root_logger.addHandler(_file_handler)

    for noisy_name in ("httpx", "httpcore", "urllib3", "asyncio"):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)

    # Keep agent-kernel log lines out of the full log file (noise reduction):
    # filters agent.core.runner / agent.llm.* / agent.tools.* / agent.trace.* etc.
    class _AgentLogFilter(logging.Filter):
        def filter(self, record: logging.LogRecord) -> bool:
            return not record.name.startswith("agent.")

    _file_handler.addFilter(_AgentLogFilter())
# Configure logging at import time so every later statement in this module
# (and everything it imports afterwards) logs through the configured handlers.
_setup_logging()
logger = logging.getLogger(__name__)
- # ─────────────────────────────────────────────
- # 1. Budget Harness — 运行预算约束
- # ─────────────────────────────────────────────
@dataclass
class AgentBudget:
    """
    Explicit upper bounds on the resources the agent may consume.

    Constraint-driven principles:
    - Every limit is fixed before startup; no implicit expansion at runtime.
    - The overall timeout is enforced by the harness layer, not delegated to
      the per-stage timeouts.
    """

    timeout_seconds: int = 1800   # overall timeout (30 minutes)
    max_target_count: int = 10    # max articles per run (guards against unbounded growth)
    max_fallback_rounds: int = 1  # max content_search gate fallback rounds (guards against loops)

    @classmethod
    def from_env(cls) -> "AgentBudget":
        """Build a budget from the PIPELINE_* environment variables."""
        env = os.getenv
        return cls(
            timeout_seconds=int(env("PIPELINE_TIMEOUT", "1800")),
            max_target_count=int(env("PIPELINE_MAX_TARGET_COUNT", "10")),
            max_fallback_rounds=int(env("PIPELINE_MAX_FALLBACK_ROUNDS", "1")),
        )

    def validate(self) -> None:
        """Precondition assertions: budget parameters must fall in sane ranges."""
        if self.timeout_seconds < 30:
            raise ValueError(f"timeout_seconds 至少 30 秒,当前: {self.timeout_seconds}")
        if not (1 <= self.max_target_count <= 200):
            raise ValueError(f"max_target_count 须在 [1, 200],当前: {self.max_target_count}")
        if not (0 <= self.max_fallback_rounds <= 5):
            raise ValueError(f"max_fallback_rounds 须在 [0, 5],当前: {self.max_fallback_rounds}")
- # ─────────────────────────────────────────────
- # 2. Observer Harness — 结构化运行摘要
- # ─────────────────────────────────────────────
@dataclass
class RunSummary:
    """
    Structured post-run summary of the agent (not raw log text).

    Design intent:
    - Callers inspect success / error_message to decide follow-up actions.
    - Key metrics (candidate_count / filtered_count) can feed alerting.
    """

    success: bool
    query: str
    demand_id: str
    policy_source: str = "unknown"  # "db" | "default" | "override"
    trace_id: Optional[str] = None
    output_file: str = ""
    candidate_count: int = 0
    filtered_count: int = 0
    account_count: int = 0
    elapsed_seconds: float = 0.0
    error_message: str = ""
    stage_history: list = field(default_factory=list)

    def log(self) -> None:
        """Emit the run summary as structured log lines."""
        separator = "=" * 60
        status = "✅ 成功" if self.success else "❌ 失败"
        logger.info(separator)
        logger.info("Agent 运行摘要 %s", status)
        logger.info(" query : %s", self.query)
        logger.info(" demand_id : %s", self.demand_id)
        logger.info(" policy_source: %s", self.policy_source)
        logger.info(" trace_id : %s", self.trace_id)
        logger.info(" output_file : %s", self.output_file)
        logger.info(" 候选文章数 : %d", self.candidate_count)
        logger.info(" 入选文章数 : %d", self.filtered_count)
        logger.info(" 账号数 : %d", self.account_count)
        logger.info(" 耗时 : %.1f 秒", self.elapsed_seconds)
        if self.error_message:
            logger.error(" 错误信息 : %s", self.error_message)
        if self.stage_history:
            logger.info(" 阶段历史:")
            for record in self.stage_history:
                completed = record.get("status") == "completed"
                logger.info(
                    " %s %-28s attempt=%d",
                    "✓" if completed else "✗",
                    record.get("stage_name", "?"),
                    record.get("attempt", 1),
                )
        logger.info(separator)
- # ─────────────────────────────────────────────
- # 3. Planner Harness — 启动前打印运行计划
- # ─────────────────────────────────────────────
def print_run_plan(query: str, demand_id: str, budget: AgentBudget, trace_id: str) -> dict:
    """
    Print a structured run plan before the agent starts, and return the plan
    data for use by the trace.

    Purpose:
    - Make the run intent visible and auditable, easing debugging and tracing.
    - State each stage's goal and constraints up front, preventing "black box"
      execution.
    """
    plan = {
        "trace_id": trace_id,
        "query": query,
        "demand_id": demand_id or "",
        "timeout_seconds": budget.timeout_seconds,
        "max_target_count": budget.max_target_count,
        "max_fallback_rounds": budget.max_fallback_rounds,
        "stages": [
            {"name": "demand_analysis", "label": "需求理解,产出搜索策略"},
            {"name": "content_search", "label": "按关键词召回候选文章", "gate": "SearchCompletenessGate"},
            {"name": "hard_filter", "label": "去重 + 基础规则过滤"},
            {"name": "coarse_filter", "label": "LLM 标题语义粗筛"},
            {"name": "quality_filter", "label": "数据指标评分 + LLM 正文精排", "gate": "FilterSufficiencyGate"},
            {"name": "account_precipitate", "label": "账号信息沉淀"},
            {"name": "output_persist", "label": "输出结构化 JSON", "gate": "OutputSchemaGate"},
        ],
    }

    banner = "=" * 60
    logger.info(banner)
    logger.info("▶ Search Agent 运行计划")
    logger.info(" Trace ID : %s", trace_id)
    logger.info(" Query : %s", query)
    logger.info(" Demand ID : %s", demand_id or "(未指定,使用 default 策略)")
    logger.info(" 超时上限 : %d 秒", budget.timeout_seconds)
    logger.info(" 目标文章上限 : %d 篇", budget.max_target_count)
    logger.info(" 最大补召回轮次: %d 轮", budget.max_fallback_rounds)
    logger.info("")
    logger.info(" 阶段规划:")
    logger.info(" 1. [demand_analysis ] ← 需求理解,产出搜索策略(无工具调用)")
    logger.info(" 2. [content_search ] ← 按关键词召回候选文章")
    logger.info(" └─ Gate: SearchCompletenessGate — 候选不足则 abort")
    logger.info(" 3. [hard_filter ] ← 去重 + URL / 时间基础校验")
    logger.info(" 4. [coarse_filter ] ← LLM 标题语义粗筛")
    logger.info(" 5. [quality_filter ] ← 数据指标评分 + LLM 正文精排")
    logger.info(" └─ Gate: FilterSufficiencyGate — 不足则回退补召回(最多 %d 轮)",
                budget.max_fallback_rounds)
    logger.info(" 6. [account_precipitate] ← 账号信息沉淀")
    logger.info(" 7. [output_persist ] ← 输出结构化 JSON")
    logger.info(" └─ Gate: OutputSchemaGate — 结构校验")
    logger.info(banner)

    return plan
- # ─────────────────────────────────────────────
- # 4. Fallback Harness — 前置检查与降级路径
- # ─────────────────────────────────────────────
def validate_prerequisites() -> None:
    """
    Harness-level precondition check (independent of the Core's own checks).

    Design intent:
    - Lift hard requirements to the outermost layer so failures are fast and
      their messages are unambiguous.
    - Avoid surfacing the missing-API-key error only deep inside a Stage.
    """
    if not os.getenv("OPEN_ROUTER_API_KEY", "").strip():
        raise EnvironmentError(
            "缺少必要环境变量: OPEN_ROUTER_API_KEY\n"
            "请在 .env 文件或系统环境中设置该变量后重试。"
        )
- # ─────────────────────────────────────────────
- # 5. 主流程 — Harness 统一编排
- # ─────────────────────────────────────────────
async def run_with_harness(
    query: str,
    demand_id: str,
    budget: AgentBudget,
    trace_id: str,
    use_db_policy: bool = True,
    run_plan: dict | None = None,
) -> RunSummary:
    """
    Harness-wrapped agent execution entry point.

    Responsibility layering:
    - This function only does "constraint injection + timeout wrapping +
      summary collection".
    - Business logic is delegated to SearchAgentCore.
    - No business if/else decisions are made here.

    Returns a RunSummary in all cases; errors/timeouts are reported via
    summary.success / summary.error_message rather than raised.
    """
    start = time.monotonic()
    summary = RunSummary(success=False, query=query, demand_id=demand_id, trace_id=trace_id)
    # --- Mark the policy source (for the Observer) ---
    core = SearchAgentCore()
    policy_override: Optional[SearchAgentPolicy] = None
    if use_db_policy:
        try:
            # The pre-read only confirms DB connectivity and records the source;
            # SearchAgentCore.run() reloads with the same demand_id internally.
            await core.load_policy(demand_id or None)
            summary.policy_source = "db"
            logger.info("策略已从 DB 加载: demand_id=%s", demand_id)
        except Exception as exc:
            # Fallback Harness: degrade to the default policy on any DB failure.
            logger.warning("DB 策略读取失败,降级为默认策略: %s", exc)
            policy_override = SearchAgentPolicy.defaults()
            summary.policy_source = "default(fallback)"
    else:
        policy_override = SearchAgentPolicy.defaults()
        summary.policy_source = "default"
    # --- Budget injection: target_count never exceeds max_target_count ---
    from src.pipeline.config.pipeline_config import RuntimePipelineConfig
    runtime = RuntimePipelineConfig.from_env()
    effective_target = min(runtime.target_count, budget.max_target_count)
    if effective_target != runtime.target_count:
        logger.info(
            "target_count 被 Budget Harness 限制: %d → %d",
            runtime.target_count,
            effective_target,
        )
    # --- Timeout-wrapped execution ---
    try:
        ctx = await asyncio.wait_for(
            core.run(
                query=query,
                demand_id=demand_id,
                target_count=effective_target,
                # A non-None override means "do not reload from DB".
                use_db_policy=(policy_override is None),
                policy_override=policy_override,
                trace_id=trace_id,
                run_plan=run_plan,
            ),
            timeout=budget.timeout_seconds,
        )
    except asyncio.TimeoutError:
        summary.elapsed_seconds = time.monotonic() - start
        summary.error_message = f"Agent 超时(>{budget.timeout_seconds}s),已中止"
        logger.error(summary.error_message)
        return summary
    except Exception as exc:
        # Top-level boundary: convert any failure into a failed summary.
        summary.elapsed_seconds = time.monotonic() - start
        summary.error_message = str(exc)
        logger.exception("Agent 运行异常: %s", exc)
        return summary
    # --- Collect the Observer summary ---
    summary.success = True
    summary.output_file = ctx.metadata.get("output_file", "")
    summary.candidate_count = len(ctx.candidate_articles)
    summary.filtered_count = len(ctx.filtered_articles)
    summary.account_count = len(ctx.accounts)
    summary.elapsed_seconds = time.monotonic() - start
    summary.stage_history = [
        {
            "stage_name": r.stage_name,
            "status": r.status,
            "attempt": r.attempt,
        }
        for r in ctx.stage_history
    ]
    return summary
async def main() -> None:
    """
    Script entry point: wire the harness layers together and run the agent.

    Raises:
        SystemExit(1) when the run failed, so CI / schedulers can detect it.
    """
    # ① Precondition check (Fallback Harness)
    validate_prerequisites()
    # ② Read run parameters
    query = os.getenv("PIPELINE_QUERY", "伊朗以色列冲突、中老年人会关注什么?")
    demand_id = os.getenv("PIPELINE_DEMAND_ID", "1")
    # ③ Budget constraints (Budget Harness)
    budget = AgentBudget.from_env()
    budget.validate()
    # ④ Generate a global trace_id that spans the whole run
    trace_id = str(uuid4())
    logger.info("Trace ID: %s", trace_id)
    # ⑤ Run plan (Planner Harness)
    run_plan = print_run_plan(query=query, demand_id=demand_id, budget=budget, trace_id=trace_id)
    # ⑥ Execute (with constraints + observation)
    summary = await run_with_harness(
        query=query,
        demand_id=demand_id,
        budget=budget,
        trace_id=trace_id,
        use_db_policy=True,
        run_plan=run_plan,
    )
    # ⑦ Structured summary output (Observer Harness)
    summary.log()
    # ⑧ Move the full log into the trace directory
    global _file_handler, _tmp_log_path
    if _file_handler and _tmp_log_path and os.path.exists(_tmp_log_path):
        try:
            # Detach the handler BEFORE closing it: a closed FileHandler left
            # on the root logger lazily reopens its baseFilename on the next
            # log call, which would recreate the temp file after we move it.
            logging.getLogger().removeHandler(_file_handler)
            _file_handler.close()
            trace_dir = os.path.join("tests", "traces", trace_id)
            os.makedirs(trace_dir, exist_ok=True)
            dest = os.path.join(trace_dir, "full_log.log")
            shutil.move(_tmp_log_path, dest)
            logger.info("完整日志已保存: %s", dest)
        except Exception as exc:
            # Best-effort: a failed log move must not mask the run result.
            logger.warning("移动日志文件失败: %s", exc)
    # ⑨ Non-zero exit code (so CI / schedulers can detect failure)
    if not summary.success:
        raise SystemExit(1)


if __name__ == "__main__":
    asyncio.run(main())
|