| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- """
- 内容寻找 Agent - MVP 版本
- 功能:
- - 根据用户需求智能搜索抖音视频内容
- - 基于画像数据筛选符合老年人群体的内容
- - 深度挖掘优质作者的其他作品
- 使用示例:
- python run.py
- """
- import asyncio
- import logging
- import sys
- import os
- from pathlib import Path
- from typing import Optional, Dict, Any
- from datetime import datetime
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent import (
- AgentRunner,
- RunConfig,
- FileSystemTraceStore,
- Trace,
- Message,
- )
- from agent.llm import create_openrouter_llm_call
- # 导入工具(确保工具被注册)
- from tools import (
- douyin_search,
- douyin_user_videos,
- get_video_audience_profile,
- get_user_fans_profile,
- )
- # 配置日志
- log_dir = Path(__file__).parent / '.cache'
- log_dir.mkdir(exist_ok=True)
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler(log_dir / 'agent.log'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- # ===== 配置管理 =====
- class ContentFinderConfig:
- """内容寻找 Agent 配置"""
- def __init__(self):
- # LLM 配置
- self.model = os.getenv("MODEL", "anthropic/claude-sonnet-4.5")
- self.api_key = os.getenv("OPEN_ROUTER_API_KEY")
- self.temperature = float(os.getenv("TEMPERATURE", "0.3"))
- self.max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
- # 存储配置
- self.trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
- self.output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
- # Skills 配置 - 使用绝对路径
- default_skills_dir = str(Path(__file__).parent / "skills")
- self.skills_dir = os.getenv("SKILLS_DIR", default_skills_dir)
- # 验证配置
- self._validate()
- def _validate(self):
- """验证配置"""
- if not self.api_key:
- raise ValueError("OPEN_ROUTER_API_KEY 未设置,请在 .env 文件中配置")
- # 创建必要的目录
- Path(self.trace_dir).mkdir(parents=True, exist_ok=True)
- Path(self.output_dir).mkdir(parents=True, exist_ok=True)
- Path('.cache').mkdir(exist_ok=True)
- # ===== Agent 封装 =====
- class ContentFinderAgent:
- """内容寻找 Agent"""
- def __init__(self, config: ContentFinderConfig):
- self.config = config
- # 初始化 Trace Store
- self.trace_store = FileSystemTraceStore(base_path=config.trace_dir)
- # 初始化 LLM
- self.llm_call = create_openrouter_llm_call(model=config.model)
- # 初始化 Runner
- self.runner = AgentRunner(
- llm_call=self.llm_call,
- trace_store=self.trace_store,
- skills_dir=config.skills_dir,
- )
- logger.info(f"ContentFinderAgent 初始化完成")
- logger.info(f" 模型: {config.model}")
- logger.info(f" Skills 目录: {config.skills_dir}")
- async def find_content(
- self,
- user_request: str,
- trace_id: Optional[str] = None,
- ) -> Dict[str, Any]:
- """
- 根据用户需求寻找内容
- Args:
- user_request: 用户需求描述
- trace_id: 可选的 trace_id,用于恢复之前的会话
- Returns:
- 包含执行结果的字典
- """
- logger.info(f"开始处理用户需求: {user_request}")
- # 构建系统提示
- system_prompt = """你是一个专业的内容寻找助手,擅长根据用户需求在抖音平台上寻找合适的视频内容。
- ⚠️ 重要提示:当前所有工具返回的都是模拟数据,仅用于测试和演示。
- - 在分析和推荐时,请明确说明这是基于模拟数据的演示
- - 不要给出过于自信的评分和推荐
- - 提醒用户实际使用时需要对接真实API
- 你的工作流程:
- 1. 理解用户需求,提取关键词和目标受众特征
- 2. 使用 douyin_search 工具搜索相关内容(返回模拟数据)
- 3. 使用画像工具分析内容是否符合目标受众(返回模拟数据)
- 4. 对于优质内容,使用 douyin_user_videos 获取作者的其他作品(返回模拟数据)
- 5. 综合评估并推荐最合适的内容,但要说明这是基于模拟数据的演示
- 重点关注:
- - 内容是否符合老年人群体的偏好(年龄分布、偏好度)
- - 内容热度(点赞量、评论量、分享量)
- - 互动率(评论率、分享率)
- - 时效性(发布时间)
- - 作者的内容质量和稳定性
- 请按照 content_finding_strategy 和 content_filtering_strategy 中的策略执行。"""
- # 构建配置
- config = RunConfig(
- model=self.config.model,
- temperature=self.config.temperature,
- max_iterations=self.config.max_iterations,
- agent_type="content_finder",
- trace_id=trace_id,
- )
- # 构建消息
- messages = [
- {"role": "system", "content": system_prompt},
- {"role": "user", "content": user_request},
- ]
- # 执行
- result = {
- "trace_id": None,
- "status": "unknown",
- "messages_count": 0,
- "response": "",
- "error": None,
- }
- try:
- last_assistant_message = None
- message_count = 0
- async for item in self.runner.run(messages=messages, config=config):
- if isinstance(item, Trace):
- result["trace_id"] = item.trace_id
- result["status"] = item.status
- logger.info(f"Trace 状态: {item.status}")
- elif isinstance(item, Message):
- message_count += 1
- if item.role == "assistant":
- last_assistant_message = item
- # 实时输出助手响应
- content = item.content
- if isinstance(content, str):
- print(f"\n{content}")
- elif isinstance(content, dict):
- text = content.get("text", "")
- if text:
- print(f"\n{text}")
- result["messages_count"] = message_count
- # 提取最后的助手响应
- if last_assistant_message:
- content = last_assistant_message.content
- if isinstance(content, str):
- result["response"] = content
- elif isinstance(content, dict):
- result["response"] = content.get("text", "")
- logger.info(f"执行完成,共 {message_count} 条消息")
- except Exception as e:
- logger.error(f"执行失败: {e}", exc_info=True)
- result["error"] = str(e)
- result["status"] = "failed"
- return result
- # ===== 主函数 =====
- async def main():
- """主函数"""
- try:
- # 加载配置
- config = ContentFinderConfig()
- # 创建 Agent
- agent = ContentFinderAgent(config)
- # 用户需求
- user_request = """
- 孩子军抗日,让人感动。找这样的视频。
- 要求:
- - 内容要感人,有情感共鸣
- - 适合老年人观看
- - 热度要高,质量要好
- """
- print("\n" + "="*60)
- print("内容寻找 Agent - MVP 版本")
- print("="*60)
- print(f"\n用户需求:\n{user_request}")
- print("\n" + "="*60)
- print("开始执行...\n")
- # 执行任务
- result = await agent.find_content(user_request)
- # 输出结果摘要
- print("\n" + "="*60)
- print("执行结果摘要")
- print("="*60)
- print(f"Trace ID: {result.get('trace_id')}")
- print(f"状态: {result.get('status')}")
- print(f"消息数: {result.get('messages_count')}")
- if result.get('error'):
- print(f"错误: {result.get('error')}")
- print("="*60 + "\n")
- except Exception as e:
- logger.error(f"程序执行失败: {e}", exc_info=True)
- sys.exit(1)
- if __name__ == "__main__":
- asyncio.run(main())
|