luojunhui 3 недель назад
Родитель
Сommit
bd1d88e081

+ 1 - 0
app/core/config/global_settings.py

@@ -40,6 +40,7 @@ class GlobalConfigSettings(BaseSettings):
     task_chinese_name: TaskChineseNameConfig = Field(
     task_chinese_name: TaskChineseNameConfig = Field(
         default_factory=TaskChineseNameConfig
         default_factory=TaskChineseNameConfig
     )
     )
+    read_rate_limit: ReadRateLimited = Field(default_factory=ReadRateLimited)
 
 
     model_config = SettingsConfigDict(
     model_config = SettingsConfigDict(
         env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"
         env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"

+ 2 - 0
app/core/config/settings/__init__.py

@@ -9,6 +9,7 @@ from .mysql import GrowthDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
 from .mysql import LongVideoDatabaseConfig
 from .mysql import LongVideoDatabaseConfig
 from .mysql import PiaoquanCrawlerDatabaseConfig
 from .mysql import PiaoquanCrawlerDatabaseConfig
+from .read_rate_limited import ReadRateLimited
 from .task_chinese_name import TaskChineseNameConfig
 from .task_chinese_name import TaskChineseNameConfig
 
 
 
 
@@ -25,4 +26,5 @@ __ALL__ = [
     "LongVideoDatabaseConfig",
     "LongVideoDatabaseConfig",
     "PiaoquanCrawlerDatabaseConfig",
     "PiaoquanCrawlerDatabaseConfig",
     "TaskChineseNameConfig",
     "TaskChineseNameConfig",
+    "ReadRateLimited"
 ]
 ]

+ 23 - 0
app/core/config/settings/read_rate_limited.py

@@ -0,0 +1,23 @@
+from pydantic import Field
+from pydantic_settings import BaseSettings, SettingsConfigDict
+from typing import List
+
+
+class ReadRateLimited(BaseSettings):
+    """限流配置"""
+
+    # 统计周期
+    stat_durations: List[int] = Field(
+        default_factory=lambda: [
+           2, 10, 30, 60, 90, 120, 150, 365
+        ]
+    )
+
+    # 认为满足限流阅读均值倍数阈值
+    read_on_avg_threshold: float = 0.2
+
+    # 探索次数
+    base_discover_time: int = 1
+
+    # 认为文章限流占比阈值
+    low_read_rate_threshold: float = 0.5

+ 2 - 1
app/domains/analysis_task/__init__.py

@@ -2,11 +2,12 @@ from .crawler_detail import CrawlerDetailDeal
 from .account_position_info import AccountPositionReadRateAvg
 from .account_position_info import AccountPositionReadRateAvg
 from .account_position_info import AccountPositionReadAvg
 from .account_position_info import AccountPositionReadAvg
 from .account_position_info import AccountPositionOpenRateAvg
 from .account_position_info import AccountPositionOpenRateAvg
-
+from .rate_limited_article_filter import RateLimitedArticleFilter
 
 
 __all__ = [
 __all__ = [
     "CrawlerDetailDeal",
     "CrawlerDetailDeal",
     "AccountPositionReadRateAvg",
     "AccountPositionReadRateAvg",
     "AccountPositionReadAvg",
     "AccountPositionReadAvg",
     "AccountPositionOpenRateAvg",
     "AccountPositionOpenRateAvg",
+    "RateLimitedArticleFilter"
 ]
 ]

+ 110 - 0
app/domains/analysis_task/rate_limited_article_filter.py

@@ -0,0 +1,110 @@
+import asyncio
+import datetime
+import hashlib, json
+from typing import List, Dict
+from tqdm.asyncio import tqdm
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.internal import delete_illegal_gzh_articles
+
+
+class RateLimitedArticleMapper:
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    async def find_rate_limited_articles(
+        self,
+        days_duration: int,
+        read_on_avg_threshold: float = 0.2,
+        base_discover_time: int = 1,
+        low_read_rate_threshold: float = 0.5,
+    ) -> List[Dict]:
+        query = f"""
+            SELECT
+                title,
+                gh_id,
+                COUNT(*) AS publish_count,
+                CAST(
+                    SUM(CASE WHEN read_rate < {read_on_avg_threshold} THEN 1 ELSE 0 END) AS UNSIGNED
+                )
+                AS low_read_count
+            FROM datastat_sort_strategy
+            WHERE position = 1
+              AND account_type = '订阅号'
+              AND date_str >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL {days_duration} DAY), '%Y%m%d')
+              AND date_str >= '20250501'
+            GROUP BY title
+            HAVING SUM(CASE WHEN read_rate < {read_on_avg_threshold} THEN 1 ELSE 0 END) / (COUNT(*) + {base_discover_time}) >= {low_read_rate_threshold}
+            ;
+        """
+        return await self.pool.async_fetch(query=query)
+
+    async def save_record(self, article_tuple):
+        query = """
+            INSERT IGNORE INTO rate_limited_title
+            (title_md5, title, remark)
+            VALUES
+            (%s, %s, %s)
+        """
+        return await self.pool.async_save(query=query, params=article_tuple)
+
+
+class RateLimitedArticleFilter(RateLimitedArticleMapper):
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+        # log_service: LogService,
+    ):
+        super().__init__(pool=pool)
+        self.config = config.read_rate_limit
+
+    async def _process_single_article(self, data: Dict, days: int, semaphore: asyncio.Semaphore):
+        """处理单个文章的异步任务"""
+        async with semaphore:
+            gh_id = data["gh_id"]
+            title = data["title"]
+            title_md5 = hashlib.md5(title.encode("utf-8")).hexdigest()
+            remark = json.dumps(
+                {
+                    "发文数量": data["publish_count"],
+                    "限流数量": data["low_read_count"],
+                    "周期": days,
+                    "执行日期": datetime.datetime.today().strftime("%Y-%m-%d"),
+                },
+                ensure_ascii=False,
+            )
+            insert_rows = await self.save_record(article_tuple=(title_md5, title, remark))
+            if insert_rows:
+                await delete_illegal_gzh_articles(gh_id=gh_id, title=title)
+            else:
+                print("该文章已经删过")
+
+    async def process_single_task(self, days: int, max_concurrent: int = 10):
+        """并发处理所有文章任务"""
+        data_list = await self.find_rate_limited_articles(
+            days_duration=days,
+            read_on_avg_threshold=self.config.read_on_avg_threshold,
+            base_discover_time=self.config.base_discover_time,
+            low_read_rate_threshold=self.config.low_read_rate_threshold
+        )
+
+        # 创建信号量限制并发数
+        semaphore = asyncio.Semaphore(max_concurrent)
+
+        # 创建所有任务
+        tasks = [
+            self._process_single_article(data, days, semaphore)
+            for data in data_list
+        ]
+
+        # 使用 tqdm 显示进度并发执行所有任务
+        for coro in tqdm.as_completed(tasks, total=len(tasks)):
+            await coro
+
+    async def deal(self):
+        for _day in self.config.stat_durations:
+            await self.process_single_task(_day)

+ 10 - 0
app/jobs/task_handler.py

@@ -7,6 +7,7 @@ from app.domains.analysis_task import CrawlerDetailDeal
 from app.domains.analysis_task import AccountPositionReadRateAvg
 from app.domains.analysis_task import AccountPositionReadRateAvg
 from app.domains.analysis_task import AccountPositionReadAvg
 from app.domains.analysis_task import AccountPositionReadAvg
 from app.domains.analysis_task import AccountPositionOpenRateAvg
 from app.domains.analysis_task import AccountPositionOpenRateAvg
+from app.domains.analysis_task import RateLimitedArticleFilter
 
 
 from app.domains.algorithm_tasks import AccountCategoryAnalysis
 from app.domains.algorithm_tasks import AccountCategoryAnalysis
 
 
@@ -424,5 +425,14 @@ class TaskHandler:
         await task.deal(params=self.data)
         await task.deal(params=self.data)
         return TaskStatus.SUCCESS
         return TaskStatus.SUCCESS
 
 
+    @register("rate_limited_article_filter")
+    async def _rate_limited_article_filter(self) -> int:
+        """限流文章删除"""
+        task = RateLimitedArticleFilter(
+            pool=self.db_client, config=self.config
+        )
+        await task.deal()
+        return TaskStatus.SUCCESS
+
 
 
 __all__ = ["TaskHandler"]
 __all__ = ["TaskHandler"]