luojunhui 6 дней назад
Родитель
Commit
b34dca7a0e

+ 2 - 0
app/api/service/__init__.py

@@ -3,6 +3,7 @@
 # 前端交互
 from .task_manager_service import TaskManager
 from .gzh_cookie_manager import GzhCookieManager
+from .daily_rank_manager import DailyRankManager
 
 # 任务调度器
 from .task_scheduler import TaskScheduler
@@ -11,5 +12,6 @@ from .task_scheduler import TaskScheduler
 __all__ = [
     "TaskManager",
     "GzhCookieManager",
+    "DailyRankManager",
     "TaskScheduler",
 ]

+ 150 - 0
app/api/service/daily_rank_manager.py

@@ -0,0 +1,150 @@
+from typing import Optional
+
+
class DailyRankManagerUtils:
    """Helper mix-in for the daily-rank service: WHERE-clause building and score math."""

    # Default weight for every score column (all 1.0 unless the caller overrides).
    DEFAULT_WEIGHTS = {
        "account_user_category_score": 1.0,
        "category_score": 1.0,
        "flow_ctl_decrease_score": 1.0,
        "i2i_recommend_score": 1.0,
        "view_count_rate_score": 1.0,
        "his_fission_open_rate_score": 1.0,
        "crawler_days_decrease_score": 1.0,
        "crawler_days_decrease_v2_score": 1.0,
        "similarity_score": 1.0,
        "publish_times_score": 1.0,
    }

    @staticmethod
    def _build_where(
        rank_date: Optional[str] = None,
        gh_id: Optional[str] = None,
        account_name: Optional[str] = None,
        content_pool_type: Optional[str] = None,
        strategy: Optional[str] = None,
    ):
        """Assemble a parameterized WHERE clause from the optional filters.

        Returns a ``(clause, params)`` pair; the clause is ``"1=1"`` when no
        filter is provided, so callers can interpolate it unconditionally.
        """
        candidates = [
            ("rank_date = %s", rank_date),
            ("gh_id = %s", gh_id),
            ("account_name = %s", account_name),
            ("content_pool_type = %s", content_pool_type),
            ("strategy = %s", strategy),
        ]
        # Only truthy values become conditions — empty strings are skipped too,
        # matching the original truthiness-based filtering.
        conds = [cond for cond, value in candidates if value]
        params = [value for _, value in candidates if value]
        if not conds:
            return "1=1", params
        return " AND ".join(conds), params

    @staticmethod
    def _calculate_new_score(row: dict, weights: dict) -> float:
        """Weighted sum of the row's score columns.

        Missing (None) or non-numeric values contribute nothing — the
        recomputation is deliberately best-effort.
        """
        total = 0.0
        for field, weight in weights.items():
            raw = row.get(field)
            if raw is None:
                continue
            try:
                total += float(raw) * float(weight)
            except (TypeError, ValueError):
                # Skip values that cannot be coerced to float.
                continue
        return total
+
+
class DailyRankManager(DailyRankManagerUtils):
    """日榜数据管理服务 — daily-rank data query service."""

    def __init__(self, pool, data: dict):
        # pool: async DB accessor exposing `async_fetch`; data: validated request payload.
        self.pool = pool
        self.data = data

    async def query_daily_rank(self):
        """查询日榜数据(不分页,返回筛选条件下的全量数据)

        Fetches up to 1000 rows (ordered by stored `score`), computes a
        weighted `new_score` per row, then re-sorts in memory on the requested
        field — `new_score` is a derived column and cannot be sorted in SQL.

        Returns a dict with `total`, `items` (rows plus `new_score`), and the
        effective `weights` used.
        """
        sort_by = self.data.get("sort_by", "new_score")
        sort_dir = self.data.get("sort_dir", "desc").lower()

        # 过滤条件 — optional equality filters.
        rank_date = self.data.get("rank_date")
        gh_id = self.data.get("gh_id")
        account_name = self.data.get("account_name")
        content_pool_type = self.data.get("content_pool_type")
        strategy = self.data.get("strategy")

        # 权重配置 — an absent or empty weight map falls back to the defaults.
        weights = self.data.get("weights") or self.DEFAULT_WEIGHTS

        # 构建 WHERE 子句 (parameterized; values are bound via %s placeholders).
        where_clause, params = self._build_where(
            rank_date, gh_id, account_name, content_pool_type, strategy
        )

        # 查询全量数据 (capped at 1000 rows by stored score; NOTE(review): rows
        # outside this cap can never surface even if their new_score is high —
        # confirm the cap is acceptable).
        sql_list = f"""
            SELECT
                id, rank_date, account_name, gh_id, source_id,
                content_pool_type, strategy, title, score, category,
                account_user_category_score, category_score,
                flow_ctl_decrease_score, i2i_recommend_score,
                view_count_rate_score, his_fission_open_rate_score,
                crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score,
                publish_times_score, source_log,
                create_timestamp, update_timestamp
            FROM long_articles_daily_rank
            WHERE {where_clause}
            ORDER BY score DESC
            LIMIT 1000
        """
        rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))

        # 计算新分数并添加到结果中 (guard against a None result set).
        items = []
        for r in rows or []:
            new_score = self._calculate_new_score(r, weights)
            items.append({**r, "new_score": new_score})

        # 排序(在内存中排序,因为 new_score 是计算字段)
        sort_whitelist = {
            "id",
            "rank_date",
            "score",
            "new_score",
            "account_user_category_score",
            "category_score",
            "flow_ctl_decrease_score",
            "i2i_recommend_score",
            "view_count_rate_score",
            "his_fission_open_rate_score",
            "crawler_days_decrease_score",
            "crawler_days_decrease_v2_score",
            "similarity_score",
            "publish_times_score",
        }
        if sort_by in sort_whitelist:
            reverse = sort_dir == "desc"

            def sort_key(item):
                # (is not None, value) groups None rows at one end without
                # comparing them to real values. The previous float("-inf")
                # sentinel raised TypeError for non-numeric columns (e.g.
                # rank_date strings mixed with the float sentinel); ordering
                # for numeric columns is unchanged (None first asc, last desc).
                value = item.get(sort_by)
                return (value is not None, value)

            items.sort(key=sort_key, reverse=reverse)

        return {
            "total": len(items),
            "items": items,
            "weights": weights,
        }

+ 2 - 0
app/api/v1/endpoints/__init__.py

@@ -3,6 +3,7 @@ from .health import create_health_bp
 from .tasks import create_tasks_bp
 from .tokens import create_tokens_bp
 from .monitor import create_monitor_bp
+from .rank_log import create_rank_log_bp
 # from .mcp import create_mcp_bp
 
 __all__ = [
@@ -11,5 +12,6 @@ __all__ = [
     "create_tasks_bp",
     "create_tokens_bp",
     "create_monitor_bp",
+    "create_rank_log_bp",
     # "create_mcp_bp",
 ]

+ 27 - 0
app/api/v1/endpoints/rank_log.py

@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+from pydantic import ValidationError
+from quart import Blueprint, jsonify
+
+from app.api.service import DailyRankManager
+from app.api.v1.utils import ApiDependencies, DailyRankQueryRequest
+from app.api.v1.utils import parse_json, validation_error_response
+
+
def create_rank_log_bp(deps: ApiDependencies) -> Blueprint:
    """Build the rank-log blueprint exposing the daily-rank query endpoint."""
    bp = Blueprint("rank_log", __name__)

    @bp.route("/daily_rank", methods=["POST"])
    async def query_daily_rank():
        """Query daily-rank data (查询日榜数据)."""
        # Validate the JSON payload; reject with a structured 4xx on failure.
        try:
            _, body = await parse_json(DailyRankQueryRequest)
        except ValidationError as exc:
            payload, status = validation_error_response(exc)
            return jsonify(payload), status

        # Delegate the actual query to the service layer.
        result = await DailyRankManager(pool=deps.db, data=body).query_daily_rank()
        return jsonify(result)

    return bp

+ 2 - 0
app/api/v1/routes/routes.py

@@ -9,6 +9,7 @@ from app.api.v1.endpoints import (
     create_tasks_bp,
     create_tokens_bp,
     create_monitor_bp,
+    create_rank_log_bp,
     # create_mcp_bp
 )
 from app.core.config import GlobalConfigSettings
@@ -33,6 +34,7 @@ def register_v1_blueprints(deps: ApiDependencies) -> Blueprint:
     api.register_blueprint(create_tokens_bp(deps))
     api.register_blueprint(create_abtest_bp(deps))
     api.register_blueprint(create_monitor_bp(deps))
+    api.register_blueprint(create_rank_log_bp(deps))
     # api.register_blueprint(create_mcp_bp(deps))
 
     return api

+ 2 - 0
app/api/v1/utils/__init__.py

@@ -6,6 +6,7 @@ from .schemas import (
     SaveTokenRequest,
     GetCoverRequest,
     LongArticlesMcpRequest,
+    DailyRankQueryRequest,
 )
 
 
@@ -17,5 +18,6 @@ __all__ = [
     "SaveTokenRequest",
     "GetCoverRequest",
     "LongArticlesMcpRequest",
+    "DailyRankQueryRequest",
     "ApiDependencies",
 ]

+ 19 - 0
app/api/v1/utils/schemas.py

@@ -50,3 +50,22 @@ class LongArticlesMcpRequest(BaseRequest):
     sort_order: Optional[Literal["asc", "desc"]] = Field(
         default=None, description="排序方向: asc / desc"
     )
+
+
class DailyRankQueryRequest(BaseRequest):
    """Request schema for the daily-rank query endpoint (日榜数据查询请求)."""

    # Optional equality filters; None means "no filter on this column".
    rank_date: Optional[str] = Field(default=None, description="榜单日期 YYYY-MM-DD")
    gh_id: Optional[str] = Field(default=None, description="公众号ID")
    account_name: Optional[str] = Field(default=None, description="账号名称")
    content_pool_type: Optional[str] = Field(default=None, description="内容池类型")
    # NOTE(review): this field is used as a row filter downstream, but the
    # description says "排序策略" (sort strategy) — confirm the wording is intended.
    strategy: Optional[str] = Field(default=None, description="排序策略")

    # Weight configuration (adjustable from the frontend); keys are score
    # column names, values are multipliers. None falls back to server defaults.
    weights: Optional[Dict[str, float]] = Field(
        default=None,
        description="分数权重配置,key为字段名,value为权重值",
    )

    # In-memory sort settings applied after new_score is computed server-side.
    sort_by: str = Field(default="new_score", description="排序字段")
    sort_dir: str = Field(default="desc", description="排序方向")

+ 4 - 0
app/domains/monitor_tasks/rank_log_monitor/_const.py

@@ -9,6 +9,7 @@ class RankLogMonitorConst:
         "ViewCountRateV2Strategy": "view_count_rate_score",
         "HisFissionOpenRateStrategy": "his_fission_open_rate_score",
         "CrawlerDaysDecreaseStrategy": "crawler_days_decrease_score",
+        "CrawlerDaysDecreaseV2Strategy": "crawler_days_decrease_v2_score",
         "SimilarityStrategy": "similarity_score",
     }
 
@@ -20,6 +21,7 @@ class RankLogMonitorConst:
         "view_count_rate_score",
         "his_fission_open_rate_score",
         "crawler_days_decrease_score",
+        "crawler_days_decrease_v2_score",
         "similarity_score",
         "publish_times_score",
     ]
@@ -45,6 +47,7 @@ class RankLogMonitorConst:
         view_count_rate_score       DOUBLE       NULL COMMENT '阅读分',
         his_fission_open_rate_score DOUBLE       NULL COMMENT '打开裂变分',
         crawler_days_decrease_score DOUBLE       NULL COMMENT '阅读降权分',
+        crawler_days_decrease_v2_score DOUBLE   NULL COMMENT '阅读降权分V2',
         similarity_score            DOUBLE       NULL COMMENT '相关性分(old)',
         publish_times_score         DOUBLE       NULL COMMENT '发布次数分',
         source_log                  TEXT         NULL COMMENT '原始日志行',
@@ -58,6 +61,7 @@ class RankLogMonitorConst:
     ALTER_TABLE_SQL = [
         "ALTER TABLE long_articles_daily_rank ADD COLUMN source_id VARCHAR(64) NULL COMMENT '源内容ID' AFTER gh_id;",
         "ALTER TABLE long_articles_daily_rank ADD COLUMN publish_times_score DOUBLE NULL COMMENT '发布次数分' AFTER similarity_score;",
+        "ALTER TABLE long_articles_daily_rank ADD COLUMN crawler_days_decrease_v2_score DOUBLE NULL COMMENT '阅读降权分V2' AFTER crawler_days_decrease_score;",
         "ALTER TABLE long_articles_daily_rank ADD UNIQUE KEY uk_date_ghid_sourceid_strategy (rank_date, gh_id, source_id, strategy);",
     ]
 

+ 4 - 2
app/domains/monitor_tasks/rank_log_monitor/_mapper.py

@@ -60,6 +60,7 @@ class RankLogMonitorMapper:
                     row.get("view_count_rate_score"),
                     row.get("his_fission_open_rate_score"),
                     row.get("crawler_days_decrease_score"),
+                    row.get("crawler_days_decrease_v2_score"),
                     row.get("similarity_score"),
                     row.get("publish_times_score"),
                     row.get("source_log"),
@@ -75,10 +76,10 @@ class RankLogMonitorMapper:
                 title, score, category,
                 account_user_category_score, category_score, flow_ctl_decrease_score,
                 i2i_recommend_score, view_count_rate_score, his_fission_open_rate_score,
-                crawler_days_decrease_score, similarity_score, publish_times_score,
+                crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score, publish_times_score,
                 source_log, create_timestamp, update_timestamp
             )
-            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
             ON DUPLICATE KEY UPDATE
                 score = VALUES(score),
                 title = VALUES(title),
@@ -90,6 +91,7 @@ class RankLogMonitorMapper:
                 view_count_rate_score = VALUES(view_count_rate_score),
                 his_fission_open_rate_score = VALUES(his_fission_open_rate_score),
                 crawler_days_decrease_score = VALUES(crawler_days_decrease_score),
+                crawler_days_decrease_v2_score = VALUES(crawler_days_decrease_v2_score),
                 similarity_score = VALUES(similarity_score),
                 publish_times_score = VALUES(publish_times_score),
                 source_log = VALUES(source_log),

+ 9 - 2
app/domains/monitor_tasks/rank_log_monitor/_utils.py

@@ -35,7 +35,7 @@ class RankLogMonitorUtils(RankLogMonitorConst):
             normalized[col] = self.safe_float(record.get(col))
         return normalized
 
-    def parse_score_map(self, score_map: str | None) -> dict[str, Any]:
+    def parse_score_map(self, score_map: str | None, content_pool_type: str | None = None) -> dict[str, Any]:
         if not score_map:
             return {}
         try:
@@ -46,5 +46,12 @@ class RankLogMonitorUtils(RankLogMonitorConst):
         result = {}
         for key, column in self.DETAIL_COLUMN_MAP.items():
             if key in score_map_json:
-                result[column] = score_map_json[key]
+                value = score_map_json[key]
+                if (
+                    key == "HisFissionOpenRateStrategy"
+                    and value == 0
+                    and content_pool_type == "autoArticlePoolLevel1"
+                ):
+                    value = 0.32
+                result[column] = value
         return result

+ 4 - 1
app/domains/monitor_tasks/rank_log_monitor/entrance.py

@@ -23,7 +23,10 @@ class RankLogMonitor:
         articles = await self.mapper.fetch_articles(gh_id=gh_id)
         records = []
         for article in articles:
-            score_detail = self.tool.parse_score_map(article.get("score_map"))
+            score_detail = self.tool.parse_score_map(
+                article.get("score_map"),
+                content_pool_type=article.get("content_pool_type"),
+            )
             record = {
                 "account_name": article.get("account_name"),
                 "gh_id": article.get("gh_id"),