# daily_rank_manager.py

from typing import Optional
  2. class DailyRankManagerUtils:
  3. """排序日志管理"""
  4. # 默认权重配置
  5. DEFAULT_WEIGHTS = {
  6. "account_user_category_score": 0.2,
  7. "category_score": 0.2,
  8. "flow_ctl_decrease_score": 1.0,
  9. "i2i_recommend_score": 1.0,
  10. "view_count_rate_score": 1.0,
  11. "view_count_rate_v2_score": 1.0,
  12. "his_fission_open_rate_score": 1.0,
  13. "crawler_days_decrease_score": 1.0,
  14. "crawler_days_decrease_v2_score": 1.0,
  15. "similarity_score": 0.0,
  16. "publish_times_score": 0.0,
  17. }
  18. @staticmethod
  19. def _build_where(
  20. rank_date: Optional[str] = None,
  21. gh_id: Optional[str] = None,
  22. account_name: Optional[str] = None,
  23. content_pool_type: Optional[str] = None,
  24. strategy: Optional[str] = None,
  25. ):
  26. """构建 WHERE 条件"""
  27. conds, params = [], []
  28. if rank_date:
  29. conds.append("rank_date = %s")
  30. params.append(rank_date)
  31. if gh_id:
  32. conds.append("gh_id = %s")
  33. params.append(gh_id)
  34. if account_name:
  35. conds.append("account_name = %s")
  36. params.append(account_name)
  37. if content_pool_type:
  38. conds.append("content_pool_type = %s")
  39. params.append(content_pool_type)
  40. if strategy:
  41. conds.append("strategy = %s")
  42. params.append(strategy)
  43. where_clause = " AND ".join(conds) if conds else "1=1"
  44. return where_clause, params
  45. @staticmethod
  46. def _calculate_new_score(row: dict, weights: dict) -> float:
  47. """根据权重计算新分数"""
  48. new_score = 0.0
  49. for field, weight in weights.items():
  50. value = row.get(field)
  51. if value is not None:
  52. try:
  53. new_score += float(value) * float(weight)
  54. except (TypeError, ValueError):
  55. pass
  56. return new_score
  57. class DailyRankManager(DailyRankManagerUtils):
  58. """日榜数据管理服务"""
  59. def __init__(self, pool, data: dict):
  60. self.pool = pool
  61. self.data = data
  62. async def query_daily_rank(self):
  63. """查询日榜数据(不分页,返回筛选条件下的全量数据)"""
  64. sort_by = self.data.get("sort_by", "new_score")
  65. sort_dir = self.data.get("sort_dir", "desc").lower()
  66. # 过滤条件
  67. rank_date = self.data.get("rank_date")
  68. gh_id = self.data.get("gh_id")
  69. account_name = self.data.get("account_name")
  70. content_pool_type = self.data.get("content_pool_type")
  71. strategy = self.data.get("strategy")
  72. # 权重配置
  73. weights = self.data.get("weights") or self.DEFAULT_WEIGHTS
  74. # 构建 WHERE 子句
  75. where_clause, params = self._build_where(
  76. rank_date, gh_id, account_name, content_pool_type, strategy
  77. )
  78. # 查询全量数据
  79. sql_list = f"""
  80. SELECT
  81. id, rank_date, account_name, gh_id, source_id,
  82. content_pool_type, strategy, title, score, category,
  83. account_user_category_score, category_score,
  84. flow_ctl_decrease_score, i2i_recommend_score,
  85. view_count_rate_score, view_count_rate_v2_score, his_fission_open_rate_score,
  86. crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score,
  87. publish_times_score, source_log,
  88. create_timestamp, update_timestamp
  89. FROM long_articles_daily_rank
  90. WHERE {where_clause}
  91. ORDER BY score DESC
  92. LIMIT 1000
  93. """
  94. rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))
  95. # 计算新分数并添加到结果中
  96. items = []
  97. for r in rows:
  98. new_score = self._calculate_new_score(r, weights)
  99. items.append({**r, "new_score": new_score})
  100. # 排序(在内存中排序,因为 new_score 是计算字段)
  101. sort_whitelist = {
  102. "id",
  103. "rank_date",
  104. "score",
  105. "new_score",
  106. "account_user_category_score",
  107. "category_score",
  108. "flow_ctl_decrease_score",
  109. "i2i_recommend_score",
  110. "view_count_rate_score",
  111. "view_count_rate_v2_score",
  112. "his_fission_open_rate_score",
  113. "crawler_days_decrease_score",
  114. "crawler_days_decrease_v2_score",
  115. "similarity_score",
  116. "publish_times_score",
  117. }
  118. if sort_by in sort_whitelist:
  119. reverse = sort_dir == "desc"
  120. items.sort(
  121. key=lambda x: (
  122. x.get(sort_by) if x.get(sort_by) is not None else float("-inf")
  123. ),
  124. reverse=reverse,
  125. )
  126. return {
  127. "total": len(items),
  128. "items": items,
  129. "weights": weights,
  130. }