Browse Source

品类优化

luojunhui 3 tháng trước
mục cha
commit
6abf86ad3a

+ 142 - 51
applications/tasks/algorithm_tasks/account_category_analysis.py

@@ -1,4 +1,7 @@
-import time, json
+import time
+import json
+import numpy as np
+import pandas as pd
 from datetime import datetime, timedelta
 from pandas import DataFrame
 from tqdm.asyncio import tqdm
@@ -11,7 +14,7 @@ class AccountCategoryConst:
     SAMPLE_MIN_SIZE = 5
     POSITIVE_STATUS = 1
     NEGATIVE_STATUS = 0
-    VERSION = 2
+    CURRENT_VERSION = 3
 
     # SOME THRESHOLDS
     SIMILARITY_THRESHOLD = 0
@@ -22,7 +25,13 @@ class AccountCategoryConst:
     INDEX_MAX = 3
 
     # MAX VALUE
-    MAX_VALUE = 0.5
+    MAX_VALUE = 0.25
+
+    # === NEW: 策略相关常量 ===
+    # 达到这个样本数后,账号权重占比接近 1
+    MERGE_SAMPLE_COUNT = 90
+    # 时间权重半衰期(天):晚 90 天,权重衰减一半
+    TIME_HALF_LIFE_DAYS = 90
 
 
 class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
@@ -38,6 +47,7 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         self.view_only = data.get("view_only")
 
     def reverse_category_map(self):
+        # param_name -> category_name
         return {v: k for k, v in self.category_map.items()}
 
     def init_execute_date(self):
@@ -47,11 +57,10 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
             run_date = datetime.today()
 
         end_dt = (run_date - timedelta(1)).strftime("%Y%m%d")
-        begin_dt = (run_date - timedelta(61)).strftime("%Y%m%d")
+        begin_dt = (run_date - timedelta(181)).strftime("%Y%m%d")
         return begin_dt, end_dt
 
     async def prepare_raw_data(self, end_dt, begin_dt: str = "20250401"):
-        begin_dt = "20250401"
         query = """
             select dt, gh_id, account_name, title, similarity, view_count_rate, category,
                     read_avg, read_avg_rate, first_pub_interval, `index`
@@ -63,7 +72,6 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
                 and read_avg_rate between %s and %s
                 and view_count_rate > %s
                 and `index` < %s
-                and account_name in ('生活慢时光', '美好时光阅读汇', '史趣探秘', '趣味生活漫谈', '趣味生活方式', '趣味生活漫时光')
             ;
         """
         fetch_response = await self.pool.async_fetch(
@@ -98,11 +106,53 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         dataframe = dataframe.drop_duplicates(["dt", "gh_id", "title"])
         return dataframe
 
+    # === NEW: 构建 WLS 权重(时间 + 曝光) ===
+    def build_sample_weights(self, df: DataFrame) -> pd.Series:
+        """
+        时间越近 + read_avg 越大,权重越高。
+        最后做一个均值归一化,避免数值过大。
+        """
+        # dt 形如 "20251124"
+        dt_series = pd.to_datetime(df["dt"], format="%Y%m%d", errors="coerce")
+        latest_dt = dt_series.max()
+        days_ago = (latest_dt - dt_series).dt.days.fillna(0)
+        time_weight = np.power(0.5, days_ago / self.TIME_HALF_LIFE_DAYS)
+
+        exposure_weight = np.log1p(df["read_avg"]).fillna(1.0)
+        weights = time_weight * exposure_weight
+
+        mean_w = weights.mean()
+        if mean_w > 0:
+            weights = weights / mean_w
+        return weights
+
+    # === NEW: 把回归参数转成 {category: score} ===
+    def _extract_category_scores(
+        self,
+        param_names,
+        params,
+        p_values,
+        param_to_category_map,
+    ):
+        scores = {}
+        for name, param, p_value in zip(param_names, params, p_values):
+            category_name = param_to_category_map.get(name)
+            if not category_name:
+                continue
+            if abs(param) <= 0.1 or p_value >= self.P_VALUE_THRESHOLD:
+                continue
+            scale_factor = min(0.1 / p_value, 1.0)
+            truncate_param = max(min(param, self.MAX_VALUE), -self.MAX_VALUE)
+            truncate_param *= scale_factor
+            scores[category_name] = round(truncate_param, 6)
+        return scores
+
     async def update_each_account(self, record):
         now_timestamp = int(time.time())
         query = """
-            insert ignore into account_category (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
-                values (%s, %s, %s, %s, %s, %s, %s)
+            insert ignore into account_category 
+                (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
+            values (%s, %s, %s, %s, %s, %s, %s)
         """
         insert_rows = await self.pool.async_save(
             query=query,
@@ -111,15 +161,16 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
                 record["gh_id"],
                 record["category_map"],
                 self.POSITIVE_STATUS,
-                self.VERSION,
+                self.CURRENT_VERSION,
                 now_timestamp,
                 now_timestamp,
             ),
         )
         if insert_rows:
             update_query = """
-                update account_category set status = %s, update_timestamp = %s 
-                where gh_id = %s and dt < %s and status = %s and version = %s;
+                update account_category 
+                set status = %s, update_timestamp = %s 
+                where gh_id = %s and dt < %s and status = %s;
             """
             await self.pool.async_save(
                 query=update_query,
@@ -129,60 +180,75 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
                     record["gh_id"],
                     record["dt"],
                     self.POSITIVE_STATUS,
-                    self.VERSION,
                 ),
             )
 
     async def predict_each_account(
-        self, df, account_id, account_id_map, end_dt, param_to_category_map
+        self,
+        sub_df,
+        account_id,
+        account_id_map,
+        end_dt,
+        param_to_category_map,
+        global_scores,
+        global_weights,
     ):
-        sub_df = df[df["gh_id"] == account_id]
         account_name = account_id_map[account_id]
         sample_count = sub_df.shape[0]
         if sample_count < self.SAMPLE_MIN_SIZE:
             return
 
-        params, t_stats, p_values = self.run_ols_linear_regression(
-            sub_df, self.view_only, self.P_VALUE_THRESHOLD
+        # 对应子集的权重
+        if global_weights is not None:
+            weights = global_weights[sub_df.index.to_numpy()]
+        else:
+            weights = None
+
+        param_names, params, t_stats, p_values = self.run_ols_linear_regression(
+            sub_df,
+            weights=weights,
+            print_residual=self.view_only,
+            print_p_value_threshold=self.P_VALUE_THRESHOLD,
         )
-        current_record = {
-            "dt": end_dt,
-            "gh_id": account_id,
-            "category_map": {},
-            "name": account_name,
-        }
-        params_names = self.get_param_names()
-        for name, param, p_value in zip(params_names, params, p_values):
-            category_name = param_to_category_map.get(name, None)
-            if (
-                abs(param) > 0.1
-                and p_value < self.P_VALUE_THRESHOLD
-                and category_name is not None
-            ):
-                scale_factor = min(0.1 / p_value, 1)
-                print(
-                    f"{account_id} {account_name} {category_name} {param:.3f} {p_value:.3f}"
-                )
-                truncate_param = round(
-                    max(min(param, self.MAX_VALUE), -self.MAX_VALUE) * scale_factor, 6
-                )
-                current_record["category_map"][category_name] = truncate_param
+        if not len(params):
+            return
 
+        # 账号层品类得分
+        account_scores = self._extract_category_scores(
+            param_names, params, p_values, param_to_category_map
+        )
+
+        # 记录负向品类(账号自身)
+        for name, param, p_value in zip(param_names, params, p_values):
+            category_name = param_to_category_map.get(name)
             if (
-                param < -0.1
+                category_name is not None
+                and param < -0.1
                 and p_value < self.P_VALUE_THRESHOLD
-                and category_name is not None
             ):
                 self.account_negative_categories[account_id].append(category_name)
 
-        if not current_record["category_map"]:
-            return
+        # 样本少时多依赖全局,样本多时更信账号自身
+        alpha = min(sample_count / self.MERGE_SAMPLE_COUNT, 1.0)
+        merged_scores = {}
+        all_categories = set(global_scores.keys()) | set(account_scores.keys())
+        for cat in all_categories:
+            g = global_scores.get(cat, 0.0)
+            a = account_scores.get(cat, 0.0)
+            final = (1 - alpha) * g + alpha * a
+            if abs(final) > 1e-6:
+                merged_scores[cat] = round(final, 6)
 
-        current_record["category_map"] = json.dumps(
-            current_record["category_map"], ensure_ascii=False
-        )
+        if not merged_scores:
+            return
 
-        await self.update_each_account(current_record)
+        record = {
+            "dt": end_dt,
+            "gh_id": account_id,
+            "category_map": json.dumps(merged_scores, ensure_ascii=False),
+            "name": account_name,
+        }
+        await self.update_each_account(record)
 
     async def deal(self):
         begin_dt, end_dt = self.init_execute_date()
@@ -191,12 +257,32 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         # prepare data for model
         pre_processed_dataframe = self.preprocess_data(raw_dataframe)
 
+        if pre_processed_dataframe.empty:
+            print(f"[INFO] no valid data between {begin_dt} and {end_dt}")
+            return
+
         if self.view_only:
             self.build_and_print_matrix(pre_processed_dataframe)
             return
 
         param_to_category_map = self.reverse_category_map()
 
+        # === 1) 全局模型:带权重的 WLS,得到 global_scores ===
+        global_weights = self.build_sample_weights(pre_processed_dataframe)
+        g_param_names, g_params, g_t_stats, g_p_values = self.run_ols_linear_regression(
+            pre_processed_dataframe,
+            weights=global_weights,
+            print_residual=False,
+            print_p_value_threshold=self.P_VALUE_THRESHOLD,
+        )
+        if len(g_params):
+            global_scores = self._extract_category_scores(
+                g_param_names, g_params, g_p_values, param_to_category_map
+            )
+        else:
+            global_scores = {}
+
+        # 账号信息准备
         account_ids = pre_processed_dataframe["gh_id"].unique()
         account_id_map = (
             pre_processed_dataframe[["account_name", "gh_id"]]
@@ -204,15 +290,20 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
             .set_index("gh_id")["account_name"]
             .to_dict()
         )
+        self.account_negative_categories = {key: [] for key in account_ids}
 
-        account_negative_categories = {key: [] for key in account_ids}
-        self.account_negative_categories = account_negative_categories
-
-        for account_id in tqdm(account_ids, desc="analysis each account"):
+        # === 2) per-account 模型:在全局的基础上做微调 ===
+        for account_id, sub_df in tqdm(
+            pre_processed_dataframe.groupby("gh_id"),
+            desc="analysis each account",
+        ):
             await self.predict_each_account(
-                pre_processed_dataframe,
+                sub_df,
                 account_id,
                 account_id_map,
                 end_dt,
                 param_to_category_map,
+                global_scores,
+                global_weights,
             )
+

+ 82 - 36
applications/tasks/algorithm_tasks/models.py

@@ -25,16 +25,16 @@ class CategoryRegression:
 
     def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame:
         """预处理数据"""
+        df = raw_dataframe.copy()
+
+        # 按品类打 one-hot
         for category in self.category_map:
             colname = self.category_map[category]
-            raw_dataframe[colname] = raw_dataframe["category"] == category
-            raw_dataframe[colname] = raw_dataframe[colname].astype(int)
+            df[colname] = (df["category"] == category).astype(int)
 
         # 次条阅读量校正
-        df_idx1 = raw_dataframe[raw_dataframe["index"] == 1][
-            ["dt", "gh_id", "read_avg_rate"]
-        ]
-        merged_dataframe = raw_dataframe.merge(
+        df_idx1 = df[df["index"] == 1][["dt", "gh_id", "read_avg_rate"]]
+        merged_dataframe = df.merge(
             df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1")
         )
         debias_selection = merged_dataframe.query(
@@ -42,15 +42,26 @@ class CategoryRegression:
         )
         output_dataframe = merged_dataframe.drop(debias_selection.index)
 
-        output_dataframe["read_avg_rate"] = output_dataframe["read_avg_rate"].apply(
-            self.clip_func
+        # clip 成分位+log 形态
+        def _clip_series(s):
+            mask = s < 1.4
+            res = s.copy()
+            res[~mask] = 0.7 * np.log(res[~mask]) + 1.165
+            return res
+
+        output_dataframe["read_avg_rate"] = _clip_series(
+            output_dataframe["read_avg_rate"]
         )
-        output_dataframe["view_count_rate"] = output_dataframe["view_count_rate"].apply(
-            self.clip_func
+        output_dataframe["view_count_rate"] = _clip_series(
+            output_dataframe["view_count_rate"]
         )
-        output_dataframe["days_decrease"] = output_dataframe["first_pub_interval"] * (
-            -0.2 / 120
+
+        # 首发间隔带一点线性衰减
+        output_dataframe["days_decrease"] = (
+            output_dataframe["first_pub_interval"] * (-0.2 / 120)
         )
+
+        # 回归目标
         output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"]
         return output_dataframe
 
@@ -65,7 +76,9 @@ class CategoryRegression:
         if len(sub_df) < 5:
             return
         sample_count = len(sub_df)
-        params, t_stats, p_values = self.run_ols_linear_regression(sub_df)
+        param_names, params, t_stats, p_values = self.run_ols_linear_regression(
+            sub_df
+        )
         row = f"{account_name}\t{sample_count}"
         for param, p_value in zip(params, p_values):
             row += f"\t{param:.3f}\t{p_value:.3f}"
@@ -76,53 +89,86 @@ class CategoryRegression:
             [name + "\tp-" + name for name in ["bias"] + self.features]
         )
         print("account\tsamples\t{}".format(p_value_column_names))
-        # self._build_and_print_by_account(raw_dataframe, None)
         for account_name in raw_dataframe["account_name"].unique():
             self._build_and_print_by_account(raw_dataframe, account_name)
 
     def get_param_names(self):
+        # 仅用于打印 header,真实参数名以后直接从 model 里取
         return ["bias"] + self.features
 
     def run_ols_linear_regression(
         self,
-        raw_dataframe: DataFrame.series,
+        raw_dataframe: DataFrame,
+        weights=None,
         print_residual: bool = False,
         print_p_value_threshold: float = 0.1,
     ):
-        X = raw_dataframe[self.features]  # 特征列
-        y = raw_dataframe["RegressionY"]  # 目标变量
-        X = sm.add_constant(X, has_constant="add")
+        """
+        统一的回归入口:
+        - 支持可选的加权回归 (WLS)
+        - 返回: param_names, params, t_stats, p_values
+        """
+        if raw_dataframe.empty:
+            return [], np.array([]), np.array([]), np.array([])
+
+        X = raw_dataframe[self.features]
+        y = raw_dataframe["RegressionY"]
+
+        # 丢掉 NaN 行,避免回归报错
+        mask = X.notna().all(axis=1) & y.notna()
+        X = X[mask]
+        y = y[mask]
 
-        model = sm.OLS(y, X).fit()
+        if len(X) < 2:
+            return [], np.array([]), np.array([]), np.array([])
 
-        params = model.params
-        t_stats = model.tvalues
-        p_values = model.pvalues
-        conf_int = model.conf_int()
+        X = sm.add_constant(X, has_constant="add")
+
+        try:
+            if weights is not None:
+                w = np.asarray(weights)[mask.to_numpy()]
+                model = sm.WLS(y, X, weights=w).fit()
+            else:
+                model = sm.OLS(y, X).fit()
+        except Exception as exc:
+            # 某个账号数据有问题时,打个日志/print,跳过即可
+            print(f"[WARN] OLS/WLS failed: {exc}")
+            return [], np.array([]), np.array([]), np.array([])
+
+        param_names = list(model.params.index)
+        params = model.params.to_numpy()
+        t_stats = model.tvalues.to_numpy()
+        p_values = model.pvalues.to_numpy()
 
         if print_residual:
             predict_y = model.predict(X)
             residuals = y - predict_y
-            new_x = raw_dataframe[["title", "category"]].copy()
+            new_x = raw_dataframe.loc[mask, ["title", "category"]].copy()
             new_x["residual"] = residuals
             new_x["y"] = y
             select_idx = []
             for index, row in new_x.iterrows():
                 param_name = self.category_map.get(row["category"], None)
-                if not param_name:
+                if not param_name or param_name not in param_names:
                     continue
-                param_index = self.features.index(param_name) + 1
-                param = params.iloc[param_index]
-                p_value = p_values.iloc[param_index]
+                param_index = param_names.index(param_name)
+                param = params[param_index]
+                p_value = p_values[param_index]
                 if p_value < print_p_value_threshold:
                     print(
-                        f"{row['y']:.3f}\t{row['residual']:.3f}\t{row['category']}\t{param:.2f}\t{row['title'][0:30]}"
+                        f"{row['y']:.3f}\t{row['residual']:.3f}\t"
+                        f"{row['category']}\t{param:.2f}\t"
+                        f"{row['title'][0:30]}"
                     )
                     select_idx.append(index)
-            has_category_residuals = residuals.loc[select_idx]
-            r_min = has_category_residuals.min()
-            r_max = has_category_residuals.max()
-            r_avg = has_category_residuals.mean()
-            print(f"residuals min: {r_min:.3f}, max: {r_max:.3f}, mean: {r_avg:.3f}")
-
-        return params, t_stats, p_values
+            if select_idx:
+                has_category_residuals = residuals.loc[select_idx]
+                r_min = has_category_residuals.min()
+                r_max = has_category_residuals.max()
+                r_avg = has_category_residuals.mean()
+                print(
+                    f"residuals min: {r_min:.3f}, "
+                    f"max: {r_max:.3f}, mean: {r_avg:.3f}"
+                )
+
+        return param_names, params, t_stats, p_values