Преглед на файлове

新增-账号品类生成任务

luojunhui преди 1 месец
родител
ревизия
b430f819a7

+ 2 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -1,3 +1,4 @@
 from .crawler_toutiao import CrawlerToutiao
+from .crawler_account_manager import WeixinAccountManager
 
-__all__ = ["CrawlerToutiao"]
+__all__ = ["CrawlerToutiao", "WeixinAccountManager"]

+ 132 - 0
applications/tasks/crawler_tasks/crawler_account_manager.py

@@ -0,0 +1,132 @@
+import time
+import math
+
+from typing import Optional, List, Dict
+from pandas import DataFrame
+from scipy import stats
+
+
+class CrawlerAccountManagerConst:
+    """Constants shared by the crawler account manager classes."""
+
+    # Article status codes
+    ARTICLE_LOW_SIMILARITY_STATUS = 0
+    ARTICLE_INIT_STATUS = 1
+    ARTICLE_PUBLISHED_STATUS = 2
+
+    # Similarity threshold
+    SIMILARITY_THRESHOLD = 0.4
+
+    # Length of one day, in seconds
+    ONE_DAY_TIMESTAMP = 24 * 60 * 60
+    # Length of the "recent 30 days" window, in seconds
+    THIRTY_DAYS_TIMESTAMP = ONE_DAY_TIMESTAMP * 30
+
+
+class CrawlerAccountManager(CrawlerAccountManagerConst):
+    """Base class for platform-specific account managers.
+
+    Keeps the three collaborators handed in by the caller (``pool``,
+    ``aliyun_log`` and ``trace_id``) as instance attributes.
+    """
+
+    def __init__(self, pool, aliyun_log, trace_id):
+        self.pool = pool
+        self.aliyun_log = aliyun_log
+        self.trace_id = trace_id
+
+    @staticmethod
+    def safe_float(x: float | None, default: float = 0.0) -> float:
+        """Return ``x`` as a float, or ``default`` for None / NaN / +-Inf."""
+        if x is None:
+            return default
+        if math.isfinite(x):
+            return float(x)
+        return default
+
+    async def get_crawling_accounts(self, platform):
+        """Fetch crawling-account information (placeholder: returns None)."""
+        return None
+
+
+class WeixinAccountManager(CrawlerAccountManager):
+    """Compute article statistics per account and store them on the account row.
+
+    For each account id the manager loads the account's rows from
+    ``crawler_meta_article``, analyses them twice (all rows, and rows whose
+    ``publish_time`` falls inside the last ``THIRTY_DAYS_TIMESTAMP`` seconds)
+    and writes both results to ``long_articles_accounts``.
+    """
+
+    def __init__(self, pool, aliyun_log, trace_id):
+        # The base class already stores pool / aliyun_log / trace_id.
+        super().__init__(pool, aliyun_log, trace_id)
+
+    async def get_account_crawler_articles_info(self, account_id: str) -> DataFrame:
+        """Load the crawled articles of one account into a DataFrame.
+
+        Args:
+            account_id: value matched against ``out_account_id``.
+
+        Returns:
+            A DataFrame with the columns title, link, score, status,
+            article_index, read_cnt and publish_time.
+        """
+        query = """
+            select title, link, score, status, article_index, read_cnt, publish_time
+            from crawler_meta_article where out_account_id = %s;
+        """
+        response = await self.pool.async_fetch(query=query, params=(account_id,))
+        return DataFrame(
+            response,
+            columns=[
+                "title",
+                "link",
+                "score",
+                "status",
+                "article_index",
+                "read_cnt",
+                "publish_time",
+            ],
+        )
+
+    async def update_account_stat_detail(
+        self, account_id: str, history_info: Dict, recently_info: Dict
+    ) -> int:
+        """Persist the statistics of one account.
+
+        Args:
+            account_id: value matched against ``gh_id``.
+            history_info: analysis of all articles; the keys
+                ``publish_frequency`` and ``ci_lower`` are read.
+            recently_info: analysis of the recent articles; same keys.
+
+        Returns:
+            Whatever ``pool.async_save`` returns.
+        """
+        query = """
+            update long_articles_accounts 
+            set history_publish_frequency = %s, history_score_ci_lower = %s, 
+                recent_publish_frequency= %s, recent_score_ci_lower = %s
+            where gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                history_info["publish_frequency"],
+                history_info["ci_lower"],
+                recently_info["publish_frequency"],
+                recently_info["ci_lower"],
+                account_id,
+            ),
+        )
+
+    def analysis_dataframe(self, dataframe: DataFrame) -> Dict:
+        """Summarise the scores and the publishing frequency of a set of articles.
+
+        Args:
+            dataframe: frame with at least the columns ``score`` and
+                ``publish_time``; null entries in either column are ignored.
+
+        Returns:
+            A dict with three finite floats:
+            ``ci_lower`` / ``ci_upper`` are the bounds of the two-sided 95%
+            Student-t confidence interval of the mean score (both 0.0 with
+            fewer than two scores); ``publish_frequency`` is the number of
+            articles per ``ONE_DAY_TIMESTAMP`` over the span between the
+            smallest and largest ``publish_time`` (0.0 with fewer than two
+            timestamps or a zero span).
+        """
+        scores = dataframe["score"].dropna()
+        n = len(scores)
+        if n < 2:
+            # A confidence interval needs at least two observations.
+            ci_lower = ci_upper = 0.0
+        else:
+            mean = scores.mean()
+            sem = stats.sem(scores)  # may be NaN
+            # 0.975 is the upper quantile of a two-sided 95% interval.
+            t_val = stats.t.ppf(0.975, df=n - 1)
+            margin = t_val * sem if math.isfinite(sem) else 0.0
+            ci_lower, ci_upper = mean - margin, mean + margin
+
+        # Publishing frequency.
+        # NOTE(review): assumes publish_time is a Unix timestamp in seconds - confirm.
+        publish_times = dataframe["publish_time"].dropna()
+        publish_frequency = 0.0
+        if len(publish_times) >= 2:
+            delta = publish_times.max() - publish_times.min()
+            if delta:
+                publish_frequency = len(publish_times) / delta * self.ONE_DAY_TIMESTAMP
+
+        return {
+            "publish_frequency": self.safe_float(publish_frequency),
+            "ci_lower": self.safe_float(ci_lower),
+            "ci_upper": self.safe_float(ci_upper),
+        }
+
+    async def analysis_single_account(self, account_id: str) -> None:
+        """Analyse one account (all articles and the recent window) and save the result."""
+        dataframe = await self.get_account_crawler_articles_info(account_id)
+        history_articles_analysis = self.analysis_dataframe(dataframe)
+
+        # Keep only rows published within the last THIRTY_DAYS_TIMESTAMP seconds.
+        thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
+        recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before]
+        recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
+
+        await self.update_account_stat_detail(
+            account_id, history_articles_analysis, recent_30_days_analysis
+        )
+
+    async def deal(self, account_id_list: List[str]):
+        """Analyse the given accounts one after another, in list order."""
+        for account_id in account_id_list:
+            await self.analysis_single_account(account_id)
+
+
+
+

+ 18 - 0
applications/tasks/task_handler.py

@@ -1,7 +1,9 @@
 from datetime import datetime
+from unittest import case
 
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
+from applications.tasks.crawler_tasks import WeixinAccountManager
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
@@ -119,3 +121,19 @@ class TaskHandler(TaskMapper):
         limit_num = self.data.get("limit")
         await task.deal(limit=limit_num)
         return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_account_manager_handler(self) -> int:
+        """Run the crawler account manager for the accounts named in the task data.
+
+        Reads ``platform`` (default ``"weixin"``) and ``account_id_list`` from
+        ``self.data``. Returns ``TASK_FAILED_STATUS`` when the list is missing
+        or empty, otherwise ``TASK_SUCCESS_STATUS`` once every account has been
+        processed.
+
+        Raises:
+            ValueError: if ``platform`` is anything other than ``"weixin"``.
+        """
+        platform = self.data.get("platform", "weixin")
+        account_ids = self.data.get("account_id_list")
+        if not account_ids:
+            return self.TASK_FAILED_STATUS
+
+        # Only the weixin platform has a manager implementation.
+        if platform != "weixin":
+            raise ValueError(f"Unsupported platform {platform}")
+
+        manager = WeixinAccountManager(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await manager.deal(account_id_list=account_ids)
+        return self.TASK_SUCCESS_STATUS

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -181,6 +181,8 @@ class TaskScheduler(TaskHandler):
             "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
             # 文章内容池--标题品类处理
             "article_pool_category_generation": self._article_pool_category_generation_handler,
+            # 抓取账号管理
+            "crawler_account_manager": self._crawler_account_manager_handler
         }
 
         if task_name not in handlers: