| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """
- 内容寻找 Agent - 核心执行逻辑
- 提供可复用的 agent 执行函数,供 run.py 和 server.py 调用。
- """
- import asyncio
- import logging
- import sys
- import os
- from pathlib import Path
- from typing import Optional, Dict, Any
- from utils.log_capture import build_log, log
- from datetime import datetime
- import uuid
- def _resolve_input_log_dir(content_finder_root: Path) -> Path:
- """与 .env 中 INPUT_LOG_PATH 一致:目录;相对路径相对 content_finder 根目录。"""
- raw = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
- p = Path(raw).expanduser()
- if p.is_absolute():
- return p if not p.suffix else p.parent
- return (content_finder_root / p).resolve()
- # Put the directory two levels above this file's directory at the front of
- # sys.path (presumably the repository root, so that `agent` can be imported
- # below -- TODO confirm).
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- # First pass: load a .env using python-dotenv's default lookup.
- load_dotenv()
- # Second pass: also load the .env located next to this file, so settings such as
- # INPUT_LOG_PATH are picked up even when the process is started from the
- # repository root. override=True lets these values replace ones already set.
- load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=True)
- from agent import (
- AgentRunner,
- RunConfig,
- FileSystemTraceStore,
- Trace,
- Message,
- )
- from agent.llm import create_openrouter_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.tools.builtin.knowledge import KnowledgeConfig
- # 导入工具(确保工具被注册)
- from tools import (
- douyin_search,
- douyin_search_fallback,
- douyin_user_videos,
- get_content_fans_portrait,
- get_account_fans_portrait,
- create_crawler_plan_by_douyin_content_id,
- create_crawler_plan_by_douyin_account_id,
- store_results_mysql,
- think_and_plan,
- find_authors_from_db,
- get_video_topic,
- )
- # Module-level logger named after this module.
- logger = logging.getLogger(__name__)
- # Default search terms; run_agent uses them when `query` is None or empty.
- DEFAULT_QUERY = "毛泽东,反腐倡廉"
- # Default demand id; run_agent uses it when `demand_id` is None or 0.
- DEFAULT_DEMAND_ID = 1
def extract_assistant_text(message: Message) -> str:
    """Return the text an assistant message carries, or ``""`` if there is none.

    Messages whose role is not ``"assistant"`` always yield ``""``. String
    content is returned as is; dict content contributes its ``"text"`` entry;
    any other content type yields ``""``.
    """
    if message.role == "assistant":
        body = message.content
        if isinstance(body, str):
            return body
        if isinstance(body, dict):
            # The text is reported even when the same turn also carries tool
            # calls, so every step of the model's output stays observable.
            return body.get("text", "") or ""
    return ""
def _write_run_log(log_file_path: Path, text: str) -> None:
    """Write the captured run log to disk; a failure is logged, never raised."""
    try:
        log_file_path.write_text(text, encoding="utf-8")
    except OSError as exc:
        # Best effort: losing the log file must not change the run's outcome.
        logger.warning("Failed to write run log %s: %s", log_file_path, exc)


async def run_agent(
    query: Optional[str] = None,
    demand_id: Optional[int] = None,
    stream_output: bool = True,
) -> Dict[str, Any]:
    """Run the content-finder agent once and report how the run ended.

    Args:
        query: Search terms. ``None`` or an empty string selects
            ``DEFAULT_QUERY``.
        demand_id: Id of this search task (the original notes say it links to
            the ``demand_content`` table). ``None`` or ``0`` selects
            ``DEFAULT_DEMAND_ID``.
        stream_output: When true, assistant text is forwarded to ``log`` as it
            arrives and interruption/failure notices are printed to stdout
            (wanted by run.py, not by server.py).

    Returns:
        A dict of the form::

            {
                "trace_id": "20260317_103046_xyz789",
                "status": "completed" | "failed",
                "error": "...",  # only present when status is "failed"
            }

    Raises:
        ValueError: If ``OPEN_ROUTER_API_KEY`` is not set, or if
            ``MAX_ITERATIONS`` / the prompt's temperature cannot be parsed.
            These checks run before the guarded section, so they propagate to
            the caller instead of being turned into a "failed" result.
    """
    # Falsy inputs (None, "", 0) fall back to the module defaults.
    query = query or DEFAULT_QUERY
    demand_id = demand_id or DEFAULT_DEMAND_ID

    # Load the prompt template that sits next to this file.
    prompt_path = Path(__file__).parent / "content_finder.md"
    prompt = SimplePrompt(prompt_path)

    # OUTPUT_DIR: a relative value is anchored at the content_finder directory.
    content_finder_root = Path(__file__).resolve().parent
    output_dir_path = Path(os.getenv("OUTPUT_DIR", ".cache/output")).expanduser()
    if not output_dir_path.is_absolute():
        output_dir_path = (content_finder_root / output_dir_path).resolve()

    # Fill the prompt placeholders (%query%, %output_dir%, %demand_id%).
    # demand_id is always set at this point, so it is stringified directly.
    messages = prompt.build_messages(
        query=query,
        output_dir=str(output_dir_path),
        demand_id=str(demand_id),
    )

    # The key itself is not passed on below; this only fails fast when it is
    # missing.
    api_key = os.getenv("OPEN_ROUTER_API_KEY")
    if not api_key:
        raise ValueError("OPEN_ROUTER_API_KEY 未设置")

    # MODEL from the environment wins over the model named in the prompt config.
    model_name = prompt.config.get("model", "sonnet-4.6")
    model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
    temperature = float(prompt.config.get("temperature", 0.3))
    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))

    # NOTE: unlike OUTPUT_DIR, a relative TRACE_DIR is resolved against the
    # current working directory.
    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
    Path(trace_dir).mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=trace_dir)

    skills_dir = str(Path(__file__).parent / "skills")

    allowed_tools = [
        "douyin_search",
        "douyin_search_fallback",
        "douyin_user_videos",
        "get_content_fans_portrait",
        "get_account_fans_portrait",
        "find_authors_from_db",
        "store_results_mysql",
        "create_crawler_plan_by_douyin_content_id",
        "create_crawler_plan_by_douyin_account_id",
        "think_and_plan",
        "get_video_topic",
    ]

    runner = AgentRunner(
        llm_call=create_openrouter_llm_call(model=model),
        trace_store=store,
        skills_dir=skills_dir,
    )

    config = RunConfig(
        name="内容寻找",
        model=model,
        temperature=temperature,
        enable_research_flow=False,
        goal_compression="none",
        force_side_branch=None,
        max_iterations=max_iterations,
        tools=allowed_tools,
        extra_llm_params={"max_tokens": 8192},
        knowledge=KnowledgeConfig(
            enable_extraction=False,
            enable_completion_extraction=False,
            enable_injection=False,
            # owner="content_finder_agent",
            # default_tags={"project": "content_finder"},
            # default_scopes=["com.piaoquantv.supply"],
            # default_search_types=["tool", "usecase", "definition"],
            # default_search_owner="content_finder_agent"
        ),
    )

    trace_id = None
    execution_id = str(uuid.uuid4())

    try:
        log_dir = _resolve_input_log_dir(content_finder_root)
        log_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file_path = log_dir / f"run_log_{timestamp}.txt"

        run_result: Optional[Dict[str, Any]] = None
        with build_log(execution_id) as log_buffer:
            try:
                async for item in runner.run(messages=messages, config=config):
                    if isinstance(item, Trace):
                        trace_id = item.trace_id
                        if item.status == "completed":
                            logger.info("Agent 执行完成: trace_id=%s", trace_id)
                            run_result = {
                                "trace_id": trace_id,
                                "status": "completed",
                            }
                            break
                        if item.status == "failed":
                            logger.error("Agent 执行失败: %s", item.error_message)
                            run_result = {
                                "trace_id": trace_id,
                                "status": "failed",
                                "error": item.error_message,
                            }
                            break
                    elif isinstance(item, Message) and stream_output:
                        text = extract_assistant_text(item)
                        if text:
                            log(f"[assistant] {text}")
            finally:
                # Persist whatever was captured even when the run raised or was
                # cancelled; failed runs are when the log is needed most. The
                # buffer is read while the capture context is still open.
                _write_run_log(log_file_path, log_buffer.getvalue())

        # The stream ended without ever reporting a terminal trace status.
        if run_result is None:
            run_result = {
                "trace_id": trace_id,
                "status": "failed",
                "error": "Agent 异常退出",
            }
        return run_result

    except KeyboardInterrupt:
        logger.info("用户中断")
        if stream_output:
            print("\n用户中断")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": "用户中断",
        }

    except Exception as e:
        # Top-level boundary: callers receive a "failed" result, not a raise.
        logger.error("Agent 执行异常: %s", e, exc_info=True)
        if stream_output:
            print(f"\n执行失败: {e}")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": str(e),
        }
|