content_perf.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """
  2. 内容表现数据管理
  3. """
  4. from typing import List, Dict, Any, Optional
  5. from datetime import datetime
  6. import json
  7. from pathlib import Path
  8. class ContentPerformanceManager:
  9. """内容表现数据管理器"""
  10. def __init__(self, storage_path: str):
  11. self.storage_path = Path(storage_path)
  12. self.storage_path.mkdir(parents=True, exist_ok=True)
  13. self.perf_file = self.storage_path / "content_performance.jsonl"
  14. async def save_performance(
  15. self,
  16. content_id: str,
  17. platform_views: int,
  18. platform_likes: int,
  19. platform_shares: int,
  20. internal_views: int,
  21. internal_engagement: float,
  22. conversion_rate: float,
  23. ) -> None:
  24. """保存内容表现数据"""
  25. record = {
  26. "content_id": content_id,
  27. "timestamp": datetime.now().isoformat(),
  28. "platform_views": platform_views,
  29. "platform_likes": platform_likes,
  30. "platform_shares": platform_shares,
  31. "internal_views": internal_views,
  32. "internal_engagement": internal_engagement,
  33. "conversion_rate": conversion_rate,
  34. }
  35. with open(self.perf_file, "a", encoding="utf-8") as f:
  36. f.write(json.dumps(record, ensure_ascii=False) + "\n")
  37. async def get_performance(
  38. self,
  39. content_id: Optional[str] = None,
  40. limit: int = 100,
  41. ) -> List[Dict[str, Any]]:
  42. """获取内容表现数据"""
  43. if not self.perf_file.exists():
  44. return []
  45. records = []
  46. with open(self.perf_file, "r", encoding="utf-8") as f:
  47. for line in f:
  48. if line.strip():
  49. record = json.loads(line)
  50. if content_id is None or record.get("content_id") == content_id:
  51. records.append(record)
  52. return records[-limit:]
  53. async def get_top_performers(
  54. self,
  55. metric: str = "internal_engagement",
  56. limit: int = 20,
  57. ) -> List[Dict[str, Any]]:
  58. """获取表现最好的内容"""
  59. all_records = await self.get_performance(limit=1000)
  60. # 按指定指标排序
  61. sorted_records = sorted(
  62. all_records,
  63. key=lambda x: x.get(metric, 0),
  64. reverse=True,
  65. )
  66. return sorted_records[:limit]
  67. async def analyze_trends(self) -> Dict[str, Any]:
  68. """分析内容表现趋势"""
  69. all_records = await self.get_performance(limit=1000)
  70. if not all_records:
  71. return {
  72. "avg_internal_engagement": 0,
  73. "avg_conversion_rate": 0,
  74. "total_internal_views": 0,
  75. }
  76. total_engagement = sum(r.get("internal_engagement", 0) for r in all_records)
  77. total_conversion = sum(r.get("conversion_rate", 0) for r in all_records)
  78. total_views = sum(r.get("internal_views", 0) for r in all_records)
  79. return {
  80. "avg_internal_engagement": total_engagement / len(all_records),
  81. "avg_conversion_rate": total_conversion / len(all_records),
  82. "total_internal_views": total_views,
  83. "sample_size": len(all_records),
  84. }