demo.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. """
  2. 内容寻找 Agent - 简化运行示例
  3. 这是一个独立的演示脚本,展示核心逻辑,不依赖完整的框架。
  4. """
  5. import asyncio
  6. import json
  7. from pathlib import Path
  8. from datetime import datetime
  9. from typing import List, Dict, Any
  10. # ===== 数据模型 =====
  11. class SearchRequest:
  12. """搜索请求"""
  13. def __init__(self, keywords: List[str], tags: List[str], platforms: List[str],
  14. filters: Dict[str, Any], max_results: int = 50):
  15. self.keywords = keywords
  16. self.tags = tags
  17. self.platforms = platforms
  18. self.filters = filters
  19. self.max_results = max_results
  20. class ContentItem:
  21. """内容项"""
  22. def __init__(self, content_id: str, platform: str, title: str, author: str,
  23. url: str, stats: Dict[str, int], tags: List[str]):
  24. self.content_id = content_id
  25. self.platform = platform
  26. self.title = title
  27. self.author = author
  28. self.url = url
  29. self.stats = stats
  30. self.tags = tags
  31. # ===== 模拟爬虫工具 =====
  32. async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
  33. """模拟抖音搜索"""
  34. print(f"🔍 正在抖音搜索: {keywords}")
  35. await asyncio.sleep(0.5) # 模拟网络延迟
  36. # 返回模拟数据
  37. return [
  38. ContentItem(
  39. content_id=f"dy_{i}",
  40. platform="douyin",
  41. title=f"【{keywords}】精彩视频 #{i}",
  42. author=f"作者{i}",
  43. url=f"https://douyin.com/video/{i}",
  44. stats={"views": 100000 + i * 10000, "likes": 5000 + i * 500, "comments": 200 + i * 20},
  45. tags=[keywords, "热门", "推荐"]
  46. )
  47. for i in range(1, min(max_results, 6))
  48. ]
  49. async def kuaishou_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
  50. """模拟快手搜索"""
  51. print(f"🔍 正在快手搜索: {keywords}")
  52. await asyncio.sleep(0.5)
  53. return [
  54. ContentItem(
  55. content_id=f"ks_{i}",
  56. platform="kuaishou",
  57. title=f"【{keywords}】优质内容 #{i}",
  58. author=f"创作者{i}",
  59. url=f"https://kuaishou.com/video/{i}",
  60. stats={"views": 80000 + i * 8000, "likes": 4000 + i * 400, "comments": 150 + i * 15},
  61. tags=[keywords, "精选"]
  62. )
  63. for i in range(1, min(max_results, 6))
  64. ]
  65. # ===== 内容评估 =====
  66. def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]:
  67. """评估内容质量"""
  68. print(f"📊 正在评估 {len(items)} 条内容...")
  69. for item in items:
  70. # 计算质量分数
  71. stats = item.stats
  72. views = stats.get("views", 0)
  73. likes = stats.get("likes", 0)
  74. comments = stats.get("comments", 0)
  75. engagement_rate = (likes + comments) / max(views, 1)
  76. quality_score = (views / 10000) * 0.5 + engagement_rate * 50
  77. item.quality_score = round(quality_score, 2)
  78. # 计算相关性分数
  79. relevance = sum(1 for kw in keywords if kw in item.title) / len(keywords) * 100
  80. item.relevance_score = round(relevance, 2)
  81. # 按质量分数排序
  82. items.sort(key=lambda x: x.quality_score, reverse=True)
  83. return items
  84. # ===== 记忆管理 =====
  85. class MemoryManager:
  86. """简化的记忆管理器"""
  87. def __init__(self, storage_path: str):
  88. self.storage_path = Path(storage_path)
  89. self.storage_path.mkdir(parents=True, exist_ok=True)
  90. self.feedback_file = self.storage_path / "feedbacks.jsonl"
  91. self.performance_file = self.storage_path / "performances.jsonl"
  92. def save_feedback(self, content_id: str, rating: str, notes: str):
  93. """保存反馈"""
  94. record = {
  95. "content_id": content_id,
  96. "rating": rating,
  97. "notes": notes,
  98. "timestamp": datetime.now().isoformat(),
  99. }
  100. with open(self.feedback_file, "a", encoding="utf-8") as f:
  101. f.write(json.dumps(record, ensure_ascii=False) + "\n")
  102. print(f"✅ 反馈已保存: {rating}")
  103. def save_performance(self, content_id: str, internal_views: int, engagement: float):
  104. """保存表现数据"""
  105. record = {
  106. "content_id": content_id,
  107. "internal_views": internal_views,
  108. "engagement": engagement,
  109. "timestamp": datetime.now().isoformat(),
  110. }
  111. with open(self.performance_file, "a", encoding="utf-8") as f:
  112. f.write(json.dumps(record, ensure_ascii=False) + "\n")
  113. print(f"✅ 表现数据已保存")
  114. def get_feedbacks(self, rating_filter: str = None) -> List[Dict]:
  115. """获取反馈"""
  116. if not self.feedback_file.exists():
  117. return []
  118. feedbacks = []
  119. with open(self.feedback_file, "r", encoding="utf-8") as f:
  120. for line in f:
  121. if line.strip():
  122. record = json.loads(line)
  123. if rating_filter is None or record.get("rating") == rating_filter:
  124. feedbacks.append(record)
  125. return feedbacks
  126. # ===== 主Agent =====
  127. class ContentFinderAgent:
  128. """内容寻找Agent"""
  129. def __init__(self, memory: MemoryManager):
  130. self.memory = memory
  131. async def search_content(self, request: SearchRequest) -> List[ContentItem]:
  132. """搜索内容"""
  133. print(f"\n{'='*60}")
  134. print(f"🎯 开始搜索内容")
  135. print(f"关键词: {', '.join(request.keywords)}")
  136. print(f"平台: {', '.join(request.platforms)}")
  137. print(f"{'='*60}\n")
  138. # 并行搜索多个平台
  139. tasks = []
  140. for platform in request.platforms:
  141. for keyword in request.keywords:
  142. if platform == "douyin":
  143. tasks.append(douyin_search(keyword, request.max_results // len(request.keywords)))
  144. elif platform == "kuaishou":
  145. tasks.append(kuaishou_search(keyword, request.max_results // len(request.keywords)))
  146. results_list = await asyncio.gather(*tasks)
  147. # 合并结果
  148. all_items = []
  149. for results in results_list:
  150. all_items.extend(results)
  151. # 去重
  152. seen = set()
  153. unique_items = []
  154. for item in all_items:
  155. if item.content_id not in seen:
  156. seen.add(item.content_id)
  157. unique_items.append(item)
  158. # 评估内容
  159. evaluated_items = evaluate_content(unique_items, request.keywords)
  160. print(f"\n✨ 搜索完成,找到 {len(evaluated_items)} 条内容\n")
  161. return evaluated_items
  162. def collect_feedback(self, content_id: str, rating: str, notes: str):
  163. """收集反馈"""
  164. self.memory.save_feedback(content_id, rating, notes)
  165. def update_performance(self, content_id: str, internal_views: int, engagement: float):
  166. """更新表现"""
  167. self.memory.save_performance(content_id, internal_views, engagement)
  168. def analyze_insights(self):
  169. """分析洞察"""
  170. print(f"\n{'='*60}")
  171. print("📈 分析运营洞察")
  172. print(f"{'='*60}\n")
  173. excellent = self.memory.get_feedbacks("excellent")
  174. good = self.memory.get_feedbacks("good")
  175. poor = self.memory.get_feedbacks("poor")
  176. total = len(excellent) + len(good) + len(poor)
  177. if total == 0:
  178. print("暂无反馈数据")
  179. return
  180. print(f"总反馈数: {total}")
  181. print(f" - 优质 (excellent): {len(excellent)} ({len(excellent)/total*100:.1f}%)")
  182. print(f" - 良好 (good): {len(good)} ({len(good)/total*100:.1f}%)")
  183. print(f" - 较差 (poor): {len(poor)} ({len(poor)/total*100:.1f}%)")
  184. print()
  185. # ===== 主函数 =====
  186. async def main():
  187. """演示完整流程"""
  188. # 初始化
  189. memory = MemoryManager(".cache/content_finder")
  190. agent = ContentFinderAgent(memory)
  191. # 示例1: 搜索美食内容
  192. print("\n" + "="*60)
  193. print("示例1: 搜索美食类视频内容")
  194. print("="*60)
  195. request = SearchRequest(
  196. keywords=["美食探店", "美食推荐"],
  197. tags=["美食", "探店"],
  198. platforms=["douyin", "kuaishou"],
  199. filters={"min_views": 10000},
  200. max_results=20,
  201. )
  202. results = await agent.search_content(request)
  203. # 显示前5条结果
  204. print("🏆 Top 5 内容:")
  205. for i, item in enumerate(results[:5], 1):
  206. print(f"\n{i}. [{item.platform}] {item.title}")
  207. print(f" 作者: {item.author}")
  208. print(f" 播放: {item.stats['views']:,} | 点赞: {item.stats['likes']:,}")
  209. print(f" 质量分数: {item.quality_score} | 相关性: {item.relevance_score}")
  210. # 示例2: 收集反馈
  211. print("\n" + "="*60)
  212. print("示例2: 收集运营反馈")
  213. print("="*60 + "\n")
  214. if results:
  215. agent.collect_feedback(results[0].content_id, "excellent", "内容质量很高")
  216. agent.collect_feedback(results[1].content_id, "good", "内容不错")
  217. agent.collect_feedback(results[2].content_id, "excellent", "非常符合需求")
  218. # 示例3: 更新表现
  219. print("\n" + "="*60)
  220. print("示例3: 更新内容表现")
  221. print("="*60 + "\n")
  222. if results:
  223. agent.update_performance(results[0].content_id, internal_views=5000, engagement=0.15)
  224. agent.update_performance(results[1].content_id, internal_views=3000, engagement=0.12)
  225. # 示例4: 分析洞察
  226. print("\n" + "="*60)
  227. print("示例4: 分析运营洞察")
  228. print("="*60)
  229. agent.analyze_insights()
  230. print("\n" + "="*60)
  231. print("✅ 演示完成!")
  232. print("="*60 + "\n")
  233. if __name__ == "__main__":
  234. asyncio.run(main())