""" 内容寻找 Agent - 简化运行示例 这是一个独立的演示脚本,展示核心逻辑,不依赖完整的框架。 """ import asyncio import json from pathlib import Path from datetime import datetime from typing import List, Dict, Any # ===== 数据模型 ===== class SearchRequest: """搜索请求""" def __init__(self, keywords: List[str], tags: List[str], platforms: List[str], filters: Dict[str, Any], max_results: int = 50): self.keywords = keywords self.tags = tags self.platforms = platforms self.filters = filters self.max_results = max_results class ContentItem: """内容项""" def __init__(self, content_id: str, platform: str, title: str, author: str, url: str, stats: Dict[str, int], tags: List[str]): self.content_id = content_id self.platform = platform self.title = title self.author = author self.url = url self.stats = stats self.tags = tags # ===== 模拟爬虫工具 ===== async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]: """模拟抖音搜索""" print(f"🔍 正在抖音搜索: {keywords}") await asyncio.sleep(0.5) # 模拟网络延迟 # 返回模拟数据 return [ ContentItem( content_id=f"dy_{i}", platform="douyin", title=f"【{keywords}】精彩视频 #{i}", author=f"作者{i}", url=f"https://douyin.com/video/{i}", stats={"views": 100000 + i * 10000, "likes": 5000 + i * 500, "comments": 200 + i * 20}, tags=[keywords, "热门", "推荐"] ) for i in range(1, min(max_results, 6)) ] async def kuaishou_search(keywords: str, max_results: int = 20) -> List[ContentItem]: """模拟快手搜索""" print(f"🔍 正在快手搜索: {keywords}") await asyncio.sleep(0.5) return [ ContentItem( content_id=f"ks_{i}", platform="kuaishou", title=f"【{keywords}】优质内容 #{i}", author=f"创作者{i}", url=f"https://kuaishou.com/video/{i}", stats={"views": 80000 + i * 8000, "likes": 4000 + i * 400, "comments": 150 + i * 15}, tags=[keywords, "精选"] ) for i in range(1, min(max_results, 6)) ] # ===== 内容评估 ===== def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]: """评估内容质量""" print(f"📊 正在评估 {len(items)} 条内容...") for item in items: # 计算质量分数 stats = item.stats views = stats.get("views", 0) likes = stats.get("likes", 0) comments = stats.get("comments", 0) engagement_rate = (likes + comments) / max(views, 1) quality_score = (views / 10000) * 0.5 + engagement_rate * 50 item.quality_score = round(quality_score, 2) # 计算相关性分数 relevance = sum(1 for kw in keywords if kw in item.title) / len(keywords) * 100 item.relevance_score = round(relevance, 2) # 按质量分数排序 items.sort(key=lambda x: x.quality_score, reverse=True) return items # ===== 记忆管理 ===== class MemoryManager: """简化的记忆管理器""" def __init__(self, storage_path: str): self.storage_path = Path(storage_path) self.storage_path.mkdir(parents=True, exist_ok=True) self.feedback_file = self.storage_path / "feedbacks.jsonl" self.performance_file = self.storage_path / "performances.jsonl" def save_feedback(self, content_id: str, rating: str, notes: str): """保存反馈""" record = { "content_id": content_id, "rating": rating, "notes": notes, "timestamp": datetime.now().isoformat(), } with open(self.feedback_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") print(f"✅ 反馈已保存: {rating}") def save_performance(self, content_id: str, internal_views: int, engagement: float): """保存表现数据""" record = { "content_id": content_id, "internal_views": internal_views, "engagement": engagement, "timestamp": datetime.now().isoformat(), } with open(self.performance_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") print(f"✅ 表现数据已保存") def get_feedbacks(self, rating_filter: str = None) -> List[Dict]: """获取反馈""" if not self.feedback_file.exists(): return [] feedbacks = [] with open(self.feedback_file, "r", encoding="utf-8") as f: for line in f: if line.strip(): record = json.loads(line) if rating_filter is None or record.get("rating") == rating_filter: feedbacks.append(record) return feedbacks # ===== 主Agent ===== class ContentFinderAgent: """内容寻找Agent""" def __init__(self, memory: MemoryManager): self.memory = memory async def search_content(self, request: SearchRequest) -> List[ContentItem]: """搜索内容""" print(f"\n{'='*60}") print(f"🎯 开始搜索内容") print(f"关键词: {', '.join(request.keywords)}") print(f"平台: {', '.join(request.platforms)}") print(f"{'='*60}\n") # 并行搜索多个平台 tasks = [] for platform in request.platforms: for keyword in request.keywords: if platform == "douyin": tasks.append(douyin_search(keyword, request.max_results // len(request.keywords))) elif platform == "kuaishou": tasks.append(kuaishou_search(keyword, request.max_results // len(request.keywords))) results_list = await asyncio.gather(*tasks) # 合并结果 all_items = [] for results in results_list: all_items.extend(results) # 去重 seen = set() unique_items = [] for item in all_items: if item.content_id not in seen: seen.add(item.content_id) unique_items.append(item) # 评估内容 evaluated_items = evaluate_content(unique_items, request.keywords) print(f"\n✨ 搜索完成,找到 {len(evaluated_items)} 条内容\n") return evaluated_items def collect_feedback(self, content_id: str, rating: str, notes: str): """收集反馈""" self.memory.save_feedback(content_id, rating, notes) def update_performance(self, content_id: str, internal_views: int, engagement: float): """更新表现""" self.memory.save_performance(content_id, internal_views, engagement) def analyze_insights(self): """分析洞察""" print(f"\n{'='*60}") print("📈 分析运营洞察") print(f"{'='*60}\n") excellent = self.memory.get_feedbacks("excellent") good = self.memory.get_feedbacks("good") poor = self.memory.get_feedbacks("poor") total = len(excellent) + len(good) + len(poor) if total == 0: print("暂无反馈数据") return print(f"总反馈数: {total}") print(f" - 优质 (excellent): {len(excellent)} ({len(excellent)/total*100:.1f}%)") print(f" - 良好 (good): {len(good)} ({len(good)/total*100:.1f}%)") print(f" - 较差 (poor): {len(poor)} ({len(poor)/total*100:.1f}%)") print() # ===== 主函数 ===== async def main(): """演示完整流程""" # 初始化 memory = MemoryManager(".cache/content_finder") agent = ContentFinderAgent(memory) # 示例1: 搜索美食内容 print("\n" + "="*60) print("示例1: 搜索美食类视频内容") print("="*60) request = SearchRequest( keywords=["美食探店", "美食推荐"], tags=["美食", "探店"], platforms=["douyin", "kuaishou"], filters={"min_views": 10000}, max_results=20, ) results = await agent.search_content(request) # 显示前5条结果 print("🏆 Top 5 内容:") for i, item in enumerate(results[:5], 1): print(f"\n{i}. [{item.platform}] {item.title}") print(f" 作者: {item.author}") print(f" 播放: {item.stats['views']:,} | 点赞: {item.stats['likes']:,}") print(f" 质量分数: {item.quality_score} | 相关性: {item.relevance_score}") # 示例2: 收集反馈 print("\n" + "="*60) print("示例2: 收集运营反馈") print("="*60 + "\n") if results: agent.collect_feedback(results[0].content_id, "excellent", "内容质量很高") agent.collect_feedback(results[1].content_id, "good", "内容不错") agent.collect_feedback(results[2].content_id, "excellent", "非常符合需求") # 示例3: 更新表现 print("\n" + "="*60) print("示例3: 更新内容表现") print("="*60 + "\n") if results: agent.update_performance(results[0].content_id, internal_views=5000, engagement=0.15) agent.update_performance(results[1].content_id, internal_views=3000, engagement=0.12) # 示例4: 分析洞察 print("\n" + "="*60) print("示例4: 分析运营洞察") print("="*60) agent.analyze_insights() print("\n" + "="*60) print("✅ 演示完成!") print("="*60 + "\n") if __name__ == "__main__": asyncio.run(main())