Jelajahi Sumber

save-daily-rank-log

luojunhui 3 minggu lalu
induk
komit 14b18ba2e7

+ 22 - 1
app/domains/monitor_tasks/rank_log_monitor/_const.py

@@ -12,6 +12,18 @@ class RankLogMonitorConst:
         "SimilarityStrategy": "similarity_score",
     }
 
+    SCORE_COLUMNS = [
+        "account_user_category_score",
+        "category_score",
+        "flow_ctl_decrease_score",
+        "i2i_recommend_score",
+        "view_count_rate_score",
+        "his_fission_open_rate_score",
+        "crawler_days_decrease_score",
+        "similarity_score",
+        "publish_times_score",
+    ]
+
     CREATE_LONG_ARTICLES_DAILY_RANK_TABLE_SQL = """
     CREATE TABLE IF NOT EXISTS long_articles_daily_rank
     (
@@ -20,6 +32,7 @@ class RankLogMonitorConst:
         rank_date                   DATE         NOT NULL COMMENT '榜单日期',
         account_name                VARCHAR(128) NULL COMMENT '账号名称',
         gh_id                       VARCHAR(64)  NOT NULL COMMENT '公众号ID',
+        source_id                   VARCHAR(64)  NULL COMMENT '源内容ID',
         content_pool_type           VARCHAR(64)  NULL COMMENT '内容池类型',
         strategy                    VARCHAR(64)  NULL COMMENT '排序策略',
         title                       VARCHAR(512) NULL COMMENT '标题',
@@ -33,13 +46,21 @@ class RankLogMonitorConst:
         his_fission_open_rate_score DOUBLE       NULL COMMENT '打开裂变分',
         crawler_days_decrease_score DOUBLE       NULL COMMENT '阅读降权分',
         similarity_score            DOUBLE       NULL COMMENT '相关性分(old)',
+        publish_times_score         DOUBLE       NULL COMMENT '发布次数分',
         source_log                  TEXT         NULL COMMENT '原始日志行',
         create_timestamp            BIGINT       NOT NULL COMMENT '创建时间戳',
-        update_timestamp            BIGINT       NOT NULL COMMENT '更新时间戳'
+        update_timestamp            BIGINT       NOT NULL COMMENT '更新时间戳',
+        UNIQUE KEY uk_date_ghid_sourceid_strategy (rank_date, gh_id, source_id, strategy)
     )
         COMMENT '长文日榜解析结果表';
     """
 
+    ALTER_TABLE_SQL = [
+        "ALTER TABLE long_articles_daily_rank ADD COLUMN source_id VARCHAR(64) NULL COMMENT '源内容ID' AFTER gh_id;",
+        "ALTER TABLE long_articles_daily_rank ADD COLUMN publish_times_score DOUBLE NULL COMMENT '发布次数分' AFTER similarity_score;",
+        "ALTER TABLE long_articles_daily_rank ADD UNIQUE KEY uk_date_ghid_sourceid_strategy (rank_date, gh_id, source_id, strategy);",
+    ]
+
     CREATE_LONG_ARTICLES_DAILY_RANK_INDEXES_SQL = [
         "CREATE INDEX idx_ladr_rank_date ON long_articles_daily_rank (rank_date);",
         "CREATE INDEX idx_ladr_gh_id ON long_articles_daily_rank (gh_id);",

+ 55 - 28
app/domains/monitor_tasks/rank_log_monitor/_mapper.py

@@ -9,26 +9,29 @@ class RankLogMonitorMapper:
         self.pool = pool
         self.log_service = log_service
 
-    async def fetch_accounts(self):
+    async def fetch_all_articles(self) -> list[dict]:
         query = """
-            SELECT DISTINCT gh_id FROM rank_content_score;
+            SELECT gh_id, account_name, source_id, content_pool_type, strategy,
+                   title, score, score_map, category
+            FROM rank_content_score;
         """
-        response = await self.pool.async_fetch(query=query)
-        return [
-            i.get("gh_id") for i in response
-        ]
+        result = await self.pool.async_fetch(query=query)
+        return result or []
 
-    async def fetch_articles(self, gh_id: str):
+    async def fetch_articles(self, gh_id: str) -> list[dict]:
         query = """
-            SELECT gh_id, account_name, content_pool_type, strategy, title, score, score_map, category
+            SELECT gh_id, account_name, source_id, content_pool_type, strategy,
+                   title, score, score_map, category
             FROM rank_content_score
-            WHERE gh_id = %s
-            ;
+            WHERE gh_id = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(gh_id,))
-
+        result = await self.pool.async_fetch(query=query, params=(gh_id,))
+        return result or []
 
     async def save_records(self, rank_date: str, records: list[dict]) -> int:
+        if not records:
+            return 0
+
         now_ts = int(time.time())
         params = []
         for row in records:
@@ -37,6 +40,7 @@ class RankLogMonitorMapper:
                     rank_date,
                     row.get("account_name"),
                     row.get("gh_id"),
+                    row.get("source_id"),
                     row.get("content_pool_type"),
                     row.get("strategy"),
                     row.get("title"),
@@ -50,29 +54,52 @@ class RankLogMonitorMapper:
                     row.get("his_fission_open_rate_score"),
                     row.get("crawler_days_decrease_score"),
                     row.get("similarity_score"),
+                    row.get("publish_times_score"),
                     row.get("source_log"),
                     now_ts,
                     now_ts,
                 )
             )
 
-        if not params:
-            return 0
-        INSERT_LONG_ARTICLES_DAILY_RANK_SQL = """
-                INSERT INTO long_articles_daily_rank
-                (
-                    rank_date, account_name, gh_id, content_pool_type, strategy, title, score, category,
-                    account_user_category_score, category_score, flow_ctl_decrease_score, i2i_recommend_score,
-                    view_count_rate_score, his_fission_open_rate_score, crawler_days_decrease_score, similarity_score,
-                    source_log, create_timestamp, update_timestamp
-                )
-                VALUES
-                (
-                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
-                );
-            """
+        sql = """
+            INSERT INTO long_articles_daily_rank
+            (
+                rank_date, account_name, gh_id, source_id, content_pool_type, strategy,
+                title, score, category,
+                account_user_category_score, category_score, flow_ctl_decrease_score,
+                i2i_recommend_score, view_count_rate_score, his_fission_open_rate_score,
+                crawler_days_decrease_score, similarity_score, publish_times_score,
+                source_log, create_timestamp, update_timestamp
+            )
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE
+                score = VALUES(score),
+                title = VALUES(title),
+                category = VALUES(category),
+                account_user_category_score = VALUES(account_user_category_score),
+                category_score = VALUES(category_score),
+                flow_ctl_decrease_score = VALUES(flow_ctl_decrease_score),
+                i2i_recommend_score = VALUES(i2i_recommend_score),
+                view_count_rate_score = VALUES(view_count_rate_score),
+                his_fission_open_rate_score = VALUES(his_fission_open_rate_score),
+                crawler_days_decrease_score = VALUES(crawler_days_decrease_score),
+                similarity_score = VALUES(similarity_score),
+                publish_times_score = VALUES(publish_times_score),
+                source_log = VALUES(source_log),
+                update_timestamp = VALUES(update_timestamp);
+        """
         return await self.pool.async_save(
-            query=INSERT_LONG_ARTICLES_DAILY_RANK_SQL,
+            query=sql,
             params=params,
             batch=True,
         )
+
+    async def count_records(self, rank_date: str) -> int:
+        query = """
+            SELECT COUNT(*) AS cnt FROM long_articles_daily_rank
+            WHERE rank_date = %s;
+        """
+        result = await self.pool.async_fetch(query=query, params=(rank_date,))
+        if result:
+            return result[0].get("cnt", 0)
+        return 0

+ 19 - 52
app/domains/monitor_tasks/rank_log_monitor/_utils.py

@@ -1,8 +1,4 @@
-import csv
 import json
-import re
-from collections import defaultdict
-from pathlib import Path
 from typing import Any
 
 from ._const import RankLogMonitorConst
@@ -24,60 +20,31 @@ class RankLogMonitorUtils(RankLogMonitorConst):
         if not gh_id:
             return None
 
-        return {
+        normalized = {
             "account_name": record.get("account_name"),
             "gh_id": gh_id,
+            "source_id": record.get("source_id"),
             "content_pool_type": record.get("content_pool_type"),
             "strategy": record.get("strategy"),
             "title": record.get("title"),
             "score": self.safe_float(record.get("score")),
             "category": record.get("category"),
-            "account_user_category_score": self.safe_float(record.get("account_user_category_score")),
-            "category_score": self.safe_float(record.get("category_score")),
-            "flow_ctl_decrease_score": self.safe_float(record.get("flow_ctl_decrease_score")),
-            "i2i_recommend_score": self.safe_float(record.get("i2i_recommend_score")),
-            "view_count_rate_score": self.safe_float(record.get("view_count_rate_score")),
-            "his_fission_open_rate_score": self.safe_float(record.get("his_fission_open_rate_score")),
-            "crawler_days_decrease_score": self.safe_float(record.get("crawler_days_decrease_score")),
-            "similarity_score": self.safe_float(record.get("similarity_score"))
+            "source_log": record.get("source_log"),
         }
+        for col in self.SCORE_COLUMNS:
+            normalized[col] = self.safe_float(record.get(col))
+        return normalized
 
-    def parse_text_log_line(self, line: str) -> dict[str, Any] | None:
-        line = line.strip()
-        if not line:
-            return None
-
-        if line.startswith("{") and line.endswith("}"):
-            try:
-                obj = json.loads(line)
-                score_map = obj.get("score_map") or {}
-                for detail_key, target_key in self.DETAIL_COLUMN_MAP.items():
-                    obj[target_key] = score_map.get(detail_key)
-                return self.normalize_record(obj)
-            except json.JSONDecodeError:
-                pass
-
-        kv_pairs = re.findall(r"([a-zA-Z_]+)=([^|]+)", line)
-        if not kv_pairs:
-            return None
-        kv = {k.strip(): v.strip() for k, v in kv_pairs}
-        return self.normalize_record(kv)
-
-    def parse_text_log(self, path: Path) -> list[dict[str, Any]]:
-        rows: list[dict[str, Any]] = []
-        with path.open("r", encoding="utf-8") as fp:
-            for line in fp:
-                parsed = self.parse_text_log_line(line)
-                if parsed:
-                    rows.append(parsed)
-        return rows
-
-    def parse_score_map(self, score_map: str):
-        score_map_json = json.loads(score_map)
-        L = {}
-        for key in score_map_json:
-            if self.DETAIL_COLUMN_MAP.get(key) is not None:
-                L[self.DETAIL_COLUMN_MAP[key]] = score_map_json[key]
-
-        return L
-
+    def parse_score_map(self, score_map: str | None) -> dict[str, Any]:
+        if not score_map:
+            return {}
+        try:
+            score_map_json = json.loads(score_map)
+        except (json.JSONDecodeError, TypeError):
+            return {}
+
+        result = {}
+        for key, column in self.DETAIL_COLUMN_MAP.items():
+            if key in score_map_json:
+                result[column] = score_map_json[key]
+        return result

+ 2 - 3
app/domains/monitor_tasks/rank_log_monitor/entrance.py

@@ -1,5 +1,4 @@
 import asyncio
-import asyncio
 from datetime import date
 
 from tqdm import tqdm
@@ -24,10 +23,11 @@ class RankLogMonitor:
         articles = await self.mapper.fetch_articles(gh_id=gh_id)
         records = []
         for article in articles:
-            score_detail = self.tool.parse_score_map(article["score_map"])
+            score_detail = self.tool.parse_score_map(article.get("score_map"))
             record = {
                 "account_name": article.get("account_name"),
                 "gh_id": article.get("gh_id"),
+                "source_id": article.get("source_id"),
                 "content_pool_type": article.get("content_pool_type"),
                 "strategy": article.get("strategy"),
                 "title": article.get("title"),
@@ -57,4 +57,3 @@ class RankLogMonitor:
         results = await asyncio.gather(*[_worker(acc) for acc in accounts])
         pbar.close()
         return sum(results)
-