Explorar el Código

Inner Decode Tasks

luojunhui hace 4 semanas
padre
commit
5d80837bda

+ 2 - 0
.gitignore

@@ -62,3 +62,5 @@ docs/_build/
 # PyBuilder
 target/
 
+.cursor
+

+ 1 - 0
app/domains/monitor_tasks/rank_log_monitor/__init__.py

@@ -0,0 +1 @@
+from app.domains.monitor_tasks.rank_log_monitor.entrance import RankLogMonitor

+ 48 - 0
app/domains/monitor_tasks/rank_log_monitor/_const.py

@@ -0,0 +1,48 @@
class RankLogMonitorConst:
    """Constants for the rank-log monitor: strategy-to-column mapping and DDL."""

    # Maps a ranking-strategy name (the keys appearing in a row's score_map)
    # to the matching detail-score column of long_articles_daily_rank.
    DETAIL_COLUMN_MAP = {
        "PublishTimesStrategy": "publish_times_score",
        "AccountUserCategoryStrategy": "account_user_category_score",
        "CategoryStrategy": "category_score",
        "FlowCtlDecreaseStrategy": "flow_ctl_decrease_score",
        "I2IRecommendStrategy": "i2i_recommend_score",
        "ViewCountRateV2Strategy": "view_count_rate_score",
        "HisFissionOpenRateStrategy": "his_fission_open_rate_score",
        "CrawlerDaysDecreaseStrategy": "crawler_days_decrease_score",
        "SimilarityStrategy": "similarity_score",
    }

    # DDL for the destination table. The COMMENT clauses are part of the
    # runtime SQL and are intentionally left untranslated.
    CREATE_LONG_ARTICLES_DAILY_RANK_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS long_articles_daily_rank
    (
        id                          BIGINT AUTO_INCREMENT COMMENT '主键ID'
            PRIMARY KEY,
        rank_date                   DATE         NOT NULL COMMENT '榜单日期',
        account_name                VARCHAR(128) NULL COMMENT '账号名称',
        gh_id                       VARCHAR(64)  NOT NULL COMMENT '公众号ID',
        content_pool_type           VARCHAR(64)  NULL COMMENT '内容池类型',
        strategy                    VARCHAR(64)  NULL COMMENT '排序策略',
        title                       VARCHAR(512) NULL COMMENT '标题',
        score                       DOUBLE       NULL COMMENT '最终得分',
        category                    VARCHAR(128) NULL COMMENT '一级品类',
        account_user_category_score DOUBLE       NULL COMMENT '人群相关分',
        category_score              DOUBLE       NULL COMMENT '文章相关分',
        flow_ctl_decrease_score     DOUBLE       NULL COMMENT '流量控制分',
        i2i_recommend_score         DOUBLE       NULL COMMENT 'i2i推荐分',
        view_count_rate_score       DOUBLE       NULL COMMENT '阅读分',
        his_fission_open_rate_score DOUBLE       NULL COMMENT '打开裂变分',
        crawler_days_decrease_score DOUBLE       NULL COMMENT '阅读降权分',
        similarity_score            DOUBLE       NULL COMMENT '相关性分(old)',
        source_log                  TEXT         NULL COMMENT '原始日志行',
        create_timestamp            BIGINT       NOT NULL COMMENT '创建时间戳',
        update_timestamp            BIGINT       NOT NULL COMMENT '更新时间戳'
    )
        COMMENT '长文日榜解析结果表';
    """

    # Secondary indexes for the table above.
    # NOTE(review): unlike the table DDL these carry no IF NOT EXISTS guard,
    # so re-running them fails — presumably executed once at setup; confirm.
    CREATE_LONG_ARTICLES_DAILY_RANK_INDEXES_SQL = [
        "CREATE INDEX idx_ladr_rank_date ON long_articles_daily_rank (rank_date);",
        "CREATE INDEX idx_ladr_gh_id ON long_articles_daily_rank (gh_id);",
        "CREATE INDEX idx_ladr_strategy ON long_articles_daily_rank (strategy);",
        "CREATE INDEX idx_ladr_create_timestamp ON long_articles_daily_rank (create_timestamp);",
    ]

+ 78 - 0
app/domains/monitor_tasks/rank_log_monitor/_mapper.py

@@ -0,0 +1,78 @@
+import time
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+
class RankLogMonitorMapper:
    """Data-access layer for the rank-log monitor.

    Reads ranked-article rows from ``rank_content_score`` and batch-inserts
    the parsed results into ``long_articles_daily_rank``.
    """

    # Record keys in the exact order of the INSERT column list below
    # (between the leading rank_date and the trailing timestamps).
    _RECORD_FIELDS = (
        "account_name",
        "gh_id",
        "content_pool_type",
        "strategy",
        "title",
        "score",
        "category",
        "account_user_category_score",
        "category_score",
        "flow_ctl_decrease_score",
        "i2i_recommend_score",
        "view_count_rate_score",
        "his_fission_open_rate_score",
        "crawler_days_decrease_score",
        "similarity_score",
        "source_log",
    )

    # Hoisted to a class constant: the original rebuilt this string on every
    # save_records() call inside the function body.
    _INSERT_LONG_ARTICLES_DAILY_RANK_SQL = """
        INSERT INTO long_articles_daily_rank
        (
            rank_date, account_name, gh_id, content_pool_type, strategy, title, score, category,
            account_user_category_score, category_score, flow_ctl_decrease_score, i2i_recommend_score,
            view_count_rate_score, his_fission_open_rate_score, crawler_days_decrease_score, similarity_score,
            source_log, create_timestamp, update_timestamp
        )
        VALUES
        (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        );
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service

    async def fetch_accounts(self) -> list:
        """Return the distinct ``gh_id`` values present in rank_content_score."""
        query = "SELECT DISTINCT gh_id FROM rank_content_score;"
        response = await self.pool.async_fetch(query=query)
        return [row.get("gh_id") for row in response]

    async def fetch_articles(self, gh_id: str):
        """Fetch every ranked-article row for one account (``gh_id``)."""
        query = """
            SELECT gh_id, account_name, content_pool_type, strategy, title, score, score_map, category
            FROM rank_content_score
            WHERE gh_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(gh_id,))

    async def save_records(self, rank_date: str, records: list[dict]) -> int:
        """Batch-insert parsed rank records for ``rank_date``.

        Missing keys are stored as NULL. Returns the saved-row count as
        reported by the pool, or 0 when ``records`` is empty.
        """
        if not records:
            # Early exit: don't touch the clock or the pool for empty input.
            return 0
        now_ts = int(time.time())
        params = [
            (rank_date, *(row.get(field) for field in self._RECORD_FIELDS), now_ts, now_ts)
            for row in records
        ]
        return await self.pool.async_save(
            query=self._INSERT_LONG_ARTICLES_DAILY_RANK_SQL,
            params=params,
            batch=True,
        )

+ 83 - 0
app/domains/monitor_tasks/rank_log_monitor/_utils.py

@@ -0,0 +1,83 @@
+import csv
+import json
+import re
+from collections import defaultdict
+from pathlib import Path
+from typing import Any
+
+from ._const import RankLogMonitorConst
+
+
+class RankLogMonitorUtils(RankLogMonitorConst):
+
+    @staticmethod
+    def safe_float(raw: Any) -> float | None:
+        if raw in ("", None):
+            return None
+        try:
+            return float(raw)
+        except (TypeError, ValueError):
+            return None
+
+    def normalize_record(self, record: dict[str, Any]) -> dict[str, Any] | None:
+        gh_id = record.get("gh_id")
+        if not gh_id:
+            return None
+
+        return {
+            "account_name": record.get("account_name"),
+            "gh_id": gh_id,
+            "content_pool_type": record.get("content_pool_type"),
+            "strategy": record.get("strategy"),
+            "title": record.get("title"),
+            "score": self.safe_float(record.get("score")),
+            "category": record.get("category"),
+            "account_user_category_score": self.safe_float(record.get("account_user_category_score")),
+            "category_score": self.safe_float(record.get("category_score")),
+            "flow_ctl_decrease_score": self.safe_float(record.get("flow_ctl_decrease_score")),
+            "i2i_recommend_score": self.safe_float(record.get("i2i_recommend_score")),
+            "view_count_rate_score": self.safe_float(record.get("view_count_rate_score")),
+            "his_fission_open_rate_score": self.safe_float(record.get("his_fission_open_rate_score")),
+            "crawler_days_decrease_score": self.safe_float(record.get("crawler_days_decrease_score")),
+            "similarity_score": self.safe_float(record.get("similarity_score"))
+        }
+
+    def parse_text_log_line(self, line: str) -> dict[str, Any] | None:
+        line = line.strip()
+        if not line:
+            return None
+
+        if line.startswith("{") and line.endswith("}"):
+            try:
+                obj = json.loads(line)
+                score_map = obj.get("score_map") or {}
+                for detail_key, target_key in self.DETAIL_COLUMN_MAP.items():
+                    obj[target_key] = score_map.get(detail_key)
+                return self.normalize_record(obj)
+            except json.JSONDecodeError:
+                pass
+
+        kv_pairs = re.findall(r"([a-zA-Z_]+)=([^|]+)", line)
+        if not kv_pairs:
+            return None
+        kv = {k.strip(): v.strip() for k, v in kv_pairs}
+        return self.normalize_record(kv)
+
+    def parse_text_log(self, path: Path) -> list[dict[str, Any]]:
+        rows: list[dict[str, Any]] = []
+        with path.open("r", encoding="utf-8") as fp:
+            for line in fp:
+                parsed = self.parse_text_log_line(line)
+                if parsed:
+                    rows.append(parsed)
+        return rows
+
+    def parse_score_map(self, score_map: str):
+        score_map_json = json.loads(score_map)
+        L = {}
+        for key in score_map_json:
+            if self.DETAIL_COLUMN_MAP.get(key) is not None:
+                L[self.DETAIL_COLUMN_MAP[key]] = score_map_json[key]
+
+        return L
+

+ 60 - 0
app/domains/monitor_tasks/rank_log_monitor/entrance.py

@@ -0,0 +1,60 @@
+import asyncio
+import asyncio
+from datetime import date
+
+from tqdm import tqdm
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._utils import RankLogMonitorUtils
+from ._mapper import RankLogMonitorMapper
+
+CONCURRENCY = 10
+
+
class RankLogMonitor:
    """Daily task: flatten per-article score maps and persist the daily rank table."""

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = RankLogMonitorMapper(self.pool, self.log_service)
        self.tool = RankLogMonitorUtils()

    async def save_single_account(self, gh_id: str, rank_date: str) -> int:
        """Parse and persist all ranked articles of one account.

        Returns the number of rows saved (0 when the account has none).
        """
        articles = await self.mapper.fetch_articles(gh_id=gh_id)
        records = []
        for article in articles:
            raw_score_map = article.get("score_map")
            # Guard: the original indexed article["score_map"] unconditionally,
            # so a missing/NULL score_map crashed the whole account.
            score_detail = self.tool.parse_score_map(raw_score_map) if raw_score_map else {}
            records.append(
                {
                    "account_name": article.get("account_name"),
                    "gh_id": article.get("gh_id"),
                    "content_pool_type": article.get("content_pool_type"),
                    "strategy": article.get("strategy"),
                    "title": article.get("title"),
                    "score": article.get("score"),
                    "category": article.get("category"),
                    "source_log": raw_score_map,
                    **score_detail,
                }
            )

        if not records:
            return 0
        return await self.mapper.save_records(rank_date=rank_date, records=records)

    async def deal(self) -> int:
        """Process every known account concurrently; return total rows saved."""
        rank_date = date.today().isoformat()
        accounts = await self.mapper.fetch_accounts()
        semaphore = asyncio.Semaphore(CONCURRENCY)
        pbar = tqdm(total=len(accounts))

        async def _worker(gh_id: str) -> int:
            # Semaphore caps in-flight accounts at CONCURRENCY.
            async with semaphore:
                saved = await self.save_single_account(gh_id, rank_date)
                pbar.update(1)
                return saved

        try:
            results = await asyncio.gather(*[_worker(acc) for acc in accounts])
        finally:
            # Close the bar even if a worker raises, so the terminal is restored.
            pbar.close()
        return sum(results)
+