| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- """
- 内容寻找 AI Agent
- 核心功能:
- 1. 根据需求自主从全网寻找相关视频内容
- 2. 支持运营人工交互,调整记忆相关数据
- 3. 集成现有爬虫能力(抖音、快手等)
- 4. 通过运营交互和内容表现不断优化寻找路径
- """
- import asyncio
- import logging
- from dataclasses import dataclass
- from typing import List, Dict, Any, Optional
- from datetime import datetime
- from agent.core.runner import AgentRunner, RunConfig
- from agent.trace.protocols import TraceStore
- from agent.memory.protocols import MemoryStore
- logger = logging.getLogger(__name__)
@dataclass
class SearchRequest:
    """Parameters describing a single content search.

    Attributes:
        keywords: Keywords to search for.
        tags: Tags to search for.
        platforms: Target platforms (e.g. douyin, kuaishou).
        filters: Filter conditions (view counts, time range, etc.).
        max_results: Maximum number of results; defaults to 50.
    """

    keywords: List[str]
    tags: List[str]
    platforms: List[str]
    filters: Dict[str, Any]
    max_results: int = 50
@dataclass
class ContentItem:
    """One piece of content found on a platform.

    Attributes:
        content_id: Unique identifier of the content.
        platform: Name of the platform the content comes from.
        title: Title of the content.
        author: Author of the content.
        url: Link to the content.
        cover_url: Link to the cover image.
        description: Description text.
        stats: Statistics (views, likes, etc.).
        tags: Tags attached to the content.
        publish_time: When the content was published.
        crawl_time: When the content was crawled.
    """

    content_id: str
    platform: str
    title: str
    author: str
    url: str
    cover_url: str
    description: str
    stats: Dict[str, int]
    tags: List[str]
    publish_time: datetime
    crawl_time: datetime
@dataclass
class OperatorFeedback:
    """Feedback an operator gives on a piece of content.

    Attributes:
        content_id: Identifier of the content being rated.
        rating: Rating label (excellent/good/poor).
        notes: Free-form remarks.
        operator_id: Identifier of the operator giving the feedback.
        feedback_time: When the feedback was given.
    """

    content_id: str
    rating: str
    notes: str
    operator_id: str
    feedback_time: datetime
@dataclass
class ContentPerformance:
    """Performance figures recorded for a piece of content.

    Attributes:
        content_id: Identifier of the content.
        platform_views: View count on the source platform.
        platform_likes: Like count on the source platform.
        platform_shares: Share count on the source platform.
        internal_views: View count on the internal platform.
        internal_engagement: Engagement rate on the internal platform.
        conversion_rate: Conversion rate.
        update_time: When these figures were last updated.
    """

    content_id: str
    platform_views: int
    platform_likes: int
    platform_shares: int
    internal_views: int
    internal_engagement: float
    conversion_rate: float
    update_time: datetime
class ContentFinderAgent:
    """Agent that searches video platforms for content and records feedback.

    Searching is delegated to the injected runner by sending it a
    natural-language prompt. Operator feedback and performance data are handed
    to private persistence helpers when a memory store is configured; those
    helpers (and the matching readers) are still placeholders, so nothing is
    actually stored or read back yet.
    """

    def __init__(
        self,
        runner: AgentRunner,
        trace_store: TraceStore,
        memory_store: Optional[MemoryStore] = None,
    ):
        """Store the collaborators used by the agent.

        Args:
            runner: Runner whose ``run(messages=..., config=...)`` async
                iterator performs the search.
            trace_store: Trace store; kept on the instance but not used by any
                method of this class.
            memory_store: Optional memory store. When it is falsy, feedback
                and performance data are not persisted.
        """
        self.runner = runner
        self.trace_store = trace_store
        self.memory_store = memory_store

    async def search_content(
        self,
        request: SearchRequest,
        config: Optional[RunConfig] = None,
    ) -> List[ContentItem]:
        """Run the agent for one search request and collect what it yields.

        Args:
            request: Keywords, tags, platforms, filters and result cap.
            config: Runner configuration. When omitted, a default is built
                with model ``gpt-4o``, agent type ``content_finder`` and the
                ``content-finder`` skill.

        Returns:
            The ``content_items`` entries gathered from every dict the runner
            yields, in arrival order, capped at ``request.max_results``.
        """
        if config is None:
            config = RunConfig(
                model="gpt-4o",
                agent_type="content_finder",
                skills=["content-finder"],
            )

        prompt = self._build_search_prompt(request)

        results: List[ContentItem] = []
        async for item in self.runner.run(
            messages=[{"role": "user", "content": prompt}],
            config=config,
        ):
            if not isinstance(item, dict):
                continue
            # A chunk may carry the key with a None/empty payload; extending
            # the list with None would raise, so such chunks are skipped.
            batch = item.get("content_items")
            if batch:
                results.extend(batch)

        # The prompt merely asks for at most max_results items; enforce the
        # cap here. The stream is still consumed to the end before trimming.
        limit = request.max_results
        if limit is not None and limit >= 0:
            del results[limit:]
        return results

    def _build_search_prompt(self, request: SearchRequest) -> str:
        """Render the request as the user prompt sent to the runner.

        Platforms and keywords are always included. Tags are included when
        non-empty. Of the filters, only ``min_views``, ``min_likes`` and
        ``date_range`` are rendered; any other filter key is ignored.

        Args:
            request: The search request to describe.

        Returns:
            The prompt text, with its parts joined by newlines.
        """
        # (filter key, label shown in the prompt), in rendering order.
        filter_labels = (
            ("min_views", "最小播放量"),
            ("min_likes", "最小点赞数"),
            ("date_range", "时间范围"),
        )

        prompt_parts = [
            f"请帮我从以下平台搜索视频内容:{', '.join(request.platforms)}",
            f"\n关键词:{', '.join(request.keywords)}",
        ]

        if request.tags:
            prompt_parts.append(f"标签:{', '.join(request.tags)}")

        if request.filters:
            filter_desc = [
                f"{label}:{request.filters[key]}"
                for key, label in filter_labels
                if key in request.filters
            ]
            if filter_desc:
                prompt_parts.append(f"筛选条件:{', '.join(filter_desc)}")

        prompt_parts.append(f"\n最多返回 {request.max_results} 条结果")
        prompt_parts.append("\n请使用爬虫工具搜索内容,并评估内容质量和相关性。")
        return "\n".join(prompt_parts)

    async def collect_feedback(
        self,
        content_id: str,
        rating: str,
        notes: str,
        operator_id: str,
    ) -> OperatorFeedback:
        """Create an operator feedback record and hand it to the memory store.

        Args:
            content_id: Identifier of the content being rated.
            rating: Rating label; stored as given, without validation.
            notes: Free-form remarks.
            operator_id: Identifier of the operator giving the feedback.

        Returns:
            The new feedback record, timestamped with ``datetime.now()``
            (a naive datetime).
        """
        feedback = OperatorFeedback(
            content_id=content_id,
            rating=rating,
            notes=notes,
            operator_id=operator_id,
            feedback_time=datetime.now(),
        )
        # Persist only when a memory store is configured.
        if self.memory_store:
            await self._save_feedback(feedback)
        return feedback

    async def update_performance(
        self,
        content_id: str,
        performance: ContentPerformance,
    ) -> None:
        """Hand a performance record to the memory store.

        Args:
            content_id: Identifier the caller believes the record belongs to.
            performance: The performance record to save.

        A mismatch between ``content_id`` and ``performance.content_id`` is
        logged as a warning; the record is still saved unchanged.
        """
        if content_id != performance.content_id:
            # Only `performance` is handed to the save helper, so a differing
            # `content_id` argument would otherwise be ignored silently.
            logging.getLogger(__name__).warning(
                "update_performance called with content_id=%r but "
                "performance.content_id=%r; saving the record as given",
                content_id,
                performance.content_id,
            )
        if self.memory_store:
            await self._save_performance(performance)

    async def optimize_strategy(self) -> Dict[str, Any]:
        """Derive a search strategy from stored history, feedback and performance.

        Returns:
            A dict with the keys ``recommended_keywords``,
            ``recommended_tags``, ``optimal_filters`` and
            ``platform_weights``, filled from the analysis result (empty
            list/dict when the analysis omits a key).
        """
        # Load the historical data.
        search_history = await self._get_search_history()
        feedbacks = await self._get_feedbacks()
        performances = await self._get_performances()

        # Analyse what high-performing content has in common. The search
        # history is passed along so the analysis can take it into account.
        high_perf_features = self._analyze_high_performance_content(
            feedbacks,
            performances,
            search_history=search_history,
        )

        return {
            "recommended_keywords": high_perf_features.get("keywords", []),
            "recommended_tags": high_perf_features.get("tags", []),
            "optimal_filters": high_perf_features.get("filters", {}),
            "platform_weights": high_perf_features.get("platform_weights", {}),
        }

    # ===== Private helpers =====

    async def _save_feedback(self, feedback: OperatorFeedback) -> None:
        """Save feedback to the memory store (placeholder, does nothing yet)."""
        # Placeholder: a real implementation needs to call memory_store.
        pass

    async def _save_performance(self, performance: ContentPerformance) -> None:
        """Save performance data to the memory store (placeholder, does nothing yet)."""
        # Placeholder: a real implementation needs to call memory_store.
        pass

    async def _get_search_history(self) -> List[Dict[str, Any]]:
        """Return the search history (placeholder, always an empty list)."""
        # Placeholder: should read from memory_store.
        return []

    async def _get_feedbacks(self) -> List[OperatorFeedback]:
        """Return all feedback records (placeholder, always an empty list)."""
        # Placeholder: should read from memory_store.
        return []

    async def _get_performances(self) -> List[ContentPerformance]:
        """Return all performance records (placeholder, always an empty list)."""
        # Placeholder: should read from memory_store.
        return []

    def _analyze_high_performance_content(
        self,
        feedbacks: List[OperatorFeedback],
        performances: List[ContentPerformance],
        search_history: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Analyse the common traits of high-performing content.

        Args:
            feedbacks: Operator feedback records.
            performances: Content performance records.
            search_history: Optional past search records.

        Returns:
            A dict with the keys ``keywords``, ``tags``, ``filters`` and
            ``platform_weights``. This is a placeholder: the inputs are not
            inspected and every value is empty.
        """
        # Placeholder: a real implementation needs data-analysis logic.
        return {
            "keywords": [],
            "tags": [],
            "filters": {},
            "platform_weights": {},
        }
|