| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- """
- 内容寻找 Agent - 简化运行示例
- 这是一个独立的演示脚本,展示核心逻辑,不依赖完整的框架。
- """
- import asyncio
- import json
- from pathlib import Path
- from datetime import datetime
- from typing import List, Dict, Any
- # ===== 数据模型 =====
- class SearchRequest:
- """搜索请求"""
- def __init__(self, keywords: List[str], tags: List[str], platforms: List[str],
- filters: Dict[str, Any], max_results: int = 50):
- self.keywords = keywords
- self.tags = tags
- self.platforms = platforms
- self.filters = filters
- self.max_results = max_results
- class ContentItem:
- """内容项"""
- def __init__(self, content_id: str, platform: str, title: str, author: str,
- url: str, stats: Dict[str, int], tags: List[str]):
- self.content_id = content_id
- self.platform = platform
- self.title = title
- self.author = author
- self.url = url
- self.stats = stats
- self.tags = tags
- # ===== 模拟爬虫工具 =====
- async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
- """模拟抖音搜索"""
- print(f"🔍 正在抖音搜索: {keywords}")
- await asyncio.sleep(0.5) # 模拟网络延迟
- # 返回模拟数据
- return [
- ContentItem(
- content_id=f"dy_{i}",
- platform="douyin",
- title=f"【{keywords}】精彩视频 #{i}",
- author=f"作者{i}",
- url=f"https://douyin.com/video/{i}",
- stats={"views": 100000 + i * 10000, "likes": 5000 + i * 500, "comments": 200 + i * 20},
- tags=[keywords, "热门", "推荐"]
- )
- for i in range(1, min(max_results, 6))
- ]
- async def kuaishou_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
- """模拟快手搜索"""
- print(f"🔍 正在快手搜索: {keywords}")
- await asyncio.sleep(0.5)
- return [
- ContentItem(
- content_id=f"ks_{i}",
- platform="kuaishou",
- title=f"【{keywords}】优质内容 #{i}",
- author=f"创作者{i}",
- url=f"https://kuaishou.com/video/{i}",
- stats={"views": 80000 + i * 8000, "likes": 4000 + i * 400, "comments": 150 + i * 15},
- tags=[keywords, "精选"]
- )
- for i in range(1, min(max_results, 6))
- ]
- # ===== 内容评估 =====
- def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]:
- """评估内容质量"""
- print(f"📊 正在评估 {len(items)} 条内容...")
- for item in items:
- # 计算质量分数
- stats = item.stats
- views = stats.get("views", 0)
- likes = stats.get("likes", 0)
- comments = stats.get("comments", 0)
- engagement_rate = (likes + comments) / max(views, 1)
- quality_score = (views / 10000) * 0.5 + engagement_rate * 50
- item.quality_score = round(quality_score, 2)
- # 计算相关性分数
- relevance = sum(1 for kw in keywords if kw in item.title) / len(keywords) * 100
- item.relevance_score = round(relevance, 2)
- # 按质量分数排序
- items.sort(key=lambda x: x.quality_score, reverse=True)
- return items
- # ===== 记忆管理 =====
- class MemoryManager:
- """简化的记忆管理器"""
- def __init__(self, storage_path: str):
- self.storage_path = Path(storage_path)
- self.storage_path.mkdir(parents=True, exist_ok=True)
- self.feedback_file = self.storage_path / "feedbacks.jsonl"
- self.performance_file = self.storage_path / "performances.jsonl"
- def save_feedback(self, content_id: str, rating: str, notes: str):
- """保存反馈"""
- record = {
- "content_id": content_id,
- "rating": rating,
- "notes": notes,
- "timestamp": datetime.now().isoformat(),
- }
- with open(self.feedback_file, "a", encoding="utf-8") as f:
- f.write(json.dumps(record, ensure_ascii=False) + "\n")
- print(f"✅ 反馈已保存: {rating}")
- def save_performance(self, content_id: str, internal_views: int, engagement: float):
- """保存表现数据"""
- record = {
- "content_id": content_id,
- "internal_views": internal_views,
- "engagement": engagement,
- "timestamp": datetime.now().isoformat(),
- }
- with open(self.performance_file, "a", encoding="utf-8") as f:
- f.write(json.dumps(record, ensure_ascii=False) + "\n")
- print(f"✅ 表现数据已保存")
- def get_feedbacks(self, rating_filter: str = None) -> List[Dict]:
- """获取反馈"""
- if not self.feedback_file.exists():
- return []
- feedbacks = []
- with open(self.feedback_file, "r", encoding="utf-8") as f:
- for line in f:
- if line.strip():
- record = json.loads(line)
- if rating_filter is None or record.get("rating") == rating_filter:
- feedbacks.append(record)
- return feedbacks
- # ===== 主Agent =====
- class ContentFinderAgent:
- """内容寻找Agent"""
- def __init__(self, memory: MemoryManager):
- self.memory = memory
- async def search_content(self, request: SearchRequest) -> List[ContentItem]:
- """搜索内容"""
- print(f"\n{'='*60}")
- print(f"🎯 开始搜索内容")
- print(f"关键词: {', '.join(request.keywords)}")
- print(f"平台: {', '.join(request.platforms)}")
- print(f"{'='*60}\n")
- # 并行搜索多个平台
- tasks = []
- for platform in request.platforms:
- for keyword in request.keywords:
- if platform == "douyin":
- tasks.append(douyin_search(keyword, request.max_results // len(request.keywords)))
- elif platform == "kuaishou":
- tasks.append(kuaishou_search(keyword, request.max_results // len(request.keywords)))
- results_list = await asyncio.gather(*tasks)
- # 合并结果
- all_items = []
- for results in results_list:
- all_items.extend(results)
- # 去重
- seen = set()
- unique_items = []
- for item in all_items:
- if item.content_id not in seen:
- seen.add(item.content_id)
- unique_items.append(item)
- # 评估内容
- evaluated_items = evaluate_content(unique_items, request.keywords)
- print(f"\n✨ 搜索完成,找到 {len(evaluated_items)} 条内容\n")
- return evaluated_items
- def collect_feedback(self, content_id: str, rating: str, notes: str):
- """收集反馈"""
- self.memory.save_feedback(content_id, rating, notes)
- def update_performance(self, content_id: str, internal_views: int, engagement: float):
- """更新表现"""
- self.memory.save_performance(content_id, internal_views, engagement)
- def analyze_insights(self):
- """分析洞察"""
- print(f"\n{'='*60}")
- print("📈 分析运营洞察")
- print(f"{'='*60}\n")
- excellent = self.memory.get_feedbacks("excellent")
- good = self.memory.get_feedbacks("good")
- poor = self.memory.get_feedbacks("poor")
- total = len(excellent) + len(good) + len(poor)
- if total == 0:
- print("暂无反馈数据")
- return
- print(f"总反馈数: {total}")
- print(f" - 优质 (excellent): {len(excellent)} ({len(excellent)/total*100:.1f}%)")
- print(f" - 良好 (good): {len(good)} ({len(good)/total*100:.1f}%)")
- print(f" - 较差 (poor): {len(poor)} ({len(poor)/total*100:.1f}%)")
- print()
- # ===== 主函数 =====
- async def main():
- """演示完整流程"""
- # 初始化
- memory = MemoryManager(".cache/content_finder")
- agent = ContentFinderAgent(memory)
- # 示例1: 搜索美食内容
- print("\n" + "="*60)
- print("示例1: 搜索美食类视频内容")
- print("="*60)
- request = SearchRequest(
- keywords=["美食探店", "美食推荐"],
- tags=["美食", "探店"],
- platforms=["douyin", "kuaishou"],
- filters={"min_views": 10000},
- max_results=20,
- )
- results = await agent.search_content(request)
- # 显示前5条结果
- print("🏆 Top 5 内容:")
- for i, item in enumerate(results[:5], 1):
- print(f"\n{i}. [{item.platform}] {item.title}")
- print(f" 作者: {item.author}")
- print(f" 播放: {item.stats['views']:,} | 点赞: {item.stats['likes']:,}")
- print(f" 质量分数: {item.quality_score} | 相关性: {item.relevance_score}")
- # 示例2: 收集反馈
- print("\n" + "="*60)
- print("示例2: 收集运营反馈")
- print("="*60 + "\n")
- if results:
- agent.collect_feedback(results[0].content_id, "excellent", "内容质量很高")
- agent.collect_feedback(results[1].content_id, "good", "内容不错")
- agent.collect_feedback(results[2].content_id, "excellent", "非常符合需求")
- # 示例3: 更新表现
- print("\n" + "="*60)
- print("示例3: 更新内容表现")
- print("="*60 + "\n")
- if results:
- agent.update_performance(results[0].content_id, internal_views=5000, engagement=0.15)
- agent.update_performance(results[1].content_id, internal_views=3000, engagement=0.12)
- # 示例4: 分析洞察
- print("\n" + "="*60)
- print("示例4: 分析运营洞察")
- print("="*60)
- agent.analyze_insights()
- print("\n" + "="*60)
- print("✅ 演示完成!")
- print("="*60 + "\n")
- if __name__ == "__main__":
- asyncio.run(main())
|