"""Entry point for the content-finder ("内容寻找") agent.

Logging design:
- agent.log: JSONL trace of the complete execution (nothing truncated),
  used for HTML visualization.
- Console: brief, human-readable output.
"""

import asyncio
import json
import logging
import os
import sys
import traceback as tb_mod
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

PROJECT_ROOT = Path(__file__).resolve().parent
log_dir = PROJECT_ROOT / '.cache'
log_dir.mkdir(exist_ok=True)
TRACE_LOG_PATH = log_dir / 'agent.log'

# Tool names whose calls/results are not echoed to the console.
_CONSOLE_HIDDEN_TOOLS = ("goal", "get_current_context")


# ============================================================
# TraceWriter: structured JSONL trace log (agent.log, no truncation)
# ============================================================

class TraceWriter:
    """Write each agent execution step to agent.log as one JSONL event.

    Nothing is truncated. The file is opened in ``'w'`` mode, so every new
    TraceWriter overwrites the previous trace.
    """

    def __init__(self, path: Path):
        self._file = open(path, 'w', encoding='utf-8')
        # Incremented once per logged assistant message; reported on "complete".
        self._iteration = 0

    def _ts(self) -> str:
        # Local time with millisecond precision (drop the last 3 digits of %f).
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

    def _write(self, event: dict):
        """Stamp *event* with ``ts``, append it as one JSON line and flush."""
        event["ts"] = self._ts()
        self._file.write(json.dumps(event, ensure_ascii=False) + '\n')
        # Flush per event so each line reaches the file immediately.
        self._file.flush()

    # --- Task lifecycle ---

    def log_init(self, query: str, demand_id: int, model: str,
                 trace_id: Optional[str] = None):
        """Record the start of a run."""
        self._write({
            "type": "init",
            "query": query,
            "demand_id": demand_id,
            "model": model,
            "trace_id": trace_id,
        })

    def log_trace_status(self, trace_id: Optional[str], status: str):
        """Record a status update reported by the runner's Trace object."""
        self._write({
            "type": "trace_status",
            "trace_id": trace_id,
            "status": status,
        })

    def log_complete(self, trace_id: Optional[str], status: str,
                     error: Optional[str] = None):
        """Record the end of a run together with the assistant-iteration count."""
        self._write({
            "type": "complete",
            "trace_id": trace_id,
            "status": status,
            "error": error,
            "total_iterations": self._iteration,
        })

    # --- Framework (Python logging) records ---

    def log_framework(self, logger_name: str, level: str, message: str):
        """Record one Python logging record."""
        self._write({
            "type": "framework",
            "logger": logger_name,
            "level": level,
            "msg": message,
        })

    # --- Agent thinking / LLM output ---

    def log_assistant(self, text: str, tool_calls: Optional[List[dict]] = None,
                      reasoning: Optional[str] = None,
                      tokens: Optional[dict] = None):
        """Record the full LLM output (text + tool calls), untruncated.

        Each tool call's ``function.arguments`` is JSON-decoded when possible;
        otherwise the raw value is kept as ``params``.
        """
        self._iteration += 1
        parsed_calls = []
        for tc in (tool_calls or []):
            func = tc.get("function", {})
            name = func.get("name", "unknown")
            args_str = func.get("arguments", "{}")
            try:
                params = json.loads(args_str)
            except (json.JSONDecodeError, TypeError):
                # Not a JSON string (or not a string at all): keep it as-is.
                params = args_str
            parsed_calls.append({
                "name": name,
                "params": params,
                "call_id": tc.get("id", ""),
            })

        self._write({
            "type": "assistant",
            "iteration": self._iteration,
            "text": text or "",
            "tool_calls": parsed_calls,
            "reasoning": reasoning or "",
            "tokens": tokens or {},
        })

    # --- Tool results ---

    def log_tool_result(self, tool_name: str, result: Any, call_id: str = ""):
        """Record a tool's complete return value (untruncated).

        A list result keeps only its ``{"type": "text"}`` parts joined by
        newlines; a list without such parts is dumped as JSON. Strings are
        kept as-is; anything else goes through ``str()``.
        """
        if isinstance(result, list):
            texts = [
                p.get("text", "")
                for p in result
                if isinstance(p, dict) and p.get("type") == "text"
            ]
            output = "\n".join(texts) if texts else json.dumps(result, ensure_ascii=False)
        elif isinstance(result, str):
            output = result
        else:
            output = str(result)

        self._write({
            "type": "tool_result",
            "name": tool_name,
            "call_id": call_id,
            "output": output,
        })

    # --- Errors ---

    def log_error(self, message: str, traceback_str: str = ""):
        """Record an error message with an optional formatted traceback."""
        self._write({
            "type": "error",
            "msg": message,
            "traceback": traceback_str,
        })

    def close(self):
        """Close the trace file; safe to call more than once."""
        if self._file and not self._file.closed:
            self._file.close()


class JsonlLogHandler(logging.Handler):
    """Route Python logging records into a TraceWriter (JSONL format)."""

    def __init__(self, trace_writer: TraceWriter):
        super().__init__()
        self.trace_writer = trace_writer

    def emit(self, record: logging.LogRecord):
        try:
            self.trace_writer.log_framework(
                record.name,
                record.levelname,
                record.getMessage(),
            )
        except Exception:
            # Stdlib convention: report through handleError (stderr when
            # logging.raiseExceptions is true) rather than hiding the failure.
            self.handleError(record)


# ============================================================
# Console logging (human-readable)
# ============================================================

console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logging.basicConfig(level=logging.INFO, handlers=[console_handler], force=True)
logger = logging.getLogger(__name__)


# ============================================================
# Third-party / project imports (kept after the logging setup;
# .env is loaded before the project modules are imported)
# ============================================================

from dotenv import load_dotenv
load_dotenv()

from tools import fetch_account_article_list, fetch_weixin_account, weixin_search
from agent import AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message
from agent.llm import create_openrouter_llm_call
from agent.llm.prompts import SimplePrompt
from agent.tools.builtin.knowledge import KnowledgeConfig

DEFAULT_QUERY = "伊朗、以色列、和平是永恒的主题"
DEFAULT_DEMAND_ID = 1


# ============================================================
# Helpers
# ============================================================

def _normalize_ascii_double_quotes(text: str) -> str:
    """Replace ASCII ``"`` with alternating Chinese curly quotes.

    The 1st, 3rd, ... occurrence becomes U+201C (opening) and the 2nd,
    4th, ... becomes U+201D (closing). Strings without ``"`` are returned
    unchanged.
    """
    if '"' not in text:
        return text

    chars: list[str] = []
    open_quote = True
    for ch in text:
        if ch == '"':
            chars.append("\u201c" if open_quote else "\u201d")
            open_quote = not open_quote
        else:
            chars.append(ch)
    return "".join(chars)


def _sanitize_json_strings(value: Any) -> Any:
    """Recursively normalize quotes in every string value (dict keys untouched)."""
    if isinstance(value, str):
        return _normalize_ascii_double_quotes(value)
    if isinstance(value, list):
        return [_sanitize_json_strings(v) for v in value]
    if isinstance(value, dict):
        return {k: _sanitize_json_strings(v) for k, v in value.items()}
    return value


def _sanitize_output_json(output_json_path: Path) -> None:
    """Rewrite *output_json_path* in place with quote-normalized strings.

    Best-effort: a missing or unparsable file is logged as a warning and
    skipped.
    """
    if not output_json_path.exists():
        logger.warning(f"未找到 output.json,跳过清洗: {output_json_path}")
        return

    try:
        data = json.loads(output_json_path.read_text(encoding="utf-8"))
    except Exception as e:
        logger.warning(f"output.json 解析失败,跳过清洗: {e}")
        return

    cleaned = _sanitize_json_strings(data)
    output_json_path.write_text(
        json.dumps(cleaned, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    logger.info(f"已完成 output.json 引号清洗: {output_json_path}")


# ============================================================
# Console streaming output (brief)
# ============================================================

def _print_assistant(text: str, tool_calls: list):
    """Log a short console summary of an assistant message."""
    if text:
        logger.info("\n%s", text)
    for tc in (tool_calls or []):
        name = tc.get("function", {}).get("name", "unknown")
        if name not in _CONSOLE_HIDDEN_TOOLS:
            logger.info("[工具] %s", name)


def _print_tool_result(tool_name: str):
    """Log a console marker that a tool returned."""
    if tool_name not in _CONSOLE_HIDDEN_TOOLS:
        logger.info("[结果] %s ✓", tool_name)


# ============================================================
# Agent execution
# ============================================================

def _prepare_run(query: str, demand_id: int, output_dir: str):
    """Build the prompt messages, AgentRunner and RunConfig for one run.

    Returns:
        ``(model, runner, config, messages)``.

    Raises:
        ValueError: if the ``OPEN_ROUTER_API_KEY`` env var is unset or empty.
    """
    prompt = SimplePrompt(PROJECT_ROOT / "content_finder.md")
    messages = prompt.build_messages(
        query=query, output_dir=output_dir, demand_id=str(demand_id)
    )

    if not os.getenv("OPEN_ROUTER_API_KEY"):
        raise ValueError("OPEN_ROUTER_API_KEY 未设置")

    # prompt.config supplies the default model name; $MODEL, when set,
    # overrides the whole model id.
    model_name = prompt.config.get("model", "sonnet-4.6")
    model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
    temperature = float(prompt.config.get("temperature", 0.3))
    max_iterations = 30

    trace_dir = PROJECT_ROOT / "traces"
    trace_dir.mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=str(trace_dir))

    allowed_tools = [
        "weixin_search",
        "fetch_weixin_account",
        "fetch_account_article_list",
        "fetch_article_detail",
    ]

    runner = AgentRunner(
        llm_call=create_openrouter_llm_call(model=model),
        trace_store=store,
        skills_dir=str(PROJECT_ROOT / "skills"),
    )
    config = RunConfig(
        name="内容寻找",
        model=model,
        temperature=temperature,
        max_iterations=max_iterations,
        tools=allowed_tools,
        extra_llm_params={"max_tokens": 8192},
        knowledge=KnowledgeConfig(
            enable_extraction=False,
            enable_completion_extraction=False,
            enable_injection=False,
        ),
    )
    return model, runner, config, messages


def _record_message(tw: TraceWriter, item: Message, stream_output: bool) -> None:
    """Log one runner Message: full detail to JSONL, a brief line to the console."""
    content = item.content

    if item.role == "assistant":
        if isinstance(content, dict):
            text = content.get("text", "")
            tool_calls = content.get("tool_calls", [])
            # JSONL: full record, untruncated.
            tw.log_assistant(
                text=text,
                tool_calls=tool_calls,
                reasoning=content.get("reasoning_content", ""),
                tokens={
                    "prompt": getattr(item, "prompt_tokens", None),
                    "completion": getattr(item, "completion_tokens", None),
                },
            )
            if stream_output:
                _print_assistant(text, tool_calls)
        elif isinstance(content, str) and content:
            tw.log_assistant(text=content)
            if stream_output:
                logger.info("\n%s", content)

    elif item.role == "tool" and isinstance(content, dict):
        tool_name = content.get("tool_name", "unknown")
        error = content.get("error")
        # JSONL: full record, untruncated.
        if error:
            tw.log_error(message=f"Tool {tool_name}: {error}")
        else:
            tw.log_tool_result(
                tool_name=tool_name,
                result=content.get("result", ""),
                call_id=item.tool_call_id or "",
            )
        if stream_output:
            _print_tool_result(tool_name)


async def _consume_run(runner, messages, config, tw: TraceWriter,
                       output_dir: str, stream_output: bool) -> Dict[str, Any]:
    """Drive ``runner.run`` to completion, logging every item; return status."""
    trace_id = None
    try:
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                trace_id = item.trace_id
                tw.log_trace_status(trace_id, item.status)

                if item.status == "completed":
                    if trace_id:
                        _sanitize_output_json(Path(output_dir) / trace_id / "output.json")
                    tw.log_complete(trace_id, "completed")
                    logger.info(f"Agent 执行完成: trace_id={trace_id}")
                    return {"trace_id": trace_id, "status": "completed"}

                if item.status == "failed":
                    tw.log_complete(trace_id, "failed", item.error_message)
                    logger.error(f"Agent 执行失败: {item.error_message}")
                    return {"trace_id": trace_id, "status": "failed", "error": item.error_message}

            elif isinstance(item, Message):
                _record_message(tw, item, stream_output)

        # The stream ended without a terminal Trace status.
        tw.log_complete(trace_id, "failed", "Agent 异常退出(循环结束未返回结果)")
        return {"trace_id": trace_id, "status": "failed", "error": "Agent 异常退出"}

    except KeyboardInterrupt:
        logger.info("用户中断")
        tw.log_complete(trace_id, "interrupted", "用户中断")
        return {"trace_id": trace_id, "status": "failed", "error": "用户中断"}

    except asyncio.CancelledError:
        # Under asyncio.run (3.11+), Ctrl-C cancels the task instead of raising
        # KeyboardInterrupt here; record it, then let the cancellation proceed.
        tw.log_complete(trace_id, "interrupted", "用户中断")
        raise

    except Exception as e:
        tb_str = tb_mod.format_exc()
        logger.error(f"Agent 执行异常: {e}", exc_info=True)
        tw.log_error(str(e), tb_str)
        tw.log_complete(trace_id, "failed", str(e))
        if stream_output:
            logger.error("执行失败: %s", e)
        return {"trace_id": trace_id, "status": "failed", "error": str(e)}


async def run_agent(
    query: Optional[str] = None,
    demand_id: Optional[int] = None,
    stream_output: bool = True,
) -> Dict[str, Any]:
    """Run the content-finder agent once and return a status dict.

    Args:
        query: search topic; falsy values fall back to DEFAULT_QUERY.
        demand_id: demand id; falsy values (None or 0) fall back to
            DEFAULT_DEMAND_ID.
        stream_output: also echo a brief per-step summary to the console.

    Returns:
        ``{"trace_id", "status": "completed"}`` on success, otherwise
        ``{"trace_id", "status": "failed", "error"}``.

    Raises:
        ValueError: if OPEN_ROUTER_API_KEY is not set. Other errors raised
            while preparing the run also propagate.
        asyncio.CancelledError: re-raised after the interruption is recorded.
    """
    query = query or DEFAULT_QUERY
    demand_id = demand_id or DEFAULT_DEMAND_ID

    # TraceWriter -> agent.log (JSONL, complete, untruncated); Python logging
    # is routed there as well while the run is active.
    tw = TraceWriter(TRACE_LOG_PATH)
    jsonl_handler = JsonlLogHandler(tw)
    jsonl_handler.setLevel(logging.DEBUG)
    root_logger = logging.getLogger()
    root_logger.addHandler(jsonl_handler)

    # The set-up below can raise (e.g. missing API key); the finally clause
    # guarantees the file is closed and the root handler detached either way.
    try:
        output_dir = str(PROJECT_ROOT / "output")
        model, runner, config, messages = _prepare_run(query, demand_id, output_dir)
        tw.log_init(query, demand_id, model)
        return await _consume_run(runner, messages, config, tw, output_dir, stream_output)
    finally:
        root_logger.removeHandler(jsonl_handler)
        tw.close()


async def main():
    """CLI entry: run with defaults; exit with status 1 on failure."""
    try:
        result = await run_agent(query=None, demand_id=None, stream_output=True)
        if result["status"] == "completed":
            logger.info(f"[完成] trace_id={result['trace_id']}")
        else:
            logger.error(f"[失败] trace_id={result.get('trace_id')}, 错误: {result.get('error')}")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("用户中断")
    except Exception as e:
        logger.error(f"执行失败: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run cancels main() on Ctrl-C and then re-raises
        # KeyboardInterrupt here, outside main()'s own handler.
        logger.info("用户中断")