Przeglądaj źródła

Merge branch 'feature/luojunhui/20260408-save-daily-rank-log' of Server/LongArticleTaskServer into master

luojunhui 3 tygodni temu
rodzic
commit
f49333f174

+ 2 - 0
app/api/service/__init__.py

@@ -3,6 +3,7 @@
 # 前端交互
 from .task_manager_service import TaskManager
 from .gzh_cookie_manager import GzhCookieManager
+from .daily_rank_manager import DailyRankManager
 
 # 任务调度器
 from .task_scheduler import TaskScheduler
@@ -11,5 +12,6 @@ from .task_scheduler import TaskScheduler
 __all__ = [
     "TaskManager",
     "GzhCookieManager",
+    "DailyRankManager",
     "TaskScheduler",
 ]

+ 150 - 0
app/api/service/daily_rank_manager.py

@@ -0,0 +1,150 @@
+from typing import Optional
+
+
class DailyRankManagerUtils:
    """Helper mixin for daily-rank queries: WHERE building and weighted re-scoring."""

    # Default per-field weights: every sub-score contributes 1:1 to new_score.
    DEFAULT_WEIGHTS = {
        "account_user_category_score": 1.0,
        "category_score": 1.0,
        "flow_ctl_decrease_score": 1.0,
        "i2i_recommend_score": 1.0,
        "view_count_rate_score": 1.0,
        "his_fission_open_rate_score": 1.0,
        "crawler_days_decrease_score": 1.0,
        "crawler_days_decrease_v2_score": 1.0,
        "similarity_score": 1.0,
        "publish_times_score": 1.0,
    }

    @staticmethod
    def _build_where(
        rank_date: Optional[str] = None,
        gh_id: Optional[str] = None,
        account_name: Optional[str] = None,
        content_pool_type: Optional[str] = None,
        strategy: Optional[str] = None,
    ):
        """Build a parameterized WHERE clause from the optional equality filters.

        Returns ``(clause, params)``; the clause is ``"1=1"`` when no filter is
        set so the caller can always interpolate it unconditionally.
        """
        candidates = [
            ("rank_date = %s", rank_date),
            ("gh_id = %s", gh_id),
            ("account_name = %s", account_name),
            ("content_pool_type = %s", content_pool_type),
            ("strategy = %s", strategy),
        ]
        conds = [cond for cond, value in candidates if value]
        params = [value for _, value in candidates if value]
        return (" AND ".join(conds) or "1=1"), params

    @staticmethod
    def _calculate_new_score(row: dict, weights: dict) -> float:
        """Weighted sum of the row's sub-scores.

        Missing fields and values/weights that cannot be coerced to float
        contribute nothing rather than failing the whole row.
        """
        total = 0.0
        for field, weight in weights.items():
            raw = row.get(field)
            if raw is None:
                continue
            try:
                total += float(raw) * float(weight)
            except (TypeError, ValueError):
                # Skip non-numeric garbage silently, matching best-effort scoring.
                continue
        return total
+
+
class DailyRankManager(DailyRankManagerUtils):
    """Daily-rank query service over ``long_articles_daily_rank``."""

    def __init__(self, pool, data: dict):
        # pool: async DB accessor exposing async_fetch; data: validated request payload.
        self.pool = pool
        self.data = data

    async def query_daily_rank(self):
        """Query daily-rank rows (no pagination; full result under the filters).

        Reads optional filters, per-field weight overrides, and sort options
        from ``self.data``; recomputes a weighted ``new_score`` per row, then
        sorts in memory because ``new_score`` is a computed column SQL cannot
        order by. Returns ``{"total", "items", "weights"}``.
        """
        sort_by = self.data.get("sort_by", "new_score")
        sort_dir = self.data.get("sort_dir", "desc").lower()

        # Optional equality filters.
        rank_date = self.data.get("rank_date")
        gh_id = self.data.get("gh_id")
        account_name = self.data.get("account_name")
        content_pool_type = self.data.get("content_pool_type")
        strategy = self.data.get("strategy")

        # Caller-supplied weights replace the defaults wholesale.
        weights = self.data.get("weights") or self.DEFAULT_WEIGHTS

        # Build parameterized WHERE clause.
        where_clause, params = self._build_where(
            rank_date, gh_id, account_name, content_pool_type, strategy
        )

        # NOTE(review): rows are pre-ranked by the stored score and capped at
        # 1000 BEFORE the in-memory re-sort, so sorting by new_score only
        # reorders that top slice — confirm the truncation is intended.
        sql_list = f"""
            SELECT
                id, rank_date, account_name, gh_id, source_id,
                content_pool_type, strategy, title, score, category,
                account_user_category_score, category_score,
                flow_ctl_decrease_score, i2i_recommend_score,
                view_count_rate_score, his_fission_open_rate_score,
                crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score,
                publish_times_score, source_log,
                create_timestamp, update_timestamp
            FROM long_articles_daily_rank
            WHERE {where_clause}
            ORDER BY score DESC
            LIMIT 1000
        """
        rows = await self.pool.async_fetch(query=sql_list, params=tuple(params))

        # Attach the recomputed weighted score to every row.
        items = [
            {**row, "new_score": self._calculate_new_score(row, weights)}
            for row in rows
        ]

        # Whitelist guards against sorting on arbitrary request-supplied keys.
        sort_whitelist = {
            "id",
            "rank_date",
            "score",
            "new_score",
            "account_user_category_score",
            "category_score",
            "flow_ctl_decrease_score",
            "i2i_recommend_score",
            "view_count_rate_score",
            "his_fission_open_rate_score",
            "crawler_days_decrease_score",
            "crawler_days_decrease_v2_score",
            "similarity_score",
            "publish_times_score",
        }
        if sort_by in sort_whitelist:
            reverse = sort_dir == "desc"
            # Fix: the previous float("-inf") placeholder for NULL values
            # raised TypeError for non-numeric sort keys (e.g. rank_date
            # dates compared against a float). Sort only the rows that have
            # the key, then re-attach NULL rows where the old sentinel would
            # have placed them: first when ascending, last when descending.
            present = [item for item in items if item.get(sort_by) is not None]
            missing = [item for item in items if item.get(sort_by) is None]
            present.sort(key=lambda item: item[sort_by], reverse=reverse)
            items = present + missing if reverse else missing + present

        return {
            "total": len(items),
            "items": items,
            "weights": weights,
        }

+ 2 - 0
app/api/v1/endpoints/__init__.py

@@ -3,6 +3,7 @@ from .health import create_health_bp
 from .tasks import create_tasks_bp
 from .tokens import create_tokens_bp
 from .monitor import create_monitor_bp
+from .rank_log import create_rank_log_bp
 # from .mcp import create_mcp_bp
 
 __all__ = [
@@ -11,5 +12,6 @@ __all__ = [
     "create_tasks_bp",
     "create_tokens_bp",
     "create_monitor_bp",
+    "create_rank_log_bp",
     # "create_mcp_bp",
 ]

+ 27 - 0
app/api/v1/endpoints/rank_log.py

@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+from pydantic import ValidationError
+from quart import Blueprint, jsonify
+
+from app.api.service import DailyRankManager
+from app.api.v1.utils import ApiDependencies, DailyRankQueryRequest
+from app.api.v1.utils import parse_json, validation_error_response
+
+
def create_rank_log_bp(deps: ApiDependencies) -> Blueprint:
    """Build the rank-log blueprint exposing the daily-rank query endpoint."""
    blueprint = Blueprint("rank_log", __name__)

    @blueprint.route("/daily_rank", methods=["POST"])
    async def query_daily_rank():
        """Validate the request body, run the query, and return it as JSON."""
        try:
            _, payload = await parse_json(DailyRankQueryRequest)
        except ValidationError as err:
            body, status = validation_error_response(err)
            return jsonify(body), status

        result = await DailyRankManager(pool=deps.db, data=payload).query_daily_rank()
        return jsonify(result)

    return blueprint

+ 2 - 0
app/api/v1/routes/routes.py

@@ -9,6 +9,7 @@ from app.api.v1.endpoints import (
     create_tasks_bp,
     create_tokens_bp,
     create_monitor_bp,
+    create_rank_log_bp,
     # create_mcp_bp
 )
 from app.core.config import GlobalConfigSettings
@@ -33,6 +34,7 @@ def register_v1_blueprints(deps: ApiDependencies) -> Blueprint:
     api.register_blueprint(create_tokens_bp(deps))
     api.register_blueprint(create_abtest_bp(deps))
     api.register_blueprint(create_monitor_bp(deps))
+    api.register_blueprint(create_rank_log_bp(deps))
     # api.register_blueprint(create_mcp_bp(deps))
 
     return api

+ 2 - 0
app/api/v1/utils/__init__.py

@@ -6,6 +6,7 @@ from .schemas import (
     SaveTokenRequest,
     GetCoverRequest,
     LongArticlesMcpRequest,
+    DailyRankQueryRequest,
 )
 
 
@@ -17,5 +18,6 @@ __all__ = [
     "SaveTokenRequest",
     "GetCoverRequest",
     "LongArticlesMcpRequest",
+    "DailyRankQueryRequest",
     "ApiDependencies",
 ]

+ 19 - 0
app/api/v1/utils/schemas.py

@@ -50,3 +50,22 @@ class LongArticlesMcpRequest(BaseRequest):
     sort_order: Optional[Literal["asc", "desc"]] = Field(
         default=None, description="排序方向: asc / desc"
     )
+
+
class DailyRankQueryRequest(BaseRequest):
    """Request body for the daily-rank query endpoint (POST /daily_rank)."""

    # Optional equality filters; an omitted field leaves that column unconstrained.
    rank_date: Optional[str] = Field(default=None, description="榜单日期 YYYY-MM-DD")
    gh_id: Optional[str] = Field(default=None, description="公众号ID")
    account_name: Optional[str] = Field(default=None, description="账号名称")
    content_pool_type: Optional[str] = Field(default=None, description="内容池类型")
    strategy: Optional[str] = Field(default=None, description="排序策略")

    # Weight overrides (adjustable from the frontend): score field name -> weight.
    # When None, the server falls back to its default weight table.
    weights: Optional[Dict[str, float]] = Field(
        default=None,
        description="分数权重配置,key为字段名,value为权重值",
    )

    # Sorting: sort_by is checked against a server-side whitelist; sort_dir is asc/desc.
    sort_by: str = Field(default="new_score", description="排序字段")
    sort_dir: str = Field(default="desc", description="排序方向")

+ 2 - 1
app/domains/monitor_tasks/__init__.py

@@ -10,7 +10,7 @@ from .auto_reply_cards_monitor import AutoReplyCardsMonitor
 from .cooperate_accounts_monitor import CooperateAccountsMonitorTask
 from .fwh_group_publish_monitor import FwhGroupPublishMonitor
 from .ad_platform_accounts_monitor import AdPlatformAccountsMonitorTask
-
+from .rank_log_monitor import RankLogMonitor
 
 __all__ = [
     "check_kimi_balance",
@@ -25,4 +25,5 @@ __all__ = [
     "CooperateAccountsMonitorTask",
     "FwhGroupPublishMonitor",
     "AdPlatformAccountsMonitorTask",
+    "RankLogMonitor",
 ]

+ 3 - 0
app/domains/monitor_tasks/rank_log_monitor/__init__.py

@@ -0,0 +1,3 @@
+from .entrance import RankLogMonitor
+
+__all__ = ['RankLogMonitor']

+ 73 - 0
app/domains/monitor_tasks/rank_log_monitor/_const.py

@@ -0,0 +1,73 @@
class RankLogMonitorConst:
    """Constants for the rank-log monitor: score-map key mapping and table DDL."""

    # Maps strategy class names found in the score_map JSON to DB score columns.
    DETAIL_COLUMN_MAP = {
        "PublishTimesStrategy": "publish_times_score",
        "AccountUserCategoryStrategy": "account_user_category_score",
        "CategoryStrategy": "category_score",
        "FlowCtlDecreaseStrategy": "flow_ctl_decrease_score",
        "I2IRecommendStrategy": "i2i_recommend_score",
        "ViewCountRateV2Strategy": "view_count_rate_score",
        "HisFissionOpenRateStrategy": "his_fission_open_rate_score",
        "CrawlerDaysDecreaseStrategy": "crawler_days_decrease_score",
        "CrawlerDaysDecreaseV2Strategy": "crawler_days_decrease_v2_score",
        "SimilarityStrategy": "similarity_score",
    }

    # Every per-strategy score column persisted for a record (floats, nullable).
    SCORE_COLUMNS = [
        "account_user_category_score",
        "category_score",
        "flow_ctl_decrease_score",
        "i2i_recommend_score",
        "view_count_rate_score",
        "his_fission_open_rate_score",
        "crawler_days_decrease_score",
        "crawler_days_decrease_v2_score",
        "similarity_score",
        "publish_times_score",
    ]

    # DDL for the snapshot table; the unique key dedupes to one row per
    # (date, account, source, strategy), which the upsert relies on.
    CREATE_LONG_ARTICLES_DAILY_RANK_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS long_articles_daily_rank
    (
        id                          BIGINT AUTO_INCREMENT COMMENT '主键ID'
            PRIMARY KEY,
        rank_date                   DATE         NOT NULL COMMENT '榜单日期',
        account_name                VARCHAR(128) NULL COMMENT '账号名称',
        gh_id                       VARCHAR(64)  NOT NULL COMMENT '公众号ID',
        source_id                   VARCHAR(64)  NULL COMMENT '源内容ID',
        content_pool_type           VARCHAR(64)  NULL COMMENT '内容池类型',
        strategy                    VARCHAR(64)  NULL COMMENT '排序策略',
        title                       VARCHAR(512) NULL COMMENT '标题',
        score                       DOUBLE       NULL COMMENT '最终得分',
        category                    VARCHAR(128) NULL COMMENT '一级品类',
        account_user_category_score DOUBLE       NULL COMMENT '人群相关分',
        category_score              DOUBLE       NULL COMMENT '文章相关分',
        flow_ctl_decrease_score     DOUBLE       NULL COMMENT '流量控制分',
        i2i_recommend_score         DOUBLE       NULL COMMENT 'i2i推荐分',
        view_count_rate_score       DOUBLE       NULL COMMENT '阅读分',
        his_fission_open_rate_score DOUBLE       NULL COMMENT '打开裂变分',
        crawler_days_decrease_score DOUBLE       NULL COMMENT '阅读降权分',
        crawler_days_decrease_v2_score DOUBLE   NULL COMMENT '阅读降权分V2',
        similarity_score            DOUBLE       NULL COMMENT '相关性分(old)',
        publish_times_score         DOUBLE       NULL COMMENT '发布次数分',
        source_log                  TEXT         NULL COMMENT '原始日志行',
        create_timestamp            BIGINT       NOT NULL COMMENT '创建时间戳',
        update_timestamp            BIGINT       NOT NULL COMMENT '更新时间戳',
        UNIQUE KEY uk_date_ghid_sourceid_strategy (rank_date, gh_id, source_id, strategy)
    )
        COMMENT '长文日榜解析结果表';
    """

    # One-off migration statements for deployments that created the table
    # before source_id / the v2 score columns / the unique key existed.
    ALTER_TABLE_SQL = [
        "ALTER TABLE long_articles_daily_rank ADD COLUMN source_id VARCHAR(64) NULL COMMENT '源内容ID' AFTER gh_id;",
        "ALTER TABLE long_articles_daily_rank ADD COLUMN publish_times_score DOUBLE NULL COMMENT '发布次数分' AFTER similarity_score;",
        "ALTER TABLE long_articles_daily_rank ADD COLUMN crawler_days_decrease_v2_score DOUBLE NULL COMMENT '阅读降权分V2' AFTER crawler_days_decrease_score;",
        "ALTER TABLE long_articles_daily_rank ADD UNIQUE KEY uk_date_ghid_sourceid_strategy (rank_date, gh_id, source_id, strategy);",
    ]

    # Secondary indexes matching the common query filters.
    CREATE_LONG_ARTICLES_DAILY_RANK_INDEXES_SQL = [
        "CREATE INDEX idx_ladr_rank_date ON long_articles_daily_rank (rank_date);",
        "CREATE INDEX idx_ladr_gh_id ON long_articles_daily_rank (gh_id);",
        "CREATE INDEX idx_ladr_strategy ON long_articles_daily_rank (strategy);",
        "CREATE INDEX idx_ladr_create_timestamp ON long_articles_daily_rank (create_timestamp);",
    ]

+ 114 - 0
app/domains/monitor_tasks/rank_log_monitor/_mapper.py

@@ -0,0 +1,114 @@
+import time
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+
class RankLogMonitorMapper:
    """DB access layer: reads rank_content_score, upserts long_articles_daily_rank."""

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service

    async def fetch_accounts(self) -> list[str]:
        """Return the distinct gh_ids present in rank_content_score."""
        query = """
            SELECT DISTINCT gh_id FROM rank_content_score;
        """
        rows = await self.pool.async_fetch(query=query)
        return [row.get("gh_id") for row in rows or []]

    async def fetch_all_articles(self) -> list[dict]:
        """Return every scored article row across all accounts."""
        query = """
            SELECT gh_id, account_name, source_id, content_pool_type, strategy,
                   title, score, score_map, category
            FROM rank_content_score;
        """
        rows = await self.pool.async_fetch(query=query)
        return rows or []

    async def fetch_articles(self, gh_id: str) -> list[dict]:
        """Return the scored article rows for one account."""
        query = """
            SELECT gh_id, account_name, source_id, content_pool_type, strategy,
                   title, score, score_map, category
            FROM rank_content_score
            WHERE gh_id = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(gh_id,))
        return rows or []

    async def save_records(self, rank_date: str, records: list[dict]) -> int:
        """Bulk-upsert parsed records for ``rank_date``.

        Returns 0 immediately when ``records`` is empty; otherwise the
        driver's save result for the batched statement.
        """
        if not records:
            return 0

        timestamp = int(time.time())
        # Column order must match the INSERT column list / placeholders below.
        field_order = (
            "account_name", "gh_id", "source_id", "content_pool_type", "strategy",
            "title", "score", "category",
            "account_user_category_score", "category_score", "flow_ctl_decrease_score",
            "i2i_recommend_score", "view_count_rate_score", "his_fission_open_rate_score",
            "crawler_days_decrease_score", "crawler_days_decrease_v2_score",
            "similarity_score", "publish_times_score", "source_log",
        )
        batch_params = [
            (rank_date,)
            + tuple(row.get(field) for field in field_order)
            + (timestamp, timestamp)
            for row in records
        ]

        sql = """
            INSERT INTO long_articles_daily_rank
            (
                rank_date, account_name, gh_id, source_id, content_pool_type, strategy,
                title, score, category,
                account_user_category_score, category_score, flow_ctl_decrease_score,
                i2i_recommend_score, view_count_rate_score, his_fission_open_rate_score,
                crawler_days_decrease_score, crawler_days_decrease_v2_score, similarity_score, publish_times_score,
                source_log, create_timestamp, update_timestamp
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                score = VALUES(score),
                title = VALUES(title),
                category = VALUES(category),
                account_user_category_score = VALUES(account_user_category_score),
                category_score = VALUES(category_score),
                flow_ctl_decrease_score = VALUES(flow_ctl_decrease_score),
                i2i_recommend_score = VALUES(i2i_recommend_score),
                view_count_rate_score = VALUES(view_count_rate_score),
                his_fission_open_rate_score = VALUES(his_fission_open_rate_score),
                crawler_days_decrease_score = VALUES(crawler_days_decrease_score),
                crawler_days_decrease_v2_score = VALUES(crawler_days_decrease_v2_score),
                similarity_score = VALUES(similarity_score),
                publish_times_score = VALUES(publish_times_score),
                source_log = VALUES(source_log),
                update_timestamp = VALUES(update_timestamp);
        """
        return await self.pool.async_save(
            query=sql,
            params=batch_params,
            batch=True,
        )

    async def count_records(self, rank_date: str) -> int:
        """Count snapshot rows already stored for ``rank_date``."""
        query = """
            SELECT COUNT(*) AS cnt FROM long_articles_daily_rank
            WHERE rank_date = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(rank_date,))
        return rows[0].get("cnt", 0) if rows else 0

+ 57 - 0
app/domains/monitor_tasks/rank_log_monitor/_utils.py

@@ -0,0 +1,57 @@
+import json
+from typing import Any
+
+from ._const import RankLogMonitorConst
+
+
class RankLogMonitorUtils(RankLogMonitorConst):
    """Parsing helpers for rank-log records: float coercion and score-map extraction."""

    @staticmethod
    def safe_float(raw: Any) -> float | None:
        """Coerce ``raw`` to float; None for empty string, None, or unparsable input."""
        if raw in ("", None):
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    def normalize_record(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Project a raw record onto the DB column set.

        Returns None when gh_id is missing (the record cannot be keyed).
        Score fields are coerced via safe_float; the rest pass through as-is.
        """
        gh_id = record.get("gh_id")
        if not gh_id:
            return None

        passthrough = (
            "account_name",
            "source_id",
            "content_pool_type",
            "strategy",
            "title",
            "category",
            "source_log",
        )
        normalized: dict[str, Any] = {key: record.get(key) for key in passthrough}
        normalized["gh_id"] = gh_id
        normalized["score"] = self.safe_float(record.get("score"))
        for column in self.SCORE_COLUMNS:
            normalized[column] = self.safe_float(record.get(column))
        return normalized

    def parse_score_map(self, score_map: str | None, content_pool_type: str | None = None) -> dict[str, Any]:
        """Extract per-strategy scores from the raw score_map JSON string.

        Returns an empty dict for missing/invalid JSON. Keys are translated
        through DETAIL_COLUMN_MAP into DB column names.
        """
        if not score_map:
            return {}
        try:
            parsed = json.loads(score_map)
        except (json.JSONDecodeError, TypeError):
            return {}

        scores: dict[str, Any] = {}
        for strategy_key, column in self.DETAIL_COLUMN_MAP.items():
            if strategy_key not in parsed:
                continue
            value = parsed[strategy_key]
            # NOTE(review): a logged 0 for fission open rate in the level-1
            # auto pool is replaced with 0.32 — presumably a baseline default;
            # confirm the constant with the log producer.
            if (
                strategy_key == "HisFissionOpenRateStrategy"
                and value == 0
                and content_pool_type == "autoArticlePoolLevel1"
            ):
                value = 0.32
            scores[column] = value
        return scores

+ 62 - 0
app/domains/monitor_tasks/rank_log_monitor/entrance.py

@@ -0,0 +1,62 @@
+import asyncio
+from datetime import date
+
+from tqdm import tqdm
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._utils import RankLogMonitorUtils
+from ._mapper import RankLogMonitorMapper
+
# Max number of accounts processed concurrently in deal().
CONCURRENCY = 10


class RankLogMonitor:
    """Parse rank_content_score per account and snapshot into long_articles_daily_rank."""

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = RankLogMonitorMapper(self.pool, self.log_service)
        self.tool = RankLogMonitorUtils()

    async def save_single_account(self, gh_id: str, rank_date: str) -> int:
        """Parse and upsert all of one account's articles for ``rank_date``.

        Returns the mapper's save result, or 0 when the account has no rows.
        """
        articles = await self.mapper.fetch_articles(gh_id=gh_id)
        records = []
        for article in articles:
            # score_map carries the per-strategy breakdown; the raw JSON is
            # also kept verbatim in source_log for auditing.
            score_detail = self.tool.parse_score_map(
                article.get("score_map"),
                content_pool_type=article.get("content_pool_type"),
            )
            records.append(
                {
                    "account_name": article.get("account_name"),
                    "gh_id": article.get("gh_id"),
                    "source_id": article.get("source_id"),
                    "content_pool_type": article.get("content_pool_type"),
                    "strategy": article.get("strategy"),
                    "title": article.get("title"),
                    "score": article.get("score"),
                    "category": article.get("category"),
                    "source_log": article.get("score_map"),
                    **score_detail,
                }
            )

        if not records:
            return 0
        return await self.mapper.save_records(rank_date=rank_date, records=records)

    async def deal(self):
        """Snapshot today's rank for every account with bounded concurrency.

        Returns the sum of the per-account save results.
        """
        rank_date = date.today().isoformat()
        accounts = await self.mapper.fetch_accounts()
        semaphore = asyncio.Semaphore(CONCURRENCY)
        pbar = tqdm(total=len(accounts))

        async def _worker(gh_id: str) -> int:
            async with semaphore:
                saved = await self.save_single_account(gh_id, rank_date)
                pbar.update(1)
                return saved

        try:
            results = await asyncio.gather(*[_worker(gh_id) for gh_id in accounts])
        finally:
            # Fix: close the bar even when a worker raises (gather fails fast),
            # so the progress bar isn't leaked and the terminal stays usable.
            pbar.close()
        return sum(results)