""" 内容表现数据管理 """ from typing import List, Dict, Any, Optional from datetime import datetime import json from pathlib import Path class ContentPerformanceManager: """内容表现数据管理器""" def __init__(self, storage_path: str): self.storage_path = Path(storage_path) self.storage_path.mkdir(parents=True, exist_ok=True) self.perf_file = self.storage_path / "content_performance.jsonl" async def save_performance( self, content_id: str, platform_views: int, platform_likes: int, platform_shares: int, internal_views: int, internal_engagement: float, conversion_rate: float, ) -> None: """保存内容表现数据""" record = { "content_id": content_id, "timestamp": datetime.now().isoformat(), "platform_views": platform_views, "platform_likes": platform_likes, "platform_shares": platform_shares, "internal_views": internal_views, "internal_engagement": internal_engagement, "conversion_rate": conversion_rate, } with open(self.perf_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") async def get_performance( self, content_id: Optional[str] = None, limit: int = 100, ) -> List[Dict[str, Any]]: """获取内容表现数据""" if not self.perf_file.exists(): return [] records = [] with open(self.perf_file, "r", encoding="utf-8") as f: for line in f: if line.strip(): record = json.loads(line) if content_id is None or record.get("content_id") == content_id: records.append(record) return records[-limit:] async def get_top_performers( self, metric: str = "internal_engagement", limit: int = 20, ) -> List[Dict[str, Any]]: """获取表现最好的内容""" all_records = await self.get_performance(limit=1000) # 按指定指标排序 sorted_records = sorted( all_records, key=lambda x: x.get(metric, 0), reverse=True, ) return sorted_records[:limit] async def analyze_trends(self) -> Dict[str, Any]: """分析内容表现趋势""" all_records = await self.get_performance(limit=1000) if not all_records: return { "avg_internal_engagement": 0, "avg_conversion_rate": 0, "total_internal_views": 0, } total_engagement = sum(r.get("internal_engagement", 0) for r in all_records) total_conversion = sum(r.get("conversion_rate", 0) for r in all_records) total_views = sum(r.get("internal_views", 0) for r in all_records) return { "avg_internal_engagement": total_engagement / len(all_records), "avg_conversion_rate": total_conversion / len(all_records), "total_internal_views": total_views, "sample_size": len(all_records), }