| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- """
- 内容寻找 Agent 执行入口
- 日志设计:
- - agent.log: JSONL 格式的完整执行追踪(无截断),用于 HTML 可视化
- - 控制台: 人类可读的简要输出
- """
- import logging
- import sys
- import os
- import json
- import traceback as tb_mod
- from typing import Dict, Any, Optional, List
- from pathlib import Path
- from datetime import datetime
# Resolve project-relative locations once at import time; the JSONL trace goes to <project>/.cache/agent.log.
PROJECT_ROOT = Path(__file__).resolve().parent
log_dir = PROJECT_ROOT.joinpath('.cache')
log_dir.mkdir(exist_ok=True)  # PROJECT_ROOT is this file's own directory, so it always exists
TRACE_LOG_PATH = log_dir.joinpath('agent.log')
- # ============================================================
- # TraceWriter: 结构化 JSONL 追踪日志(写入 agent.log,无截断)
- # ============================================================
class TraceWriter:
    """Write each step of an agent run as one JSONL event to a trace file, never truncating.

    Every ``log_*`` method appends a single JSON object carrying a ``"type"``
    key plus a millisecond ``"ts"`` timestamp and flushes immediately, so the
    file can be read while the run is still in progress. The file is opened in
    ``"w"`` mode, so an existing trace at *path* is overwritten.

    Usable as a context manager; :meth:`close` is safe to call more than once.
    """

    def __init__(self, path: Path):
        self._file = open(path, 'w', encoding='utf-8')
        # Number of assistant turns logged so far; reported by log_complete().
        self._iteration = 0

    def __enter__(self) -> "TraceWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _ts(self) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS.mmm``."""
        # %f yields microseconds; dropping the last three digits leaves milliseconds.
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

    def _write(self, event: dict) -> None:
        """Append *event* plus a ``"ts"`` field as one JSON line and flush.

        The caller's dict is left untouched. Values that ``json`` cannot encode
        natively (enums, bytes, arbitrary objects) are written as ``str(value)``
        so that tracing can never abort the run it is observing.
        """
        record = {**event, "ts": self._ts()}
        self._file.write(json.dumps(record, ensure_ascii=False, default=str) + '\n')
        self._file.flush()

    # --- Task lifecycle ---
    def log_init(self, query: str, demand_id: int, model: str, trace_id: Optional[str] = None):
        """Record the start of a run."""
        self._write({
            "type": "init",
            "query": query,
            "demand_id": demand_id,
            "model": model,
            "trace_id": trace_id,
        })

    def log_complete(self, trace_id: Optional[str], status: str, error: Optional[str] = None):
        """Record the end of a run together with the number of assistant turns logged."""
        self._write({
            "type": "complete",
            "trace_id": trace_id,
            "status": status,
            "error": error,
            "total_iterations": self._iteration,
        })

    # --- Framework logging ---
    def log_framework(self, logger_name: str, level: str, message: str):
        """Record one message coming from Python ``logging``."""
        self._write({
            "type": "framework",
            "logger": logger_name,
            "level": level,
            "msg": message,
        })

    # --- Agent reasoning / LLM output ---
    def log_assistant(self, text: Optional[str], tool_calls: Optional[List[dict]] = None,
                      reasoning: Optional[str] = None, tokens: Optional[dict] = None):
        """Record one complete LLM turn (text, tool calls, reasoning, token usage), untruncated.

        Increments the iteration counter. Each tool call's ``function.arguments``
        string is decoded as JSON when possible; otherwise the raw value is kept.
        """
        self._iteration += 1
        parsed_calls = []
        for tc in (tool_calls or []):
            # "function" may be present but None; treat it like a missing entry.
            func = tc.get("function") or {}
            name = func.get("name", "unknown")
            args_str = func.get("arguments", "{}")
            try:
                params = json.loads(args_str)
            except (json.JSONDecodeError, TypeError):
                params = args_str
            parsed_calls.append({
                "name": name,
                "params": params,
                "call_id": tc.get("id", ""),
            })
        self._write({
            "type": "assistant",
            "iteration": self._iteration,
            "text": text or "",
            "tool_calls": parsed_calls,
            "reasoning": reasoning or "",
            "tokens": tokens or {},
        })

    # --- Tool results ---
    def log_tool_result(self, tool_name: str, result: Any, call_id: str = ""):
        """Record a tool's full output, untruncated.

        A list result is reduced to the newline-joined ``text`` of its
        ``{"type": "text"}`` parts; if it has none, the whole list is dumped as
        JSON (non-JSON values via ``str()``). Strings are kept as-is, and anything
        else goes through ``str()``.
        """
        if isinstance(result, list):
            texts = [
                p.get("text") or "" for p in result
                if isinstance(p, dict) and p.get("type") == "text"
            ]
            output = (
                "\n".join(texts) if texts
                else json.dumps(result, ensure_ascii=False, default=str)
            )
        elif isinstance(result, str):
            output = result
        else:
            output = str(result)
        self._write({
            "type": "tool_result",
            "name": tool_name,
            "call_id": call_id,
            "output": output,
        })

    # --- Errors ---
    def log_error(self, message: str, traceback_str: str = ""):
        """Record an error message and, optionally, its formatted traceback."""
        self._write({
            "type": "error",
            "msg": message,
            "traceback": traceback_str,
        })

    def close(self):
        """Close the trace file; does nothing if it is already closed."""
        if self._file and not self._file.closed:
            self._file.close()
class JsonlLogHandler(logging.Handler):
    """Route Python ``logging`` records into a TraceWriter as ``"framework"`` JSONL events."""

    def __init__(self, trace_writer: "TraceWriter"):
        super().__init__()
        self.trace_writer = trace_writer

    def emit(self, record: logging.LogRecord) -> None:
        """Forward the record's logger name, level and message to the trace writer.

        When the record carries exception or stack information, that text is
        appended to the message so the JSONL trace keeps the full traceback.
        Failures are reported through ``Handler.handleError`` (the standard
        logging hook, which never raises) instead of being silently discarded.
        """
        try:
            message = record.getMessage()
            if record.exc_info:
                # Reuse the traceback text another handler's formatter may have cached.
                exc_text = record.exc_text or logging.Formatter().formatException(record.exc_info)
                message = f"{message}\n{exc_text}"
            if record.stack_info:
                message = f"{message}\n{record.stack_info}"
            self.trace_writer.log_framework(record.name, record.levelname, message)
        except Exception:
            self.handleError(record)
- # ============================================================
- # 控制台日志(人类可读)
- # ============================================================
# Human-readable console output on stdout. force=True drops any handlers already
# attached to the root logger, so this configuration wins even if logging was set up earlier.
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logging.basicConfig(force=True, handlers=[console_handler], level=logging.INFO)
logger = logging.getLogger(name=__name__)
- # ============================================================
- # 第三方 / 项目依赖导入(放在 logging 配置之后)
- # ============================================================
- from dotenv import load_dotenv
- load_dotenv()
- from tools import fetch_account_article_list, fetch_weixin_account, weixin_search
- from agent import AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.tools.builtin.knowledge import KnowledgeConfig
# Fallbacks that run_agent uses when the caller supplies no query / demand id.
DEFAULT_QUERY: str = "伊朗、以色列、和平是永恒的主题"
DEFAULT_DEMAND_ID: int = 1
- # ============================================================
- # 工具函数
- # ============================================================
def _normalize_ascii_double_quotes(text: str) -> str:
    """Replace ASCII double quotes with curly Chinese quotes, alternating open/close.

    The 1st, 3rd, 5th, ... ``"`` become “ (U+201C) and the 2nd, 4th, ... become
    ” (U+201D). Pairing runs across the whole string, so an odd number of quotes
    leaves a trailing opening quote. Text without ``"`` is returned unchanged.
    """
    if '"' not in text:
        return text
    segments = text.split('"')
    pieces = [segments[0]]
    for index, segment in enumerate(segments[1:]):
        # Even positions open a quotation, odd positions close it.
        pieces.append("\u201d" if index % 2 else "\u201c")
        pieces.append(segment)
    return "".join(pieces)
def _sanitize_json_strings(value: Any) -> Any:
    """Return a copy of a decoded JSON value with quote normalization applied to every string.

    Dict values and list items are processed recursively; dict keys and
    non-string scalars (numbers, booleans, None) are passed through unchanged.
    """
    if isinstance(value, dict):
        return {key: _sanitize_json_strings(item) for key, item in value.items()}
    if isinstance(value, list):
        return list(map(_sanitize_json_strings, value))
    if isinstance(value, str):
        return _normalize_ascii_double_quotes(value)
    return value
def _sanitize_output_json(output_json_path: Path) -> None:
    """Normalize ASCII double quotes in every string of an output.json file, in place.

    Best-effort: a missing file, an unreadable or unparsable file, or a failed
    write is logged as a warning and the function returns without raising, so
    a cleanup problem cannot turn an otherwise completed run into a failure.
    The cleaned JSON is written to a sibling temp file and moved over the
    original with ``os.replace``, so an interrupted write never leaves a
    truncated output.json behind.
    """
    if not output_json_path.exists():
        logger.warning(f"未找到 output.json,跳过清洗: {output_json_path}")
        return
    try:
        data = json.loads(output_json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError and UnicodeDecodeError
        logger.warning(f"output.json 解析失败,跳过清洗: {e}")
        return
    cleaned = _sanitize_json_strings(data)
    # Same directory => same filesystem, which os.replace needs for an atomic swap.
    tmp_path = output_json_path.with_name(output_json_path.name + ".tmp")
    try:
        tmp_path.write_text(
            json.dumps(cleaned, ensure_ascii=False, indent=2),
            encoding="utf-8"
        )
        os.replace(tmp_path, output_json_path)
    except OSError as e:
        logger.warning(f"output.json 写入失败,保留原文件: {e}")
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort temp cleanup; the original output.json is untouched
        return
    logger.info(f"已完成 output.json 引号清洗: {output_json_path}")
- # ============================================================
- # 控制台流式输出(简洁版)
- # ============================================================
# Bookkeeping tools that are not echoed to the console (shared by both printers below).
_QUIET_TOOLS = frozenset({"goal", "get_current_context"})


def _print_assistant(text: str, tool_calls: list):
    """Log a short console summary of an assistant turn.

    Logs the text (if any), then one ``[工具]`` line per tool call whose name
    is not in ``_QUIET_TOOLS``.
    """
    if text:
        logger.info("\n%s", text)
    for tc in (tool_calls or []):
        # "function" may be present but None; treat it like a missing entry.
        name = (tc.get("function") or {}).get("name", "unknown")
        if name not in _QUIET_TOOLS:
            logger.info("[工具] %s", name)


def _print_tool_result(tool_name: str):
    """Log a one-line ``[结果]`` marker for a tool result, unless the tool is quiet."""
    if tool_name not in _QUIET_TOOLS:
        logger.info("[结果] %s ✓", tool_name)
- # ============================================================
- # Agent 执行
- # ============================================================
def _record_message(tw: TraceWriter, item: Message, stream_output: bool) -> None:
    """Record one streamed Message in the JSONL trace and optionally echo a console summary.

    Assistant messages with dict content are logged with text, tool calls,
    reasoning and token counts; non-empty string content is logged as plain
    text. Tool messages with dict content are logged as a tool result, or as an
    error event when the content has a truthy ``"error"``. Other roles or
    content shapes are ignored.
    """
    content = item.content
    if item.role == "assistant":
        if isinstance(content, dict):
            text = content.get("text", "")
            tool_calls = content.get("tool_calls", [])
            # JSONL: full record, no truncation.
            tw.log_assistant(
                text=text,
                tool_calls=tool_calls,
                reasoning=content.get("reasoning_content", ""),
                tokens={
                    "prompt": getattr(item, "prompt_tokens", None),
                    "completion": getattr(item, "completion_tokens", None),
                },
            )
            if stream_output:
                _print_assistant(text, tool_calls)
        elif isinstance(content, str) and content:
            tw.log_assistant(text=content)
            if stream_output:
                logger.info("\n%s", content)
    elif item.role == "tool" and isinstance(content, dict):
        tool_name = content.get("tool_name", "unknown")
        error = content.get("error")
        if error:
            tw.log_error(message=f"Tool {tool_name}: {error}")
        else:
            tw.log_tool_result(
                tool_name=tool_name,
                result=content.get("result", ""),
                call_id=item.tool_call_id or "",
            )
        if stream_output:
            _print_tool_result(tool_name)


async def run_agent(
    query: Optional[str] = None,
    demand_id: Optional[int] = None,
    stream_output: bool = True,
) -> Dict[str, Any]:
    """Run the content-finder agent once, tracing every step to agent.log (JSONL).

    Args:
        query: Search topic; a falsy value (None or "") falls back to DEFAULT_QUERY.
        demand_id: Demand id passed to the prompt; None falls back to
            DEFAULT_DEMAND_ID (0 is kept as a real id).
        stream_output: When true, also log a short human-readable summary.

    Returns:
        ``{"trace_id": ..., "status": "completed"}`` on success, otherwise
        ``{"trace_id": ..., "status": "failed", "error": <message>}``.

    Raises:
        ValueError: If OPEN_ROUTER_API_KEY is not set. Other errors raised while
            loading the prompt or building the runner/config also propagate;
            errors raised while the agent runs are caught and returned as "failed".
        asyncio.CancelledError: Re-raised after being recorded in the trace.

    The trace file and the root-logger JSONL handler are released on every
    exit path, including when setup fails.
    """
    import asyncio  # for CancelledError; the module only imports asyncio under __main__

    query = query or DEFAULT_QUERY
    if demand_id is None:
        demand_id = DEFAULT_DEMAND_ID

    # TraceWriter -> agent.log (JSONL, complete, untruncated); Python logging is mirrored into it.
    tw = TraceWriter(TRACE_LOG_PATH)
    jsonl_handler = JsonlLogHandler(tw)
    jsonl_handler.setLevel(logging.DEBUG)
    root_logger = logging.getLogger()
    root_logger.addHandler(jsonl_handler)
    try:
        prompt = SimplePrompt(PROJECT_ROOT / "content_finder.md")
        output_dir = str(PROJECT_ROOT / "output")
        messages = prompt.build_messages(
            query=query, output_dir=output_dir, demand_id=str(demand_id)
        )

        api_key = os.getenv("OPEN_ROUTER_API_KEY")
        if not api_key:
            raise ValueError("OPEN_ROUTER_API_KEY 未设置")

        model_name = prompt.config.get("model", "sonnet-4.6")
        model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
        temperature = float(prompt.config.get("temperature", 0.3))
        max_iterations = 30

        trace_dir = str(PROJECT_ROOT / "traces")
        skills_dir = str(PROJECT_ROOT / "skills")
        Path(trace_dir).mkdir(parents=True, exist_ok=True)
        store = FileSystemTraceStore(base_path=trace_dir)

        allowed_tools = [
            "weixin_search",
            "fetch_weixin_account",
            "fetch_account_article_list",
            "fetch_article_detail",
        ]
        runner = AgentRunner(
            llm_call=create_openrouter_llm_call(model=model),
            trace_store=store,
            skills_dir=skills_dir,
        )
        config = RunConfig(
            name="内容寻找",
            model=model,
            temperature=temperature,
            max_iterations=max_iterations,
            tools=allowed_tools,
            extra_llm_params={"max_tokens": 8192},
            knowledge=KnowledgeConfig(
                enable_extraction=False,
                enable_completion_extraction=False,
                enable_injection=False,
            )
        )

        tw.log_init(query, demand_id, model)
        trace_id = None
        try:
            async for item in runner.run(messages=messages, config=config):
                if isinstance(item, Trace):
                    trace_id = item.trace_id
                    tw._write({"type": "trace_status", "trace_id": trace_id, "status": item.status})
                    if item.status == "completed":
                        if trace_id:
                            _sanitize_output_json(Path(output_dir) / trace_id / "output.json")
                        tw.log_complete(trace_id, "completed")
                        logger.info(f"Agent 执行完成: trace_id={trace_id}")
                        return {"trace_id": trace_id, "status": "completed"}
                    if item.status == "failed":
                        tw.log_complete(trace_id, "failed", item.error_message)
                        logger.error(f"Agent 执行失败: {item.error_message}")
                        return {"trace_id": trace_id, "status": "failed", "error": item.error_message}
                elif isinstance(item, Message):
                    _record_message(tw, item, stream_output)
            # The stream ended without a terminal Trace status.
            tw.log_complete(trace_id, "failed", "Agent 异常退出(循环结束未返回结果)")
            return {"trace_id": trace_id, "status": "failed", "error": "Agent 异常退出"}
        except asyncio.CancelledError:
            # Since Python 3.11 asyncio.run() delivers Ctrl+C as task cancellation;
            # record it in the trace, then let cancellation propagate.
            tw.log_complete(trace_id, "interrupted", "任务被取消")
            raise
        except KeyboardInterrupt:
            logger.info("用户中断")
            tw.log_complete(trace_id, "interrupted", "用户中断")
            return {"trace_id": trace_id, "status": "failed", "error": "用户中断"}
        except Exception as e:
            tb_str = tb_mod.format_exc()
            logger.error(f"Agent 执行异常: {e}", exc_info=True)
            tw.log_error(str(e), tb_str)
            tw.log_complete(trace_id, "failed", str(e))
            return {"trace_id": trace_id, "status": "failed", "error": str(e)}
    finally:
        root_logger.removeHandler(jsonl_handler)
        tw.close()
async def main():
    """Script entry point: run the agent with its defaults and exit with status 1 on failure.

    A KeyboardInterrupt that reaches this coroutine is logged and treated as a
    normal (status 0) exit; any other exception is logged with its traceback.
    """
    try:
        result = await run_agent(query=None, demand_id=None, stream_output=True)
        if result["status"] != "completed":
            logger.error(f"[失败] trace_id={result.get('trace_id')}, 错误: {result.get('error')}")
            sys.exit(1)
        logger.info(f"[完成] trace_id={result['trace_id']}")
    except KeyboardInterrupt:
        logger.info("用户中断")
    except Exception as exc:
        logger.error(f"执行失败: {exc}", exc_info=True)
        sys.exit(1)
if __name__ == "__main__":
    import asyncio

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Since Python 3.11 asyncio.run() cancels the main task on Ctrl+C and then
        # re-raises KeyboardInterrupt; exit quietly with the conventional SIGINT status
        # instead of dumping a traceback.
        logger.info("用户中断")
        sys.exit(130)
|