"""Content-finder agent: core execution logic.

Provides a reusable agent execution function that is called by run.py and
server.py.
"""

import asyncio
import logging
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

# NOTE(review): kept ahead of the sys.path tweak below, as in the original
# ordering, so that `utils` resolves exactly as it did before.
from utils.log_capture import build_log, log

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv

load_dotenv()
# Also load the .env that sits next to this module so that settings such as
# INPUT_LOG_PATH are picked up when the process is started from the
# repository root; values from this file override earlier ones.
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=True)

from agent import (
    AgentRunner,
    RunConfig,
    FileSystemTraceStore,
    Trace,
    Message,
)
from agent.llm import create_openrouter_llm_call
from agent.llm.prompts import SimplePrompt
from agent.tools.builtin.knowledge import KnowledgeConfig

# Imported for their side effect: importing the tools registers them.
from tools import (
    douyin_search,
    douyin_search_tikhub,
    douyin_user_videos,
    get_content_fans_portrait,
    get_account_fans_portrait,
    create_crawler_plan_by_douyin_content_id,
    create_crawler_plan_by_douyin_account_id,
    store_results_mysql,
    think_and_plan,
    find_authors_from_db,
    get_video_topic,
)

logger = logging.getLogger(__name__)

# Default search terms.
DEFAULT_QUERY = "毛泽东,反腐倡廉"
DEFAULT_DEMAND_ID = 1


def _resolve_input_log_dir(content_finder_root: Path) -> Path:
    """Return the directory configured by the INPUT_LOG_PATH variable.

    The value defaults to ``.cache/input_log`` and has ``~`` expanded.

    Args:
        content_finder_root: Directory that relative values are resolved
            against.

    Returns:
        For an absolute value: the path itself, or its parent when the last
        component has a file suffix. For a relative value: the path joined
        onto ``content_finder_root`` and resolved (no suffix handling is
        applied in this case).
    """
    configured = Path(os.getenv("INPUT_LOG_PATH", ".cache/input_log")).expanduser()
    if configured.is_absolute():
        return configured.parent if configured.suffix else configured
    return (content_finder_root / configured).resolve()


def extract_assistant_text(message: Message) -> str:
    """Return the text carried by an assistant message, or ``""``.

    Messages whose role is not ``"assistant"`` yield an empty string. String
    content is returned as is; for dict content the ``"text"`` entry is used.
    """
    if message.role != "assistant":
        return ""

    content = message.content
    if isinstance(content, str):
        return content
    if isinstance(content, dict):
        # The text is surfaced even when the same turn also carries tool
        # calls, so every step of the model's output can be observed.
        return content.get("text", "") or ""
    return ""


async def run_agent(
    query: Optional[str] = None,
    demand_id: Optional[int] = None,
    stream_output: bool = True,
) -> Dict[str, Any]:
    """Run one content-finder agent task.

    Args:
        query: Search terms. A falsy value (``None`` or ``""``) falls back to
            ``DEFAULT_QUERY``.
        demand_id: Id of this search task (associated with the
            ``demand_content`` table). A falsy value (``None`` or ``0``)
            falls back to ``DEFAULT_DEMAND_ID``.
        stream_output: When true, assistant text is emitted through ``log``
            and interruption/failure notices are printed to stdout. Intended
            for run.py; server.py does not need it.

    Returns:
        A dict of the form::

            {
                "trace_id": "20260317_103046_xyz789",
                "status": "completed" | "failed",
                "error": "...",  # only present on failure
            }

    Raises:
        ValueError: If ``OPEN_ROUTER_API_KEY`` is not set. This is raised
            before the run starts and is not converted into a result dict.
    """
    query = query or DEFAULT_QUERY
    demand_id = demand_id or DEFAULT_DEMAND_ID

    # Load the prompt.
    prompt_path = Path(__file__).parent / "content_finder.md"
    prompt = SimplePrompt(prompt_path)

    # Output directory; a relative value is resolved against this module's
    # directory.
    content_finder_root = Path(__file__).resolve().parent
    output_dir_path = Path(os.getenv("OUTPUT_DIR", ".cache/output")).expanduser()
    if not output_dir_path.is_absolute():
        output_dir_path = (content_finder_root / output_dir_path).resolve()

    # Build the messages (fills %query%, %output_dir% and %demand_id%).
    # demand_id can no longer be None here because of the default above.
    messages = prompt.build_messages(
        query=query,
        output_dir=str(output_dir_path),
        demand_id=str(demand_id),
    )

    # Runtime configuration.
    api_key = os.getenv("OPEN_ROUTER_API_KEY")
    if not api_key:
        raise ValueError("OPEN_ROUTER_API_KEY 未设置")

    model_name = prompt.config.get("model", "sonnet-4.6")
    model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
    temperature = float(prompt.config.get("temperature", 0.3))
    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
    # NOTE(review): unlike OUTPUT_DIR, a relative TRACE_DIR is used as given
    # (i.e. relative to the working directory) — confirm this is intended.
    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
    skills_dir = str(Path(__file__).parent / "skills")

    Path(trace_dir).mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=trace_dir)

    allowed_tools = [
        "douyin_search",
        "douyin_search_tikhub",
        "douyin_user_videos",
        "get_content_fans_portrait",
        "get_account_fans_portrait",
        "find_authors_from_db",
        "store_results_mysql",
        "create_crawler_plan_by_douyin_content_id",
        "create_crawler_plan_by_douyin_account_id",
        "think_and_plan",
        "get_video_topic",
    ]

    runner = AgentRunner(
        llm_call=create_openrouter_llm_call(model=model),
        trace_store=store,
        skills_dir=skills_dir,
    )

    config = RunConfig(
        name="内容寻找",
        model=model,
        temperature=temperature,
        enable_research_flow=False,
        goal_compression="none",
        force_side_branch=None,
        max_iterations=max_iterations,
        tools=allowed_tools,
        extra_llm_params={"max_tokens": 8192},
        knowledge=KnowledgeConfig(
            enable_extraction=False,
            enable_completion_extraction=False,
            enable_injection=False,
            # owner="content_finder_agent",
            # default_tags={"project": "content_finder"},
            # default_scopes=["com.piaoquantv.supply"],
            # default_search_types=["tool", "usecase", "definition"],
            # default_search_owner="content_finder_agent"
        ),
    )

    # Execute.
    trace_id = None
    execution_id = str(uuid.uuid4())

    try:
        log_dir = _resolve_input_log_dir(content_finder_root)
        log_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the name has one-second resolution, so two runs that
        # start within the same second write to the same file.
        log_file_path = log_dir / f"run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

        run_result: Optional[Dict[str, Any]] = None
        with build_log(execution_id) as log_buffer:
            try:
                async for item in runner.run(messages=messages, config=config):
                    if isinstance(item, Trace):
                        trace_id = item.trace_id
                        if item.status == "completed":
                            logger.info("Agent 执行完成: trace_id=%s", trace_id)
                            run_result = {
                                "trace_id": trace_id,
                                "status": "completed",
                            }
                            break
                        if item.status == "failed":
                            logger.error("Agent 执行失败: %s", item.error_message)
                            run_result = {
                                "trace_id": trace_id,
                                "status": "failed",
                                "error": item.error_message,
                            }
                            break
                    elif isinstance(item, Message) and stream_output:
                        text = extract_assistant_text(item)
                        if text:
                            log(f"[assistant] {text}")

                # The stream ended without a terminal Trace status.
                if run_result is None:
                    run_result = {
                        "trace_id": trace_id,
                        "status": "failed",
                        "error": "Agent 异常退出",
                    }
            finally:
                # Persist whatever was captured even when the run raised or
                # was interrupted: the log matters most for diagnosing
                # failures.
                log_file_path.write_text(log_buffer.getvalue(), encoding="utf-8")

            return run_result

    except KeyboardInterrupt:
        logger.info("用户中断")
        if stream_output:
            print("\n用户中断")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": "用户中断",
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as a result
        # dict instead of propagating it.
        logger.exception("Agent 执行异常: %s", e)
        if stream_output:
            print(f"\n执行失败: {e}")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": str(e),
        }