| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
import math
from typing import Optional
class DailyRankManagerUtils:
    """Helper utilities for daily-rank management.

    Provides the default per-component weight table plus two static
    helpers: one that builds a parameterized SQL WHERE clause and one that
    computes a weighted score for a single row.
    """

    # Default weight configuration: each component score column is weighted 1.0.
    DEFAULT_WEIGHTS = {
        "account_user_category_score": 1.0,
        "category_score": 1.0,
        "flow_ctl_decrease_score": 1.0,
        "i2i_recommend_score": 1.0,
        "view_count_rate_score": 1.0,
        "his_fission_open_rate_score": 1.0,
        "crawler_days_decrease_score": 1.0,
        "crawler_days_decrease_v2_score": 1.0,
        "similarity_score": 1.0,
        "publish_times_score": 1.0,
    }

    @staticmethod
    def _build_where(
        rank_date: Optional[str] = None,
        gh_id: Optional[str] = None,
        account_name: Optional[str] = None,
        content_pool_type: Optional[str] = None,
        strategy: Optional[str] = None,
    ) -> tuple:
        """Build a parameterized WHERE clause from the optional filters.

        Each truthy argument adds one ``<column> = %s`` equality condition;
        conditions are joined with ``AND`` in the order of the parameters.

        Returns:
            A ``(where_clause, params)`` pair. ``where_clause`` is ``"1=1"``
            when no filter is given, and ``params`` is the list of values
            matching the ``%s`` placeholders in order.
        """
        # Column names are fixed literals; only the values are caller-supplied,
        # and those travel separately as bind parameters.
        filters = (
            ("rank_date", rank_date),
            ("gh_id", gh_id),
            ("account_name", account_name),
            ("content_pool_type", content_pool_type),
            ("strategy", strategy),
        )
        conds, params = [], []
        for column, value in filters:
            # Falsy values (None, "") mean "do not filter on this column".
            if value:
                conds.append(f"{column} = %s")
                params.append(value)
        where_clause = " AND ".join(conds) if conds else "1=1"
        return where_clause, params

    @staticmethod
    def _calculate_new_score(row: dict, weights: dict) -> float:
        """Compute the weighted sum of a row's component scores.

        For every ``field -> weight`` pair in ``weights`` the row's value for
        that field is multiplied by the weight and added to the total.
        Fields that are missing or ``None`` in ``row`` are skipped, as are
        pairs that cannot be converted to ``float`` and pairs whose product
        is not finite (NaN or +/-inf).

        Returns:
            The accumulated score; ``0.0`` when nothing contributes.
        """
        new_score = 0.0
        for field, weight in weights.items():
            value = row.get(field)
            if value is None:
                continue
            try:
                contribution = float(value) * float(weight)
            except (TypeError, ValueError, OverflowError):
                # Best effort: an unconvertible value or weight contributes
                # nothing. OverflowError covers ints too large for a float.
                continue
            # A NaN/inf term would make the whole total NaN/inf, which cannot
            # be ordered reliably against other scores, so drop it.
            if math.isfinite(contribution):
                new_score += contribution
        return new_score
class DailyRankManager(DailyRankManagerUtils):
    """Daily-rank data management service.

    Holds a database pool and a request payload, and answers daily-rank
    queries by combining a filtered SQL fetch with an in-memory re-scoring
    and sort.
    """

    # Fields a caller may sort by; any other ``sort_by`` leaves the SQL order.
    _SORTABLE_FIELDS = frozenset(
        {
            "id",
            "rank_date",
            "score",
            "new_score",
            "account_user_category_score",
            "category_score",
            "flow_ctl_decrease_score",
            "i2i_recommend_score",
            "view_count_rate_score",
            "his_fission_open_rate_score",
            "crawler_days_decrease_score",
            "crawler_days_decrease_v2_score",
            "similarity_score",
            "publish_times_score",
        }
    )

    def __init__(self, pool, data: dict):
        """Store the database pool and the request payload.

        Args:
            pool: Object exposing an awaitable ``async_fetch(query=, params=)``.
            data: Request payload read by :meth:`query_daily_rank`.
        """
        self.pool = pool
        self.data = data

    async def query_daily_rank(self) -> dict:
        """Query daily-rank rows, re-score them and sort in memory.

        Reads from ``self.data``:
            * ``sort_by`` (default ``"new_score"``) and ``sort_dir``
              (default ``"desc"``; any other value sorts ascending);
            * the optional equality filters ``rank_date``, ``gh_id``,
              ``account_name``, ``content_pool_type`` and ``strategy``;
            * ``weights``, a ``field -> weight`` mapping; when missing or
              empty a copy of ``DEFAULT_WEIGHTS`` is used.

        The query is not paginated, but it is capped: at most the 1000
        rows with the highest ``score`` that match the filters are fetched.

        Returns:
            A dict with ``total`` (number of returned items), ``items``
            (each row plus a computed ``new_score``) and ``weights`` (the
            weights actually applied).
        """
        sort_by = self.data.get("sort_by", "new_score")
        # An explicit null ``sort_dir`` is treated like a missing one rather
        # than crashing on ``None.lower()``.
        sort_dir = str(self.data.get("sort_dir") or "desc").lower()

        # Copy the defaults so the returned ``weights`` never aliases the
        # shared class-level dict (a caller mutating it would otherwise
        # change the defaults for every later request).
        weights = self.data.get("weights") or dict(self.DEFAULT_WEIGHTS)

        where_clause, params = self._build_where(
            self.data.get("rank_date"),
            self.data.get("gh_id"),
            self.data.get("account_name"),
            self.data.get("content_pool_type"),
            self.data.get("strategy"),
        )

        # ``where_clause`` contains only fixed column names and ``%s``
        # placeholders; the filter values are passed as bind parameters.
        sql_list = f"""
            SELECT
                id, rank_date, account_name, gh_id, source_id,
                content_pool_type, strategy, title, score, category,
                account_user_category_score, category_score,
                flow_ctl_decrease_score, i2i_recommend_score,
                view_count_rate_score, his_fission_open_rate_score,
                crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score,
                publish_times_score, source_log,
                create_timestamp, update_timestamp
            FROM long_articles_daily_rank
            WHERE {where_clause}
            ORDER BY score DESC
            LIMIT 1000
        """
        rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))

        # NOTE(review): ``rows or ()`` tolerates a pool that yields None
        # instead of an empty sequence — confirm against the pool's contract.
        items = [
            {**row, "new_score": self._calculate_new_score(row, weights)}
            for row in (rows or ())
        ]

        # Sorting happens in memory because ``new_score`` is a computed field.
        if isinstance(sort_by, str) and sort_by in self._SORTABLE_FIELDS:

            def sort_key(item):
                value = item.get(sort_by)
                # Rank None below every real value without comparing it to
                # them; a float("-inf") stand-in raises TypeError for
                # non-numeric columns such as ``rank_date``.
                return (value is not None, value)

            items.sort(key=sort_key, reverse=(sort_dir == "desc"))

        return {
            "total": len(items),
            "items": items,
            "weights": weights,
        }
|