from typing import Optional


class DailyRankManagerUtils:
    """Shared helpers for daily-rank queries: filter building and weighted re-scoring."""

    # Default weight per feature-score column, used to compute ``new_score``
    # when the caller supplies no (or an invalid) ``weights`` mapping.
    # Zero-weighted columns are listed so every tunable feature is visible.
    DEFAULT_WEIGHTS = {
        "account_user_category_score": 0.2,
        "category_score": 0.2,
        "flow_ctl_decrease_score": 1.0,
        "i2i_recommend_score": 1.0,
        "view_count_rate_score": 1.0,
        "view_count_rate_v2_score": 1.0,
        "his_fission_open_rate_score": 1.0,
        "crawler_days_decrease_score": 1.0,
        "crawler_days_decrease_v2_score": 1.0,
        "similarity_score": 0.0,
        "publish_times_score": 0.0,
    }

    @staticmethod
    def _build_where(
        rank_date: Optional[str] = None,
        gh_id: Optional[str] = None,
        account_name: Optional[str] = None,
        content_pool_type: Optional[str] = None,
        strategy: Optional[str] = None,
    ):
        """Build a parameterized SQL WHERE clause from optional equality filters.

        Falsy filter values (``None``, ``""``) are ignored.

        Returns:
            ``(where_clause, params)``. ``where_clause`` joins ``column = %s``
            conditions with ``AND``, or is ``"1=1"`` when no filter is set.
            ``params`` lists the values in placeholder order.
        """
        # Column names are fixed literals here, never user input, so it is
        # safe to interpolate them; the values stay as bound parameters.
        filters = (
            ("rank_date", rank_date),
            ("gh_id", gh_id),
            ("account_name", account_name),
            ("content_pool_type", content_pool_type),
            ("strategy", strategy),
        )
        conds, params = [], []
        for column, value in filters:
            if value:
                conds.append(f"{column} = %s")
                params.append(value)
        where_clause = " AND ".join(conds) if conds else "1=1"
        return where_clause, params

    @staticmethod
    def _calculate_new_score(row: dict, weights: dict) -> float:
        """Return the weighted sum ``sum(float(row[f]) * float(w))`` over ``weights``.

        Best-effort: fields missing from ``row``, ``None`` values, and values
        or weights that cannot be converted to ``float`` contribute nothing
        rather than failing the whole row.
        """
        new_score = 0.0
        for field, weight in weights.items():
            value = row.get(field)
            if value is None:
                continue
            try:
                new_score += float(value) * float(weight)
            except (TypeError, ValueError):
                # Deliberately skip non-numeric values/weights.
                pass
        return new_score


class DailyRankManager(DailyRankManagerUtils):
    """Daily-rank data service: filtered query plus caller-weighted re-scoring."""

    # Hard cap on rows fetched per query. The database orders by the stored
    # ``score`` before applying the cap, so any in-memory re-sort (e.g. by
    # ``new_score``) only reorders this top-N slice.
    MAX_ROWS = 1000

    # Fields accepted for ``sort_by``. Any other value keeps the database
    # order (``score DESC``).
    SORTABLE_FIELDS = frozenset(
        {
            "id",
            "rank_date",
            "score",
            "new_score",
            "account_user_category_score",
            "category_score",
            "flow_ctl_decrease_score",
            "i2i_recommend_score",
            "view_count_rate_score",
            "view_count_rate_v2_score",
            "his_fission_open_rate_score",
            "crawler_days_decrease_score",
            "crawler_days_decrease_v2_score",
            "similarity_score",
            "publish_times_score",
        }
    )

    def __init__(self, pool, data: dict):
        # ``pool`` must expose ``async_fetch(query=..., params=...)``; the rows
        # it returns are used as dicts below.
        self.pool = pool
        # Request payload: filters, sort options and optional ``weights``.
        self.data = data

    async def query_daily_rank(self):
        """Query daily-rank rows and attach a weighted ``new_score`` to each.

        Recognized keys in ``self.data``:
            rank_date, gh_id, account_name, content_pool_type, strategy:
                optional equality filters (falsy values are ignored).
            sort_by: field from ``SORTABLE_FIELDS`` (default ``"new_score"``).
            sort_dir: ``"desc"`` (default) or anything else for ascending;
                case-insensitive.
            weights: mapping of row field -> weight. If it is missing, empty or
                not a dict, ``DEFAULT_WEIGHTS`` is used.

        No paging is done, but at most ``MAX_ROWS`` rows are fetched (highest
        stored ``score`` first). ``None`` values sort as the lowest value.

        Returns:
            ``{"total": <number of returned rows>, "items": [...],
            "weights": <copy of the weights actually applied>}``
        """
        sort_by = self.data.get("sort_by", "new_score")
        # ``or`` also covers an explicit None/"" so ``.lower()`` cannot fail.
        sort_dir = str(self.data.get("sort_dir") or "desc").lower()

        weights = self.data.get("weights")
        if not isinstance(weights, dict) or not weights:
            weights = self.DEFAULT_WEIGHTS
        # Copy so the response never hands out the shared class-level dict.
        weights = dict(weights)

        where_clause, params = self._build_where(
            self.data.get("rank_date"),
            self.data.get("gh_id"),
            self.data.get("account_name"),
            self.data.get("content_pool_type"),
            self.data.get("strategy"),
        )

        sql_list = f"""
            SELECT id, rank_date, account_name, gh_id, source_id,
                   content_pool_type, strategy, title, score, category,
                   account_user_category_score, category_score,
                   flow_ctl_decrease_score, i2i_recommend_score,
                   view_count_rate_score, view_count_rate_v2_score,
                   his_fission_open_rate_score, crawler_days_decrease_score,
                   crawler_days_decrease_v2_score, similarity_score,
                   publish_times_score, source_log,
                   create_timestamp, update_timestamp
            FROM long_articles_daily_rank
            WHERE {where_clause}
            ORDER BY score DESC
            LIMIT {int(self.MAX_ROWS)}
        """
        rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))

        items = [
            {**row, "new_score": self._calculate_new_score(row, weights)}
            for row in rows
        ]

        # ``new_score`` is computed here, so sorting must happen in memory.
        if isinstance(sort_by, str) and sort_by in self.SORTABLE_FIELDS:

            def _sort_key(item):
                value = item.get(sort_by)
                # (has_value, value): None ranks lowest without ever being
                # compared against strings/dates/numbers (avoids TypeError).
                return (value is not None, value if value is not None else 0)

            items.sort(key=_sort_key, reverse=sort_dir == "desc")

        return {
            "total": len(items),
            "items": items,
            "weights": weights,
        }