| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- # encoding: utf-8
- from __future__ import annotations
- import numpy as np
- import statsmodels.api as sm
- from pandas import DataFrame
- from applications.config import CATEGORY_FEATURES, CATEGORY_MAP
- class CategoryRegression:
- """品类回归模型"""
- def __init__(self, features=None, category_map=None):
- self.features = features or CATEGORY_FEATURES
- self.category_map = category_map or CATEGORY_MAP
- @staticmethod
- def clip_func(x):
- """
- 阅读率均值倍数调整
- """
- return x if x < 1.4 else 0.7 * np.log(x) + 1.165
- def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame:
- """预处理数据"""
- df = raw_dataframe.copy()
- # 按品类打 one-hot
- for category in self.category_map:
- colname = self.category_map[category]
- df[colname] = (df["category"] == category).astype(int)
- # 次条阅读量校正
- df_idx1 = df[df["index"] == 1][["dt", "gh_id", "read_avg_rate"]]
- merged_dataframe = df.merge(
- df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1")
- )
- debias_selection = merged_dataframe.query(
- "index != 1 and read_avg_rate1 < 0.7 and read_avg_rate < 0.7"
- )
- output_dataframe = merged_dataframe.drop(debias_selection.index)
- # clip 成分位+log 形态
- def _clip_series(s):
- mask = s < 1.4
- res = s.copy()
- res[~mask] = 0.7 * np.log(res[~mask]) + 1.165
- return res
- output_dataframe["read_avg_rate"] = _clip_series(
- output_dataframe["read_avg_rate"]
- )
- output_dataframe["view_count_rate"] = _clip_series(
- output_dataframe["view_count_rate"]
- )
- # 首发间隔带一点线性衰减
- output_dataframe["days_decrease"] = (
- output_dataframe["first_pub_interval"] * (-0.2 / 120)
- )
- # 回归目标
- output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"]
- return output_dataframe
- def _build_and_print_by_account(
- self, raw_dataframe: DataFrame, account_name: str | None
- ) -> None:
- if account_name:
- sub_df = raw_dataframe[raw_dataframe["account_name"] == account_name]
- else:
- sub_df = raw_dataframe
- if len(sub_df) < 5:
- return
- sample_count = len(sub_df)
- param_names, params, t_stats, p_values = self.run_ols_linear_regression(
- sub_df
- )
- row = f"{account_name}\t{sample_count}"
- for param, p_value in zip(params, p_values):
- row += f"\t{param:.3f}\t{p_value:.3f}"
- print(row)
- def build_and_print_matrix(self, raw_dataframe: DataFrame) -> None:
- p_value_column_names = "\t".join(
- [name + "\tp-" + name for name in ["bias"] + self.features]
- )
- print("account\tsamples\t{}".format(p_value_column_names))
- for account_name in raw_dataframe["account_name"].unique():
- self._build_and_print_by_account(raw_dataframe, account_name)
- def get_param_names(self):
- # 仅用于打印 header,真实参数名以后直接从 model 里取
- return ["bias"] + self.features
- def run_ols_linear_regression(
- self,
- raw_dataframe: DataFrame,
- weights=None,
- print_residual: bool = False,
- print_p_value_threshold: float = 0.1,
- ):
- """
- 统一的回归入口:
- - 支持可选的加权回归 (WLS)
- - 返回: param_names, params, t_stats, p_values
- """
- if raw_dataframe.empty:
- return [], np.array([]), np.array([]), np.array([])
- X = raw_dataframe[self.features]
- y = raw_dataframe["RegressionY"]
- # 丢掉 NaN 行,避免回归报错
- mask = X.notna().all(axis=1) & y.notna()
- X = X[mask]
- y = y[mask]
- if len(X) < 2:
- return [], np.array([]), np.array([]), np.array([])
- X = sm.add_constant(X, has_constant="add")
- try:
- if weights is not None:
- w = np.asarray(weights)[mask.to_numpy()]
- model = sm.WLS(y, X, weights=w).fit()
- else:
- model = sm.OLS(y, X).fit()
- except Exception as exc:
- # 某个账号数据有问题时,打个日志/print,跳过即可
- print(f"[WARN] OLS/WLS failed: {exc}")
- return [], np.array([]), np.array([]), np.array([])
- param_names = list(model.params.index)
- params = model.params.to_numpy()
- t_stats = model.tvalues.to_numpy()
- p_values = model.pvalues.to_numpy()
- if print_residual:
- predict_y = model.predict(X)
- residuals = y - predict_y
- new_x = raw_dataframe.loc[mask, ["title", "category"]].copy()
- new_x["residual"] = residuals
- new_x["y"] = y
- select_idx = []
- for index, row in new_x.iterrows():
- param_name = self.category_map.get(row["category"], None)
- if not param_name or param_name not in param_names:
- continue
- param_index = param_names.index(param_name)
- param = params[param_index]
- p_value = p_values[param_index]
- if p_value < print_p_value_threshold:
- print(
- f"{row['y']:.3f}\t{row['residual']:.3f}\t"
- f"{row['category']}\t{param:.2f}\t"
- f"{row['title'][0:30]}"
- )
- select_idx.append(index)
- if select_idx:
- has_category_residuals = residuals.loc[select_idx]
- r_min = has_category_residuals.min()
- r_max = has_category_residuals.max()
- r_avg = has_category_residuals.mean()
- print(
- f"residuals min: {r_min:.3f}, "
- f"max: {r_max:.3f}, mean: {r_avg:.3f}"
- )
- return param_names, params, t_stats, p_values
|