# operator_pref.py
"""
Operator preference data management.
"""
import json
import logging
from collections import Counter, deque
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
  9. class OperatorPreferenceManager:
  10. """运营偏好数据管理器"""
  11. def __init__(self, storage_path: str):
  12. self.storage_path = Path(storage_path)
  13. self.storage_path.mkdir(parents=True, exist_ok=True)
  14. self.feedback_file = self.storage_path / "operator_feedback.jsonl"
  15. async def save_feedback(
  16. self,
  17. content_id: str,
  18. rating: str,
  19. notes: str,
  20. operator_id: str,
  21. content_features: Optional[Dict[str, Any]] = None,
  22. ) -> None:
  23. """保存运营反馈"""
  24. record = {
  25. "content_id": content_id,
  26. "timestamp": datetime.now().isoformat(),
  27. "rating": rating,
  28. "notes": notes,
  29. "operator_id": operator_id,
  30. "content_features": content_features or {},
  31. }
  32. with open(self.feedback_file, "a", encoding="utf-8") as f:
  33. f.write(json.dumps(record, ensure_ascii=False) + "\n")
  34. async def get_feedbacks(
  35. self,
  36. content_id: Optional[str] = None,
  37. rating_filter: Optional[str] = None,
  38. operator_id: Optional[str] = None,
  39. limit: int = 100,
  40. ) -> List[Dict[str, Any]]:
  41. """获取反馈记录"""
  42. if not self.feedback_file.exists():
  43. return []
  44. records = []
  45. with open(self.feedback_file, "r", encoding="utf-8") as f:
  46. for line in f:
  47. if line.strip():
  48. record = json.loads(line)
  49. # 应用筛选条件
  50. if content_id and record.get("content_id") != content_id:
  51. continue
  52. if rating_filter and record.get("rating") != rating_filter:
  53. continue
  54. if operator_id and record.get("operator_id") != operator_id:
  55. continue
  56. records.append(record)
  57. return records[-limit:]
  58. async def analyze_preferences(
  59. self,
  60. operator_id: Optional[str] = None,
  61. ) -> Dict[str, Any]:
  62. """分析运营偏好"""
  63. feedbacks = await self.get_feedbacks(
  64. operator_id=operator_id,
  65. limit=1000,
  66. )
  67. if not feedbacks:
  68. return {
  69. "rating_distribution": {},
  70. "preferred_keywords": [],
  71. "preferred_tags": [],
  72. "preferred_platforms": [],
  73. }
  74. # 评级分布
  75. ratings = [f.get("rating") for f in feedbacks]
  76. rating_dist = dict(Counter(ratings))
  77. # 提取优质内容的特征
  78. excellent_feedbacks = [f for f in feedbacks if f.get("rating") == "excellent"]
  79. keywords = []
  80. tags = []
  81. platforms = []
  82. for feedback in excellent_feedbacks:
  83. features = feedback.get("content_features", {})
  84. keywords.extend(features.get("keywords", []))
  85. tags.extend(features.get("tags", []))
  86. if "platform" in features:
  87. platforms.append(features["platform"])
  88. # 统计频率
  89. keyword_freq = Counter(keywords)
  90. tag_freq = Counter(tags)
  91. platform_freq = Counter(platforms)
  92. return {
  93. "rating_distribution": rating_dist,
  94. "preferred_keywords": [k for k, _ in keyword_freq.most_common(10)],
  95. "preferred_tags": [t for t, _ in tag_freq.most_common(10)],
  96. "preferred_platforms": [p for p, _ in platform_freq.most_common()],
  97. "total_feedbacks": len(feedbacks),
  98. "excellent_count": len(excellent_feedbacks),
  99. }
  100. async def get_learning_insights(self) -> Dict[str, Any]:
  101. """获取学习洞察"""
  102. all_feedbacks = await self.get_feedbacks(limit=1000)
  103. if not all_feedbacks:
  104. return {"insights": []}
  105. # 分析优质内容的共同特征
  106. excellent = [f for f in all_feedbacks if f.get("rating") == "excellent"]
  107. good = [f for f in all_feedbacks if f.get("rating") == "good"]
  108. poor = [f for f in all_feedbacks if f.get("rating") == "poor"]
  109. insights = []
  110. # 洞察1:评级分布
  111. insights.append({
  112. "type": "rating_distribution",
  113. "message": f"优质内容占比:{len(excellent) / len(all_feedbacks) * 100:.1f}%",
  114. "data": {
  115. "excellent": len(excellent),
  116. "good": len(good),
  117. "poor": len(poor),
  118. },
  119. })
  120. # 洞察2:优质内容特征
  121. if excellent:
  122. excellent_features = self._extract_common_features(excellent)
  123. insights.append({
  124. "type": "excellent_features",
  125. "message": "优质内容的共同特征",
  126. "data": excellent_features,
  127. })
  128. # 洞察3:需要避免的特征
  129. if poor:
  130. poor_features = self._extract_common_features(poor)
  131. insights.append({
  132. "type": "poor_features",
  133. "message": "低质内容的共同特征(需避免)",
  134. "data": poor_features,
  135. })
  136. return {"insights": insights}
  137. def _extract_common_features(
  138. self,
  139. feedbacks: List[Dict[str, Any]],
  140. ) -> Dict[str, Any]:
  141. """提取共同特征"""
  142. all_keywords = []
  143. all_tags = []
  144. all_platforms = []
  145. for feedback in feedbacks:
  146. features = feedback.get("content_features", {})
  147. all_keywords.extend(features.get("keywords", []))
  148. all_tags.extend(features.get("tags", []))
  149. if "platform" in features:
  150. all_platforms.append(features["platform"])
  151. return {
  152. "top_keywords": [k for k, _ in Counter(all_keywords).most_common(5)],
  153. "top_tags": [t for t, _ in Counter(all_tags).most_common(5)],
  154. "top_platforms": [p for p, _ in Counter(all_platforms).most_common(3)],
  155. }