daily_rank_manager.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
import math
from typing import Optional
  2. class DailyRankManagerUtils:
  3. """日榜管理工具类"""
  4. # 默认权重配置
  5. DEFAULT_WEIGHTS = {
  6. "account_user_category_score": 1.0,
  7. "category_score": 1.0,
  8. "flow_ctl_decrease_score": 1.0,
  9. "i2i_recommend_score": 1.0,
  10. "view_count_rate_score": 1.0,
  11. "his_fission_open_rate_score": 1.0,
  12. "crawler_days_decrease_score": 1.0,
  13. "crawler_days_decrease_v2_score": 1.0,
  14. "similarity_score": 1.0,
  15. "publish_times_score": 1.0,
  16. }
  17. @staticmethod
  18. def _build_where(
  19. rank_date: Optional[str] = None,
  20. gh_id: Optional[str] = None,
  21. account_name: Optional[str] = None,
  22. content_pool_type: Optional[str] = None,
  23. strategy: Optional[str] = None,
  24. ):
  25. """构建 WHERE 条件"""
  26. conds, params = [], []
  27. if rank_date:
  28. conds.append("rank_date = %s")
  29. params.append(rank_date)
  30. if gh_id:
  31. conds.append("gh_id = %s")
  32. params.append(gh_id)
  33. if account_name:
  34. conds.append("account_name = %s")
  35. params.append(account_name)
  36. if content_pool_type:
  37. conds.append("content_pool_type = %s")
  38. params.append(content_pool_type)
  39. if strategy:
  40. conds.append("strategy = %s")
  41. params.append(strategy)
  42. where_clause = " AND ".join(conds) if conds else "1=1"
  43. return where_clause, params
  44. @staticmethod
  45. def _calculate_new_score(row: dict, weights: dict) -> float:
  46. """根据权重计算新分数"""
  47. new_score = 0.0
  48. for field, weight in weights.items():
  49. value = row.get(field)
  50. if value is not None:
  51. try:
  52. new_score += float(value) * float(weight)
  53. except (TypeError, ValueError):
  54. pass
  55. return new_score
  56. class DailyRankManager(DailyRankManagerUtils):
  57. """日榜数据管理服务"""
  58. def __init__(self, pool, data: dict):
  59. self.pool = pool
  60. self.data = data
  61. async def query_daily_rank(self):
  62. """查询日榜数据(不分页,返回筛选条件下的全量数据)"""
  63. sort_by = self.data.get("sort_by", "new_score")
  64. sort_dir = self.data.get("sort_dir", "desc").lower()
  65. # 过滤条件
  66. rank_date = self.data.get("rank_date")
  67. gh_id = self.data.get("gh_id")
  68. account_name = self.data.get("account_name")
  69. content_pool_type = self.data.get("content_pool_type")
  70. strategy = self.data.get("strategy")
  71. # 权重配置
  72. weights = self.data.get("weights") or self.DEFAULT_WEIGHTS
  73. # 构建 WHERE 子句
  74. where_clause, params = self._build_where(
  75. rank_date, gh_id, account_name, content_pool_type, strategy
  76. )
  77. # 查询全量数据
  78. sql_list = f"""
  79. SELECT
  80. id, rank_date, account_name, gh_id, source_id,
  81. content_pool_type, strategy, title, score, category,
  82. account_user_category_score, category_score,
  83. flow_ctl_decrease_score, i2i_recommend_score,
  84. view_count_rate_score, his_fission_open_rate_score,
  85. crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score,
  86. publish_times_score, source_log,
  87. create_timestamp, update_timestamp
  88. FROM long_articles_daily_rank
  89. WHERE {where_clause}
  90. ORDER BY score DESC
  91. LIMIT 1000
  92. """
  93. rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))
  94. # 计算新分数并添加到结果中
  95. items = []
  96. for r in rows:
  97. new_score = self._calculate_new_score(r, weights)
  98. items.append({**r, "new_score": new_score})
  99. # 排序(在内存中排序,因为 new_score 是计算字段)
  100. sort_whitelist = {
  101. "id",
  102. "rank_date",
  103. "score",
  104. "new_score",
  105. "account_user_category_score",
  106. "category_score",
  107. "flow_ctl_decrease_score",
  108. "i2i_recommend_score",
  109. "view_count_rate_score",
  110. "his_fission_open_rate_score",
  111. "crawler_days_decrease_score",
  112. "crawler_days_decrease_v2_score",
  113. "similarity_score",
  114. "publish_times_score",
  115. }
  116. if sort_by in sort_whitelist:
  117. reverse = sort_dir == "desc"
  118. items.sort(
  119. key=lambda x: (
  120. x.get(sort_by) if x.get(sort_by) is not None else float("-inf")
  121. ),
  122. reverse=reverse,
  123. )
  124. return {
  125. "total": len(items),
  126. "items": items,
  127. "weights": weights,
  128. }