
Merge branch 'feature/luojunhui/2025-08-19-account-category-analysis-task' of Server/LongArticleTaskServer into master

luojunhui 2 weeks ago
parent
commit efff558ab1

+ 6 - 1
applications/config/__init__.py

@@ -20,6 +20,9 @@ from .cold_start_config import category_config, input_source_map
 # name config
 from .task_chinese_name import name_map
 
+# article category model config
+from .category_config import CATEGORY_FEATURES, CATEGORY_MAP
+
 __all__ = [
     "aigc_db_config",
     "long_video_db_config",
@@ -33,5 +36,7 @@ __all__ = [
     "es_settings",
     "category_config",
     "input_source_map",
-    "name_map"
+    "name_map",
+    "CATEGORY_FEATURES",
+    "CATEGORY_MAP"
 ]

+ 36 - 0
applications/config/category_config.py

@@ -0,0 +1,36 @@
+CATEGORY_FEATURES = [
+    "CateSciencePop",
+    "CateMilitaryHistory",
+    "CateFamily",
+    "CateSocialRule",
+    "CateOddities",
+    "CateGossip",
+    "CateHealth",
+    "CateEmotional",
+    "CateNational",
+    "CateModernFigure",
+    "CateNostalgic",
+    "CatePolitics",
+    "CateHistoryFigure",
+    "CateSocialPhenomena",
+    "CateFinTech",
+    "view_count_rate",
+]
+
+CATEGORY_MAP = {
+    "知识科普": "CateSciencePop",
+    "军事历史": "CateMilitaryHistory",
+    "家长里短": "CateFamily",
+    "社会法治": "CateSocialRule",
+    "奇闻趣事": "CateOddities",
+    "名人八卦": "CateGossip",
+    "健康养生": "CateHealth",
+    "情感故事": "CateEmotional",
+    "国家大事": "CateNational",
+    "现代人物": "CateModernFigure",
+    "怀旧时光": "CateNostalgic",
+    "政治新闻": "CatePolitics",
+    "历史人物": "CateHistoryFigure",
+    "社会现象": "CateSocialPhenomena",
+    "财经科技": "CateFinTech",
+}
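
Note: `CATEGORY_MAP` maps the Chinese category labels stored in the `category` column to one-hot feature names, while `CATEGORY_FEATURES` lists the regressors (the category dummies plus `view_count_rate`). A minimal sketch of how the two constants relate, illustrative only and not part of the commit:

```python
from applications.config import CATEGORY_FEATURES, CATEGORY_MAP

# every mapped feature name is expected to appear in the regressor list
assert set(CATEGORY_MAP.values()) <= set(CATEGORY_FEATURES)

# reverse lookup (feature name -> Chinese label), mirroring
# AccountCategoryAnalysis.reverse_category_map()
param_to_category = {v: k for k, v in CATEGORY_MAP.items()}
print(param_to_category["CateHealth"])  # -> 健康养生
```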

+ 3 - 0
applications/tasks/algorithm_tasks/__init__.py

@@ -0,0 +1,3 @@
+from .account_category_analysis import AccountCategoryAnalysis
+
+__all__ = ["AccountCategoryAnalysis"]

+ 201 - 0
applications/tasks/algorithm_tasks/account_category_analysis.py

@@ -0,0 +1,201 @@
+import time, json
+from datetime import datetime, timedelta
+from pandas import DataFrame
+from tqdm.asyncio import tqdm
+
+from .models import CategoryRegression
+
+
+class AccountCategoryConst:
+    P_VALUE_THRESHOLD = 0.15
+    SAMPLE_MIN_SIZE = 5
+    POSITIVE_STATUS = 1
+    NEGATIVE_STATUS = 0
+    VERSION = 2
+
+    # SOME THRESHOLDS
+    SIMILARITY_THRESHOLD = 0
+    READ_AVG_THRESHOLD = 500
+    READ_AVG_RATE_MIN = 0.3
+    READ_AVG_RATE_MAX = 3
+    VIEW_COUNT_RATE_THRESHOLD = 0
+    INDEX_MAX = 3
+
+
+class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
+    def __init__(self, pool, log_client, trace_id, date_string, data):
+        self.account_negative_categories = None
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+        features = data.get("features")
+        category_map = data.get("category_map")
+        super().__init__(features, category_map)
+        self.date_string = date_string
+        self.view_only = data.get("view_only")
+
+    def reverse_category_map(self):
+        return {v: k for k, v in self.category_map.items()}
+
+    def init_execute_date(self):
+        if self.date_string:
+            run_date = datetime.strptime(self.date_string, "%Y-%m-%d")
+        else:
+            run_date = datetime.today()
+
+        end_dt = (run_date - timedelta(1)).strftime("%Y%m%d")
+        return end_dt
+
+    async def prepare_raw_data(self, end_dt, begin_dt: str = "20250401"):
+        query = """
+            select dt, gh_id, account_name, title, similarity, view_count_rate, category,
+                    read_avg, read_avg_rate, first_pub_interval, `index`
+            from datastat_score
+            where dt between %s and %s 
+                and similarity > %s
+                and category IS NOT NULL
+                and read_avg > %s
+                and read_avg_rate between %s and %s
+                and view_count_rate > %s
+                and `index` < %s;
+        """
+        fetch_response = await self.pool.async_fetch(
+            query=query,
+            params=(
+                begin_dt,
+                end_dt,
+                self.SIMILARITY_THRESHOLD,
+                self.READ_AVG_THRESHOLD,
+                self.READ_AVG_RATE_MIN,
+                self.READ_AVG_RATE_MAX,
+                self.VIEW_COUNT_RATE_THRESHOLD,
+                self.INDEX_MAX,
+            ),
+        )
+        dataframe = DataFrame.from_records(
+            fetch_response,
+            columns=[
+                "dt",
+                "gh_id",
+                "account_name",
+                "title",
+                "similarity",
+                "view_count_rate",
+                "category",
+                "read_avg",
+                "read_avg_rate",
+                "first_pub_interval",
+                "index",
+            ],
+        )
+        dataframe = dataframe.drop_duplicates(["dt", "gh_id", "title"])
+        return dataframe
+
+    async def update_each_account(self, record):
+        now_timestamp = int(time.time())
+        query = """
+            insert ignore into account_category (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
+                values (%s, %s, %s, %s, %s, %s, %s)
+        """
+        insert_rows = await self.pool.async_save(
+            query=query,
+            params=(
+                record["dt"],
+                record["gh_id"],
+                record["category_map"],
+                self.POSITIVE_STATUS,
+                self.VERSION,
+                now_timestamp,
+                now_timestamp,
+            ),
+        )
+        if insert_rows:
+            update_query = """
+                update account_category set status = %s, update_timestamp = %s 
+                where gh_id = %s and dt < %s and status = %s and version = %s;
+            """
+            await self.pool.async_save(
+                query=update_query,
+                params=(
+                    self.NEGATIVE_STATUS,
+                    now_timestamp,
+                    record["gh_id"],
+                    record["dt"],
+                    self.POSITIVE_STATUS,
+                    self.VERSION,
+                ),
+            )
+
+    async def predict_each_account(
+        self, df, account_id, account_id_map, end_dt, param_to_category_map
+    ):
+        sub_df = df[df["gh_id"] == account_id]
+        account_name = account_id_map[account_id]
+        sample_count = sub_df.shape[0]
+        if sample_count < self.SAMPLE_MIN_SIZE:
+            return
+
+        params, t_stats, p_values = self.run_ols_linear_regression(
+            sub_df, self.view_only, self.P_VALUE_THRESHOLD
+        )
+        current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
+        params_names = self.get_param_names()
+        for name, param, p_value in zip(params_names, params, p_values):
+            category_name = param_to_category_map.get(name, None)
+            if (
+                abs(param) > 0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                scale_factor = min(0.1 / p_value, 1)
+                print(
+                    f"{account_id} {account_name} {category_name} {param:.3f} {p_value:.3f}"
+                )
+                truncate_param = round(max(min(param, 0.25), -0.25) * scale_factor, 6)
+                current_record["category_map"][category_name] = truncate_param
+
+            if (
+                param < -0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                self.account_negative_categories[account_id].append(category_name)
+
+        if not current_record["category_map"]:
+            return
+
+        current_record["category_map"] = json.dumps(
+            current_record["category_map"], ensure_ascii=False
+        )
+
+        await self.update_each_account(current_record)
+
+    async def deal(self):
+        end_dt = self.init_execute_date()
+        raw_dataframe = await self.prepare_raw_data(end_dt)
+
+        # prepare data for model
+        pre_processed_dataframe = self.preprocess_data(raw_dataframe)
+
+        if self.view_only:
+            self.build_and_print_matrix(pre_processed_dataframe)
+            return
+
+        param_to_category_map = self.reverse_category_map()
+
+        account_ids = pre_processed_dataframe["gh_id"].unique()
+        account_id_map = (
+            pre_processed_dataframe[["account_name", "gh_id"]]
+            .drop_duplicates()
+            .set_index("gh_id")["account_name"]
+            .to_dict()
+        )
+
+        account_negative_categories = {key: [] for key in account_ids}
+        self.account_negative_categories = account_negative_categories
+
+        for account_id in tqdm(account_ids, desc="analysis each account"):
+            await self.predict_each_account(
+                pre_processed_dataframe, account_id, account_id_map, end_dt, param_to_category_map
+            )
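
For reference, a standalone sketch of the coefficient post-processing in `predict_each_account`: coefficients are clipped to ±0.25 and damped by `min(0.1 / p_value, 1)`, so only categories with p < 0.1 keep their full clipped weight. The numbers below are illustrative:

```python
def truncate_category_param(param: float, p_value: float) -> float:
    """Clip the OLS coefficient to [-0.25, 0.25] and damp it when
    0.1 < p_value < P_VALUE_THRESHOLD (0.15)."""
    scale_factor = min(0.1 / p_value, 1)
    return round(max(min(param, 0.25), -0.25) * scale_factor, 6)

# assumed values, for illustration only
print(truncate_category_param(0.4, 0.05))   # 0.25  -> clipped, no damping
print(truncate_category_param(0.2, 0.125))  # 0.16  -> damped by 0.1 / 0.125
print(truncate_category_param(-0.3, 0.02))  # -0.25 -> clipped on the negative side
```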

+ 128 - 0
applications/tasks/algorithm_tasks/models.py

@@ -0,0 +1,128 @@
+# encoding: utf-8
+from __future__ import annotations
+
+import numpy as np
+import statsmodels.api as sm
+
+from pandas import DataFrame
+
+from applications.config import CATEGORY_FEATURES, CATEGORY_MAP
+
+
+class CategoryRegression:
+    """品类回归模型"""
+
+    def __init__(self, features=None, category_map=None):
+        self.features = features or CATEGORY_FEATURES
+        self.category_map = category_map or CATEGORY_MAP
+
+    @staticmethod
+    def clip_func(x):
+        """
+        Adjust read-rate multiples: values above 1.4 are log-compressed.
+        """
+        return x if x < 1.4 else 0.7 * np.log(x) + 1.165
+
+    def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame:
+        """预处理数据"""
+        for category in self.category_map:
+            colname = self.category_map[category]
+            raw_dataframe[colname] = raw_dataframe["category"] == category
+            raw_dataframe[colname] = raw_dataframe[colname].astype(int)
+
+        # correction for second-position (non-headline) read counts
+        df_idx1 = raw_dataframe[raw_dataframe["index"] == 1][
+            ["dt", "gh_id", "read_avg_rate"]
+        ]
+        merged_dataframe = raw_dataframe.merge(
+            df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1")
+        )
+        debias_selection = merged_dataframe.query(
+            "index != 1 and read_avg_rate1 < 0.7 and read_avg_rate < 0.7"
+        )
+        output_dataframe = merged_dataframe.drop(debias_selection.index)
+
+        output_dataframe["read_avg_rate"] = output_dataframe["read_avg_rate"].apply(
+            self.clip_func
+        )
+        output_dataframe["view_count_rate"] = output_dataframe["view_count_rate"].apply(
+            self.clip_func
+        )
+        output_dataframe["days_decrease"] = output_dataframe["first_pub_interval"] * (
+            -0.2 / 120
+        )
+        output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"]
+        return output_dataframe
+
+    def _build_and_print_by_account(
+        self, raw_dataframe: DataFrame, account_name: str | None
+    ) -> None:
+        if account_name:
+            sub_df = raw_dataframe[raw_dataframe["account_name"] == account_name]
+        else:
+            sub_df = raw_dataframe
+
+        if len(sub_df) < 5:
+            return
+        sample_count = len(sub_df)
+        params, t_stats, p_values = self.run_ols_linear_regression(sub_df)
+        row = f"{account_name}\t{sample_count}"
+        for param, p_value in zip(params, p_values):
+            row += f"\t{param:.3f}\t{p_value:.3f}"
+        print(row)
+
+    def build_and_print_matrix(self, raw_dataframe: DataFrame) -> None:
+        p_value_column_names = "\t".join(
+            [name + "\tp-" + name for name in ["bias"] + self.features]
+        )
+        print("account\tsamples\t{}".format(p_value_column_names))
+        # self._build_and_print_by_account(raw_dataframe, None)
+        for account_name in raw_dataframe["account_name"].unique():
+            self._build_and_print_by_account(raw_dataframe, account_name)
+
+    def get_param_names(self):
+        return ["bias"] + self.features
+
+    def run_ols_linear_regression(
+        self,
+        raw_dataframe: DataFrame,
+        print_residual: bool = False,
+        print_p_value_threshold: float = 0.1,
+    ):
+        X = raw_dataframe[self.features]  # feature columns
+        y = raw_dataframe["RegressionY"]  # regression target
+        X = sm.add_constant(X, has_constant="add")
+
+        model = sm.OLS(y, X).fit()
+
+        params = model.params
+        t_stats = model.tvalues
+        p_values = model.pvalues
+        conf_int = model.conf_int()
+
+        if print_residual:
+            predict_y = model.predict(X)
+            residuals = y - predict_y
+            new_x = raw_dataframe[["title", "category"]].copy()
+            new_x["residual"] = residuals
+            new_x["y"] = y
+            select_idx = []
+            for index, row in new_x.iterrows():
+                param_name = self.category_map.get(row["category"], None)
+                if not param_name:
+                    continue
+                param_index = self.features.index(param_name) + 1
+                param = params.iloc[param_index]
+                p_value = p_values.iloc[param_index]
+                if p_value < print_p_value_threshold:
+                    print(
+                        f"{row['y']:.3f}\t{row['residual']:.3f}\t{row['category']}\t{param:.2f}\t{row['title'][0:30]}"
+                    )
+                    select_idx.append(index)
+            has_category_residuals = residuals.loc[select_idx]
+            r_min = has_category_residuals.min()
+            r_max = has_category_residuals.max()
+            r_avg = has_category_residuals.mean()
+            print(f"residuals min: {r_min:.3f}, max: {r_max:.3f}, mean: {r_avg:.3f}")
+
+        return params, t_stats, p_values
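
A minimal, self-contained sketch of the OLS call wrapped by `run_ols_linear_regression`, run on toy data; column names and coefficients are illustrative, not the production schema:

```python
import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(0)
n = 200
toy = pd.DataFrame(
    {
        "CateHealth": rng.integers(0, 2, n),          # one-hot category dummy
        "view_count_rate": rng.uniform(0.5, 2.0, n),  # continuous regressor
    }
)
# synthetic target with known coefficients plus noise
toy["RegressionY"] = (
    1.0 + 0.2 * toy["CateHealth"] + 0.3 * toy["view_count_rate"]
    + rng.normal(0, 0.1, n)
)

X = sm.add_constant(toy[["CateHealth", "view_count_rate"]], has_constant="add")
model = sm.OLS(toy["RegressionY"], X).fit()
print(model.params)   # bias / CateHealth / view_count_rate estimates
print(model.pvalues)  # per-coefficient p-values, as consumed by the analysis task
```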

+ 9 - 0
applications/tasks/task_handler.py

@@ -1,5 +1,6 @@
 from datetime import datetime
 
+from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
@@ -164,3 +165,11 @@ class TaskHandler(TaskMapper):
         task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
         await task.deal()
         return self.TASK_SUCCESS_STATUS
+
+    async def _account_category_analysis_handler(self) -> int:
+        task = AccountCategoryAnalysis(
+            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id, data=self.data, date_string=None
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
+

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -187,6 +187,8 @@ class TaskScheduler(TaskHandler):
             "crawler_gzh_articles": self._crawler_gzh_article_handler,
             # 服务号发文回收
             "fwh_daily_recycle": self._recycle_fwh_article_handler,
+            # publishing-account category analysis
+            "account_category_analysis": self._account_category_analysis_handler,
         }
 
         if task_name not in handlers:
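
For context, a hedged example of the payload that would route to the new handler; the field names below are assumed from how the scheduler dispatches on `task_name` and how the handler forwards `self.data`, and are not confirmed by this diff:

```python
payload = {
    "task_name": "account_category_analysis",
    "data": {
        "view_only": False,        # True only prints the per-account regression matrix
        # "features": [...],       # optional override, defaults to CATEGORY_FEATURES
        # "category_map": {...},   # optional override, defaults to CATEGORY_MAP
    },
}
```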

+ 3 - 1
requirements.txt

@@ -21,4 +21,6 @@ tenacity~=9.0.0
 fake-useragent~=2.1.0
 pydantic~=2.10.6
 scipy~=1.15.2
-quart-cors~=0.8.0
+quart-cors~=0.8.0
+statsmodels
+