소스 검색

限流文章扫描删除策略迭代

luojunhui 3 주 전
부모
커밋
326120aeff

+ 9 - 3
app/core/config/settings/read_rate_limited.py

@@ -12,10 +12,16 @@ class ReadRateLimited(BaseSettings):
     )
 
     # 认为满足限流阅读均值倍数阈值
-    read_on_avg_threshold: float = 0.2
+    read_on_avg_threshold: float = 0.25
 
     # 探索次数
     base_discover_time: int = 1
 
-    # 认为文章限流占比阈值
-    low_read_rate_threshold: float = 0.5
+    # 文章限流占比阈值
+    low_read_rate_threshold: float = 0.3
+
+    # 周期内最多限流篇数
+    MAX_RATE_LIMITED_ARTICLES: int = 2
+
+    # 持续限流天数
+    CONSIST_DAYS: int = 2

+ 0 - 114
app/domains/analysis_task/rate_limited_article_filter.py

@@ -1,114 +0,0 @@
-import asyncio
-import datetime
-import hashlib, json
-from typing import List, Dict
-from tqdm.asyncio import tqdm
-
-from app.core.config import GlobalConfigSettings
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-from app.infra.internal import delete_illegal_gzh_articles
-
-
-class RateLimitedArticleMapper:
-    def __init__(self, pool: DatabaseManager):
-        self.pool = pool
-
-    async def find_rate_limited_articles(
-        self,
-        days_duration: int,
-        read_on_avg_threshold: float = 0.2,
-        base_discover_time: int = 1,
-        low_read_rate_threshold: float = 0.5,
-    ) -> List[Dict]:
-        query = f"""
-            SELECT
-                title,
-                gh_id,
-                COUNT(*) AS publish_count,
-                CAST(
-                    SUM(CASE WHEN read_rate < {read_on_avg_threshold} THEN 1 ELSE 0 END) AS UNSIGNED
-                )
-                AS low_read_count
-            FROM datastat_sort_strategy
-            WHERE position = 1
-              AND account_type = '订阅号'
-              AND date_str < DATE_FORMAT(CURDATE(), '%Y%m%d')
-              AND date_str >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL {days_duration} DAY), '%Y%m%d')
-              AND date_str >= '20250501'
-            GROUP BY title
-            HAVING SUM(CASE WHEN read_rate < {read_on_avg_threshold} THEN 1 ELSE 0 END) / (COUNT(*) + {base_discover_time}) >= {low_read_rate_threshold}
-            ;
-        """
-        return await self.pool.async_fetch(query=query)
-
-    async def save_record(self, article_tuple):
-        query = """
-            INSERT IGNORE INTO rate_limited_title
-            (title_md5, title, remark)
-            VALUES
-            (%s, %s, %s)
-        """
-        return await self.pool.async_save(query=query, params=article_tuple)
-
-
-class RateLimitedArticleFilter(RateLimitedArticleMapper):
-    def __init__(
-        self,
-        pool: DatabaseManager,
-        config: GlobalConfigSettings,
-        # log_service: LogService,
-    ):
-        super().__init__(pool=pool)
-        self.config = config.read_rate_limit
-
-    async def _process_single_article(
-        self, data: Dict, days: int, semaphore: asyncio.Semaphore
-    ):
-        """处理单个文章的异步任务"""
-        async with semaphore:
-            gh_id = data["gh_id"]
-            title = data["title"]
-            title_md5 = hashlib.md5(title.encode("utf-8")).hexdigest()
-            remark = json.dumps(
-                {
-                    "发文数量": data["publish_count"],
-                    "限流数量": data["low_read_count"],
-                    "周期": days,
-                    "执行日期": datetime.datetime.today().strftime("%Y-%m-%d"),
-                },
-                ensure_ascii=False,
-            )
-            insert_rows = await self.save_record(
-                article_tuple=(title_md5, title, remark)
-            )
-            if insert_rows:
-                await delete_illegal_gzh_articles(gh_id=gh_id, title=title, delete_flag=2)
-            else:
-                print("该文章已经删过")
-
-    async def process_single_task(self, days: int, max_concurrent: int = 10):
-        """并发处理所有文章任务"""
-        data_list = await self.find_rate_limited_articles(
-            days_duration=days,
-            read_on_avg_threshold=self.config.read_on_avg_threshold,
-            base_discover_time=self.config.base_discover_time,
-            low_read_rate_threshold=self.config.low_read_rate_threshold,
-        )
-
-        # 创建信号量限制并发数
-        semaphore = asyncio.Semaphore(max_concurrent)
-
-        # 创建所有任务
-        tasks = [
-            self._process_single_article(data, days, semaphore) for data in data_list
-        ]
-
-        # 使用 tqdm 显示进度并发执行所有任务
-        for coro in tqdm.as_completed(tasks, total=len(tasks)):
-            await coro
-
-    async def deal(self):
-        for _day in self.config.stat_durations:
-            await self.process_single_task(_day)

+ 6 - 0
app/domains/analysis_task/rate_limited_article_filter/__init__.py

@@ -0,0 +1,6 @@
+from .entrance import RateLimitedArticleFilter
+
+
+__all__ = [
+    "RateLimitedArticleFilter",
+]

+ 37 - 0
app/domains/analysis_task/rate_limited_article_filter/_mapper.py

@@ -0,0 +1,37 @@
+from typing import List, Dict
+
+from app.core.database import DatabaseManager
+
+
class RateLimitedArticleMapper:
    """Data-access layer for rate-limited article detection.

    Reads first-position publish records from ``datastat_sort_strategy``
    and persists blocked titles into ``rate_limited_title``.
    """

    def __init__(self, pool: DatabaseManager):
        # pool: project database manager exposing async_fetch / async_save.
        self.pool = pool

    async def find_rate_limited_articles(
        self,
        days_duration: int,
    ) -> List[Dict]:
        """Fetch per-day read-rate rows for the last ``days_duration`` days.

        Args:
            days_duration: look-back window in days. Coerced with ``int()``
                so a non-numeric value raises ``ValueError``/``TypeError``
                instead of being spliced into the SQL text.

        Returns:
            Rows ordered by (gh_id, date_str), with NULL ``read_rate``
            normalised to 0 by ``IFNULL``.
        """
        # Defensive: the value is interpolated directly into the SQL below,
        # so force it to an int to rule out injection / malformed queries.
        window_days = int(days_duration)
        query = f"""
            SELECT
                title,
                gh_id,
                date_str,
                IFNULL(read_rate, 0) AS read_rate
            FROM datastat_sort_strategy
            WHERE position = 1
              AND account_type = '订阅号'
              AND date_str < DATE_FORMAT(CURDATE(), '%Y%m%d')
              AND date_str >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL {window_days} DAY), '%Y%m%d')
              AND date_str >= '20250501'
            ORDER BY gh_id, date_str;
        """
        return await self.pool.async_fetch(query=query)

    async def save_record(self, article_tuple):
        """Insert one (title_md5, title, remark) row, ignoring duplicates.

        Returns the driver's affected-row count (0 when the title_md5 was
        already present, thanks to INSERT IGNORE).
        """
        query = """
            INSERT IGNORE INTO rate_limited_title
            (title_md5, title, remark)
            VALUES
            (%s, %s, %s)
        """
        return await self.pool.async_save(query=query, params=article_tuple)

+ 125 - 0
app/domains/analysis_task/rate_limited_article_filter/_utils.py

@@ -0,0 +1,125 @@
+from collections import defaultdict
+from typing import Dict, List
+
+from app.core.observability import LogService
+
+
+class RateLimitedArticleUtils:
+    _LOG_TASK = "rate_limited_article_filter"
+
+    def __init__(self, log_service: LogService):
+        self._log_service = log_service
+
+    async def trace_log(self, contents: dict) -> None:
+        if not self._log_service:
+            return
+        payload = {"task": self._LOG_TASK, **contents}
+        await self._log_service.log(contents=payload)
+
+    @staticmethod
+    def _sort_records_by_date(records: List[Dict]) -> List[Dict]:
+        return sorted(records, key=lambda item: item["date_str"])
+
+    def filter_account_disabled_records(
+        self, records: List[Dict], read_on_avg_threshold: float, consist_days: int
+    ) -> List[Dict]:
+        """
+        账号头条连续 N 天低于阈值后,账号进入停用状态,停用期间文章不参与限流判定。
+        当 read_rate >= 阈值时立即恢复,当天文章重新参与判定。
+        """
+        account_records = defaultdict(list)
+        for row in records:
+            account_records[row["gh_id"]].append(row)
+
+        filtered_records: List[Dict] = []
+
+        for gh_records in account_records.values():
+            low_streak = 0
+            is_disabled = False
+
+            for row in self._sort_records_by_date(gh_records):
+                read_rate = float(row.get("read_rate") or 0)
+                is_low = read_rate < read_on_avg_threshold
+
+                if is_disabled:
+                    if is_low:
+                        continue
+                    is_disabled = False
+                    low_streak = 0
+                    filtered_records.append(row)
+                    continue
+
+                if is_low:
+                    low_streak += 1
+                    if low_streak >= consist_days:
+                        is_disabled = True
+                        continue
+                    filtered_records.append(row)
+                    continue
+
+                low_streak = 0
+                filtered_records.append(row)
+
+        return filtered_records
+
+    @staticmethod
+    def aggregate_rate_limited_titles(
+        records: List[Dict],
+        read_on_avg_threshold: float,
+        base_discover_time: int,
+        low_read_rate_threshold: float,
+        max_rate_limited_articles: int,
+    ) -> List[Dict]:
+        title_stats: Dict[str, Dict] = {}
+
+        for row in records:
+            title = row["title"]
+            read_rate = float(row.get("read_rate") or 0)
+
+            if title not in title_stats:
+                title_stats[title] = {
+                    "title": title,
+                    "publish_count": 0,
+                    "low_read_count": 0,
+                    "gh_ids": set(),
+                }
+
+            stat = title_stats[title]
+            stat["publish_count"] += 1
+            stat["gh_ids"].add(row["gh_id"])
+            if read_rate < read_on_avg_threshold:
+                stat["low_read_count"] += 1
+
+        results: List[Dict] = []
+        for stat in title_stats.values():
+            publish_count = stat["publish_count"]
+            low_read_count = stat["low_read_count"]
+            low_read_ratio = (
+                low_read_count / (publish_count + base_discover_time)
+                if publish_count + base_discover_time > 0
+                else 0
+            )
+
+            hit_by_ratio = low_read_ratio >= low_read_rate_threshold
+            hit_by_count = low_read_count >= max_rate_limited_articles
+            if not (hit_by_ratio or hit_by_count):
+                continue
+
+            trigger_rules = []
+            if hit_by_ratio:
+                trigger_rules.append("low_read_ratio")
+            if hit_by_count:
+                trigger_rules.append("low_read_count")
+
+            results.append(
+                {
+                    "title": stat["title"],
+                    "publish_count": publish_count,
+                    "low_read_count": low_read_count,
+                    "low_read_ratio": low_read_ratio,
+                    "gh_ids": sorted(stat["gh_ids"]),
+                    "trigger_rules": trigger_rules,
+                }
+            )
+
+        return results

+ 143 - 0
app/domains/analysis_task/rate_limited_article_filter/entrance.py

@@ -0,0 +1,143 @@
+import datetime
+import hashlib
+import json
+from typing import Dict
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+from app.infra.internal import delete_illegal_gzh_articles
+from app.infra.shared import run_tasks_with_asyncio_task_group
+
+from ._mapper import RateLimitedArticleMapper
+from ._utils import RateLimitedArticleUtils
+
+
class RateLimitedArticleFilter(RateLimitedArticleMapper):
    """Pipeline entry point: scans recent publish stats for rate-limited
    titles, records each hit in ``rate_limited_title`` and triggers
    deletion of the offending articles. One pass runs per configured
    look-back window (see ``deal``)."""

    # delete_flag value identifying deletions caused by rate limiting
    # (as opposed to other illegal-article reasons).
    RATE_LIMITED = 2

    def __init__(
        self,
        pool: DatabaseManager,
        config: GlobalConfigSettings,
        log_service: LogService,
    ):
        super().__init__(pool=pool)
        # Only the read-rate-limit section of the global settings is used.
        self.config = config.read_rate_limit
        self.tool = RateLimitedArticleUtils(log_service=log_service)

    async def _process_single_article(self, data: Dict):
        """Record one hit title and delete its articles if newly recorded.

        ``data`` is an aggregated hit dict (title / publish_count /
        low_read_count / low_read_ratio / gh_ids / trigger_rules / days).
        Re-raises any save/delete failure after tracing it, so the task
        runner can count the error.
        """
        title = data["title"]
        title_md5 = hashlib.md5(title.encode("utf-8")).hexdigest()
        # Human-readable audit trail stored with the title record; the
        # Chinese keys are part of the persisted data, do not translate.
        remark = json.dumps(
            {
                "发文数量": data["publish_count"],
                "限流数量": data["low_read_count"],
                "限流比例": data["low_read_ratio"],
                "周期": data["days"],
                "触发规则": data["trigger_rules"],
                "执行日期": datetime.datetime.today().strftime("%Y-%m-%d"),
            },
            ensure_ascii=False,
        )
        try:
            insert_rows = await self.save_record(
                article_tuple=(title_md5, title, remark)
            )
            # INSERT IGNORE returns 0 when the title was already recorded;
            # deletion is then skipped so each title is handled only once.
            if insert_rows:
                # NOTE(review): only the first gh_id is passed — presumably
                # the delete endpoint removes the title across accounts;
                # confirm against delete_illegal_gzh_articles.
                gh_id = data["gh_ids"][0]
                await delete_illegal_gzh_articles(
                    gh_id=gh_id, title=title, delete_flag=self.RATE_LIMITED
                )
                await self.tool.trace_log(
                    {
                        "event": "title_shielded",
                        "title": title,
                        "days": data["days"],
                        "trigger_rules": data["trigger_rules"],
                        "gh_id": gh_id,
                        "low_read_count": data["low_read_count"],
                        "publish_count": data["publish_count"],
                    }
                )
        except Exception as e:
            await self.tool.trace_log(
                {
                    "event": "process_single_article_failed",
                    "title": title,
                    "days": data.get("days"),
                    "status": "error",
                    "message": str(e),
                }
            )
            raise

    async def process_single_task(self, days: int, max_concurrent: int = 5):
        """Run one detection pass for a ``days``-long window.

        Steps: fetch raw rows, drop rows from disabled accounts, aggregate
        hits per title, process hits concurrently (at most
        ``max_concurrent`` at a time), then trace a summary and any
        per-item errors.
        """
        await self.tool.trace_log(
            {
                "event": "period_start",
                "days": days,
            }
        )
        raw_records = await self.find_rate_limited_articles(
            days_duration=days,
        )
        # Rows published while an account counts as disabled are excluded
        # from the per-title judgement.
        effective_records = self.tool.filter_account_disabled_records(
            records=raw_records,
            read_on_avg_threshold=self.config.read_on_avg_threshold,
            consist_days=self.config.CONSIST_DAYS,
        )

        aggregated = self.tool.aggregate_rate_limited_titles(
            records=effective_records,
            read_on_avg_threshold=self.config.read_on_avg_threshold,
            base_discover_time=self.config.base_discover_time,
            low_read_rate_threshold=self.config.low_read_rate_threshold,
            max_rate_limited_articles=self.config.MAX_RATE_LIMITED_ARTICLES,
        )
        # Tag every hit with the window size for the remark / trace logs.
        data_list = [{**item, "days": days} for item in aggregated]

        result = await run_tasks_with_asyncio_task_group(
            task_list=data_list,
            handler=self._process_single_article,
            description="执行限流删文处理",
            max_concurrency=max_concurrent,
            unit="per_title",
        )

        await self.tool.trace_log(
            {
                "event": "period_complete",
                "days": days,
                "raw_row_count": len(raw_records),
                "effective_row_count": len(effective_records),
                "hit_title_count": len(data_list),
                "total_task": result["total_task"],
                "processed_task": result["processed_task"],
                "error_count": len(result["errors"]),
            }
        )
        # Individual failures collected by the task group are traced after
        # the summary entry.
        for _idx, task_obj, err in result["errors"]:
            await self.tool.trace_log(
                {
                    "event": "period_item_error",
                    "days": days,
                    "title": task_obj.get("title"),
                    "status": "error",
                    "message": str(err),
                }
            )

    async def deal(self):
        """Entry point: run one pass per configured stat duration."""
        await self.tool.trace_log(
            {
                "event": "deal_start",
                "stat_durations": list(self.config.stat_durations),
            }
        )
        for _day in self.config.stat_durations:
            await self.process_single_task(_day)

        await self.tool.trace_log({"event": "deal_complete"})

+ 5 - 1
app/jobs/task_handler.py

@@ -429,7 +429,11 @@ class TaskHandler:
    @register("rate_limited_article_filter")
    async def _rate_limited_article_filter(self) -> int:
        """Scheduled task: delete articles detected as read-rate-limited.

        Wires the filter with the shared DB client, global config and log
        client, runs the full pipeline and reports success to the runner.
        """
        task = RateLimitedArticleFilter(
            pool=self.db_client,
            config=self.config,
            log_service=self.log_client,
        )
        await task.deal()
        return TaskStatus.SUCCESS