| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- """
- 内容寻找 Agent - 核心执行逻辑
- 提供可复用的 agent 执行函数,供 run.py 和 server.py 调用。
- """
- import asyncio
- import logging
- import sys
- import os
- from pathlib import Path
- from typing import Optional, Dict, Any
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent import (
- AgentRunner,
- RunConfig,
- FileSystemTraceStore,
- Trace,
- Message,
- )
- from agent.llm import create_openrouter_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.tools.builtin.knowledge import KnowledgeConfig
- # 导入工具(确保工具被注册)
- from tools import (
- douyin_search,
- douyin_user_videos,
- get_content_fans_portrait,
- get_account_fans_portrait,
- )
- logger = logging.getLogger(__name__)
- # Default query text; run_agent() falls back to it when called without a query
- DEFAULT_QUERY = """找10个和"毛主席"相关的,老年人感兴趣的视频。
- 要求:
- - 适合老年人分享观看
- - 热度要高,质量要好"""
def _print_stream_message(item: "Message") -> None:
    """Echo one streamed agent message to stdout in condensed form.

    Assistant messages print their text followed by one ``[工具]`` line per
    tool call (calls to the ``goal`` tool are skipped to reduce noise).
    Tool messages print a one-line ``[结果]`` marker. Messages with any
    other role, or with content of an unexpected type, print nothing.
    """
    content = item.content

    if item.role == "assistant":
        if isinstance(content, dict):
            text = content.get("text", "")
            tool_calls = content.get("tool_calls", [])

            if text:
                # Long text that carries a recommendation marker is always
                # printed in full, even when it accompanies tool calls.
                is_recommendation = len(text) > 500 and any(
                    marker in text for marker in ("推荐结果", "推荐内容", "🎯")
                )
                if tool_calls and len(text) > 100 and not is_recommendation:
                    # Text longer than 100 chars that accompanies tool calls
                    # is intermediate reasoning: show only a short preview.
                    print(f"[思考] {text[:100]}...")
                else:
                    print(f"\n{text}")

            for tc in tool_calls or ():
                tool_name = tc.get("function", {}).get("name", "unknown")
                if tool_name != "goal":
                    print(f"[工具] {tool_name}")
        elif isinstance(content, str) and content:
            print(f"\n{content}")
    elif item.role == "tool":
        if isinstance(content, dict):
            tool_name = content.get("tool_name", "unknown")
            print(f"[结果] {tool_name} ✓")


async def run_agent(query: Optional[str] = None, stream_output: bool = True) -> Dict[str, Any]:
    """Run the content-finder agent once and report how the run ended.

    Args:
        query: Task text substituted into the prompt. ``None`` or an empty
            string falls back to ``DEFAULT_QUERY``.
        stream_output: When true, intermediate messages and error notices are
            also printed to stdout; when false the function only logs.

    Returns:
        A dict with the keys:
            "trace_id": id of the last ``Trace`` seen, or ``None`` if the run
                ended before any trace was yielded.
            "status": ``"completed"`` or ``"failed"``.
            "error": failure description; present only when status is
                ``"failed"``.

    Raises:
        ValueError: if the ``OPEN_ROUTER_API_KEY`` environment variable is
            unset or empty.

    Note:
        Prompt loading and configuration happen before the ``try`` block, so
        errors raised there propagate to the caller. ``KeyboardInterrupt`` and
        any other ``Exception`` raised while the agent is running are caught
        and reported as a ``"failed"`` result instead.
    """
    query = query or DEFAULT_QUERY

    # Load the prompt template that sits next to this module.
    prompt_path = Path(__file__).parent / "content_finder.prompt"
    prompt = SimplePrompt(prompt_path)

    # Trace directory: passed into the prompt and used for the trace store.
    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")

    # Build the messages from the prompt, passing query and trace_dir.
    messages = prompt.build_messages(query=query, trace_dir=trace_dir)

    api_key = os.getenv("OPEN_ROUTER_API_KEY")
    if not api_key:
        raise ValueError("OPEN_ROUTER_API_KEY 未设置")

    # The MODEL env var, when set, overrides the model named in the prompt config.
    model_name = prompt.config.get("model", "sonnet-4.6")
    model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
    temperature = float(prompt.config.get("temperature", 0.3))
    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
    skills_dir = str(Path(__file__).parent / "skills")

    Path(trace_dir).mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=trace_dir)

    allowed_tools = [
        "douyin_search",
        "douyin_user_videos",
        "get_content_fans_portrait",
        "get_account_fans_portrait",
    ]

    runner = AgentRunner(
        llm_call=create_openrouter_llm_call(model=model),
        trace_store=store,
        skills_dir=skills_dir,
    )

    config = RunConfig(
        name="内容寻找",
        model=model,
        temperature=temperature,
        max_iterations=max_iterations,
        tools=allowed_tools,
        extra_llm_params={"max_tokens": 8192},
        knowledge=KnowledgeConfig(
            enable_extraction=True,
            enable_completion_extraction=True,
            enable_injection=True,
            owner="content_finder_agent",
            default_tags={"project": "content_finder"},
            default_scopes=["com.piaoquantv.supply"],
            default_search_types=["tool", "usecase", "definition"],
            default_search_owner="content_finder_agent",
        ),
    )

    trace_id = None
    try:
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                trace_id = item.trace_id
                if item.status == "completed":
                    logger.info(f"Agent 执行完成: trace_id={trace_id}")
                    logger.info(f"结果------: {item}")
                    return {"trace_id": trace_id, "status": "completed"}
                if item.status == "failed":
                    logger.error(f"Agent 执行失败: {item.error_message}")
                    return {
                        "trace_id": trace_id,
                        "status": "failed",
                        "error": item.error_message,
                    }
                # Any other status: keep consuming the stream.
            elif isinstance(item, Message) and stream_output:
                _print_stream_message(item)

        # The stream ended without a terminal Trace: report an abnormal exit.
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": "Agent 异常退出",
        }
    except KeyboardInterrupt:
        logger.info("用户中断")
        if stream_output:
            print("\n用户中断")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": "用户中断",
        }
    except Exception as e:
        logger.error(f"Agent 执行异常: {e}", exc_info=True)
        if stream_output:
            print(f"\n执行失败: {e}")
        return {
            "trace_id": trace_id,
            "status": "failed",
            "error": str(e),
        }
|