# encoding: utf-8 from __future__ import annotations import numpy as np import statsmodels.api as sm from pandas import DataFrame from applications.config import CATEGORY_FEATURES, CATEGORY_MAP class CategoryRegression: """品类回归模型""" def __init__(self, features=None, category_map=None): self.features = features or CATEGORY_FEATURES self.category_map = category_map or CATEGORY_MAP @staticmethod def clip_func(x): """ 阅读率均值倍数调整 """ return x if x < 1.4 else 0.7 * np.log(x) + 1.165 def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame: """预处理数据""" df = raw_dataframe.copy() # 按品类打 one-hot for category in self.category_map: colname = self.category_map[category] df[colname] = (df["category"] == category).astype(int) # 次条阅读量校正 df_idx1 = df[df["index"] == 1][["dt", "gh_id", "read_avg_rate"]] merged_dataframe = df.merge( df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1") ) debias_selection = merged_dataframe.query( "index != 1 and read_avg_rate1 < 0.7 and read_avg_rate < 0.7" ) output_dataframe = merged_dataframe.drop(debias_selection.index) # clip 成分位+log 形态 def _clip_series(s): mask = s < 1.4 res = s.copy() res[~mask] = 0.7 * np.log(res[~mask]) + 1.165 return res output_dataframe["read_avg_rate"] = _clip_series( output_dataframe["read_avg_rate"] ) output_dataframe["view_count_rate"] = _clip_series( output_dataframe["view_count_rate"] ) # 首发间隔带一点线性衰减 output_dataframe["days_decrease"] = ( output_dataframe["first_pub_interval"] * (-0.2 / 120) ) # 回归目标 output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"] return output_dataframe def _build_and_print_by_account( self, raw_dataframe: DataFrame, account_name: str | None ) -> None: if account_name: sub_df = raw_dataframe[raw_dataframe["account_name"] == account_name] else: sub_df = raw_dataframe if len(sub_df) < 5: return sample_count = len(sub_df) param_names, params, t_stats, p_values = self.run_ols_linear_regression( sub_df ) row = f"{account_name}\t{sample_count}" for param, p_value in zip(params, p_values): row += f"\t{param:.3f}\t{p_value:.3f}" print(row) def build_and_print_matrix(self, raw_dataframe: DataFrame) -> None: p_value_column_names = "\t".join( [name + "\tp-" + name for name in ["bias"] + self.features] ) print("account\tsamples\t{}".format(p_value_column_names)) for account_name in raw_dataframe["account_name"].unique(): self._build_and_print_by_account(raw_dataframe, account_name) def get_param_names(self): # 仅用于打印 header,真实参数名以后直接从 model 里取 return ["bias"] + self.features def run_ols_linear_regression( self, raw_dataframe: DataFrame, weights=None, print_residual: bool = False, print_p_value_threshold: float = 0.1, ): """ 统一的回归入口: - 支持可选的加权回归 (WLS) - 返回: param_names, params, t_stats, p_values """ if raw_dataframe.empty: return [], np.array([]), np.array([]), np.array([]) X = raw_dataframe[self.features] y = raw_dataframe["RegressionY"] # 丢掉 NaN 行,避免回归报错 mask = X.notna().all(axis=1) & y.notna() X = X[mask] y = y[mask] if len(X) < 2: return [], np.array([]), np.array([]), np.array([]) X = sm.add_constant(X, has_constant="add") try: if weights is not None: w = np.asarray(weights)[mask.to_numpy()] model = sm.WLS(y, X, weights=w).fit() else: model = sm.OLS(y, X).fit() except Exception as exc: # 某个账号数据有问题时,打个日志/print,跳过即可 print(f"[WARN] OLS/WLS failed: {exc}") return [], np.array([]), np.array([]), np.array([]) param_names = list(model.params.index) params = model.params.to_numpy() t_stats = model.tvalues.to_numpy() p_values = model.pvalues.to_numpy() if print_residual: predict_y = model.predict(X) residuals = y - predict_y new_x = raw_dataframe.loc[mask, ["title", "category"]].copy() new_x["residual"] = residuals new_x["y"] = y select_idx = [] for index, row in new_x.iterrows(): param_name = self.category_map.get(row["category"], None) if not param_name or param_name not in param_names: continue param_index = param_names.index(param_name) param = params[param_index] p_value = p_values[param_index] if p_value < print_p_value_threshold: print( f"{row['y']:.3f}\t{row['residual']:.3f}\t" f"{row['category']}\t{param:.2f}\t" f"{row['title'][0:30]}" ) select_idx.append(index) if select_idx: has_category_residuals = residuals.loc[select_idx] r_min = has_category_residuals.min() r_max = has_category_residuals.max() r_avg = has_category_residuals.mean() print( f"residuals min: {r_min:.3f}, " f"max: {r_max:.3f}, mean: {r_avg:.3f}" ) return param_names, params, t_stats, p_values