agent.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
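"""
Content-finder AI agent.

Core capabilities:
1. Autonomously searches the whole web for video content that matches a given request.
2. Supports manual interaction by operators, who can adjust the data held in memory.
3. Integrates the existing crawler capabilities (Douyin, Kuaishou, and others).
4. Continuously refines its search paths using operator interaction and content performance.
"""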
  9. import asyncio
  10. import logging
  11. from dataclasses import dataclass
  12. from typing import List, Dict, Any, Optional
  13. from datetime import datetime
  14. from agent.core.runner import AgentRunner, RunConfig
  15. from agent.trace.protocols import TraceStore
  16. from agent.memory.protocols import MemoryStore
  17. logger = logging.getLogger(__name__)
  18. @dataclass
  19. class SearchRequest:
  20. """搜索请求"""
  21. keywords: List[str] # 关键词列表
  22. tags: List[str] # 标签列表
  23. platforms: List[str] # 目标平台(douyin, kuaishou等)
  24. filters: Dict[str, Any] # 筛选条件(播放量、时间范围等)
  25. max_results: int = 50 # 最大结果数
  26. @dataclass
  27. class ContentItem:
  28. """内容项"""
  29. content_id: str # 内容唯一ID
  30. platform: str # 平台名称
  31. title: str # 标题
  32. author: str # 作者
  33. url: str # 链接
  34. cover_url: str # 封面
  35. description: str # 描述
  36. stats: Dict[str, int] # 统计数据(播放、点赞等)
  37. tags: List[str] # 标签
  38. publish_time: datetime # 发布时间
  39. crawl_time: datetime # 抓取时间
  40. @dataclass
  41. class OperatorFeedback:
  42. """运营反馈"""
  43. content_id: str # 内容ID
  44. rating: str # 评级(excellent/good/poor)
  45. notes: str # 备注
  46. operator_id: str # 运营人员ID
  47. feedback_time: datetime # 反馈时间
  48. @dataclass
  49. class ContentPerformance:
  50. """内容表现数据"""
  51. content_id: str # 内容ID
  52. platform_views: int # 平台播放量
  53. platform_likes: int # 平台点赞数
  54. platform_shares: int # 平台分享数
  55. internal_views: int # 内部平台播放量
  56. internal_engagement: float # 内部互动率
  57. conversion_rate: float # 转化率
  58. update_time: datetime # 更新时间
  59. class ContentFinderAgent:
  60. """内容寻找 Agent"""
  61. def __init__(
  62. self,
  63. runner: AgentRunner,
  64. trace_store: TraceStore,
  65. memory_store: Optional[MemoryStore] = None,
  66. ):
  67. self.runner = runner
  68. self.trace_store = trace_store
  69. self.memory_store = memory_store
  70. async def search_content(
  71. self,
  72. request: SearchRequest,
  73. config: Optional[RunConfig] = None,
  74. ) -> List[ContentItem]:
  75. """
  76. 搜索内容
  77. Args:
  78. request: 搜索请求
  79. config: 运行配置
  80. Returns:
  81. 内容列表
  82. """
  83. if config is None:
  84. config = RunConfig(
  85. model="gpt-4o",
  86. agent_type="content_finder",
  87. skills=["content-finder"],
  88. )
  89. # 构建搜索提示词
  90. prompt = self._build_search_prompt(request)
  91. # 运行Agent
  92. results = []
  93. async for item in self.runner.run(
  94. messages=[{"role": "user", "content": prompt}],
  95. config=config,
  96. ):
  97. # 处理返回的内容项
  98. if isinstance(item, dict) and "content_items" in item:
  99. results.extend(item["content_items"])
  100. return results
  101. def _build_search_prompt(self, request: SearchRequest) -> str:
  102. """构建搜索提示词"""
  103. prompt_parts = [
  104. f"请帮我从以下平台搜索视频内容:{', '.join(request.platforms)}",
  105. f"\n关键词:{', '.join(request.keywords)}",
  106. ]
  107. if request.tags:
  108. prompt_parts.append(f"标签:{', '.join(request.tags)}")
  109. if request.filters:
  110. filter_desc = []
  111. if "min_views" in request.filters:
  112. filter_desc.append(f"最小播放量:{request.filters['min_views']}")
  113. if "min_likes" in request.filters:
  114. filter_desc.append(f"最小点赞数:{request.filters['min_likes']}")
  115. if "date_range" in request.filters:
  116. filter_desc.append(f"时间范围:{request.filters['date_range']}")
  117. if filter_desc:
  118. prompt_parts.append(f"筛选条件:{', '.join(filter_desc)}")
  119. prompt_parts.append(f"\n最多返回 {request.max_results} 条结果")
  120. prompt_parts.append("\n请使用爬虫工具搜索内容,并评估内容质量和相关性。")
  121. return "\n".join(prompt_parts)
  122. async def collect_feedback(
  123. self,
  124. content_id: str,
  125. rating: str,
  126. notes: str,
  127. operator_id: str,
  128. ) -> OperatorFeedback:
  129. """
  130. 收集运营反馈
  131. Args:
  132. content_id: 内容ID
  133. rating: 评级
  134. notes: 备注
  135. operator_id: 运营人员ID
  136. Returns:
  137. 反馈记录
  138. """
  139. feedback = OperatorFeedback(
  140. content_id=content_id,
  141. rating=rating,
  142. notes=notes,
  143. operator_id=operator_id,
  144. feedback_time=datetime.now(),
  145. )
  146. # 保存到记忆系统
  147. if self.memory_store:
  148. await self._save_feedback(feedback)
  149. return feedback
  150. async def update_performance(
  151. self,
  152. content_id: str,
  153. performance: ContentPerformance,
  154. ) -> None:
  155. """
  156. 更新内容表现数据
  157. Args:
  158. content_id: 内容ID
  159. performance: 表现数据
  160. """
  161. if self.memory_store:
  162. await self._save_performance(performance)
  163. async def optimize_strategy(self) -> Dict[str, Any]:
  164. """
  165. 优化搜索策略
  166. 基于历史搜索记录、运营反馈和内容表现数据,
  167. 分析并优化搜索策略。
  168. Returns:
  169. 优化后的策略配置
  170. """
  171. # 获取历史数据
  172. search_history = await self._get_search_history()
  173. feedbacks = await self._get_feedbacks()
  174. performances = await self._get_performances()
  175. # 分析高表现内容特征
  176. high_perf_features = self._analyze_high_performance_content(
  177. feedbacks, performances
  178. )
  179. # 生成优化策略
  180. strategy = {
  181. "recommended_keywords": high_perf_features.get("keywords", []),
  182. "recommended_tags": high_perf_features.get("tags", []),
  183. "optimal_filters": high_perf_features.get("filters", {}),
  184. "platform_weights": high_perf_features.get("platform_weights", {}),
  185. }
  186. return strategy
  187. # ===== 私有方法 =====
  188. async def _save_feedback(self, feedback: OperatorFeedback) -> None:
  189. """保存反馈到记忆系统"""
  190. # 伪代码:实际实现需要调用memory_store
  191. pass
  192. async def _save_performance(self, performance: ContentPerformance) -> None:
  193. """保存表现数据到记忆系统"""
  194. # 伪代码:实际实现需要调用memory_store
  195. pass
  196. async def _get_search_history(self) -> List[Dict[str, Any]]:
  197. """获取搜索历史"""
  198. # 伪代码:从memory_store读取
  199. return []
  200. async def _get_feedbacks(self) -> List[OperatorFeedback]:
  201. """获取所有反馈"""
  202. # 伪代码:从memory_store读取
  203. return []
  204. async def _get_performances(self) -> List[ContentPerformance]:
  205. """获取所有表现数据"""
  206. # 伪代码:从memory_store读取
  207. return []
  208. def _analyze_high_performance_content(
  209. self,
  210. feedbacks: List[OperatorFeedback],
  211. performances: List[ContentPerformance],
  212. ) -> Dict[str, Any]:
  213. """分析高表现内容的共同特征"""
  214. # 伪代码:实际实现需要数据分析逻辑
  215. return {
  216. "keywords": [],
  217. "tags": [],
  218. "filters": {},
  219. "platform_weights": {},
  220. }