# encoding: utf-8 from __future__ import annotations import numpy as np import statsmodels.api as sm from pandas import DataFrame from applications.config import CATEGORY_FEATURES, CATEGORY_MAP class CategoryRegression: """品类回归模型""" def __init__(self, features=None, category_map=None): self.features = features or CATEGORY_FEATURES self.category_map = category_map or CATEGORY_MAP @staticmethod def clip_func(x): """ 阅读率均值倍数调整 """ return x if x < 1.4 else 0.7 * np.log(x) + 1.165 def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame: """预处理数据""" for category in self.category_map: colname = self.category_map[category] raw_dataframe[colname] = raw_dataframe["category"] == category raw_dataframe[colname] = raw_dataframe[colname].astype(int) # 次条阅读量校正 df_idx1 = raw_dataframe[raw_dataframe["index"] == 1][ ["dt", "gh_id", "read_avg_rate"] ] merged_dataframe = raw_dataframe.merge( df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1") ) debias_selection = merged_dataframe.query( "index != 1 and read_avg_rate1 < 0.7 and read_avg_rate < 0.7" ) output_dataframe = merged_dataframe.drop(debias_selection.index) output_dataframe["read_avg_rate"] = output_dataframe["read_avg_rate"].apply( self.clip_func ) output_dataframe["view_count_rate"] = output_dataframe["view_count_rate"].apply( self.clip_func ) output_dataframe["days_decrease"] = output_dataframe["first_pub_interval"] * ( -0.2 / 120 ) output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"] return output_dataframe def _build_and_print_by_account( self, raw_dataframe: DataFrame, account_name: str | None ) -> None: if account_name: sub_df = raw_dataframe[raw_dataframe["account_name"] == account_name] else: sub_df = raw_dataframe if len(sub_df) < 5: return sample_count = len(sub_df) params, t_stats, p_values = self.run_ols_linear_regression(sub_df) row = f"{account_name}\t{sample_count}" for param, p_value in zip(params, p_values): row += f"\t{param:.3f}\t{p_value:.3f}" print(row) def build_and_print_matrix(self, raw_dataframe: DataFrame) -> None: p_value_column_names = "\t".join( [name + "\tp-" + name for name in ["bias"] + self.features] ) print("account\tsamples\t{}".format(p_value_column_names)) # self._build_and_print_by_account(raw_dataframe, None) for account_name in raw_dataframe["account_name"].unique(): self._build_and_print_by_account(raw_dataframe, account_name) def get_param_names(self): return ["bias"] + self.features def run_ols_linear_regression( self, raw_dataframe: DataFrame.series, print_residual: bool = False, print_p_value_threshold: float = 0.1, ): X = raw_dataframe[self.features] # 特征列 y = raw_dataframe["RegressionY"] # 目标变量 X = sm.add_constant(X, has_constant="add") model = sm.OLS(y, X).fit() params = model.params t_stats = model.tvalues p_values = model.pvalues conf_int = model.conf_int() if print_residual: predict_y = model.predict(X) residuals = y - predict_y new_x = raw_dataframe[["title", "category"]].copy() new_x["residual"] = residuals new_x["y"] = y select_idx = [] for index, row in new_x.iterrows(): param_name = self.category_map.get(row["category"], None) if not param_name: continue param_index = self.features.index(param_name) + 1 param = params.iloc[param_index] p_value = p_values.iloc[param_index] if p_value < print_p_value_threshold: print( f"{row['y']:.3f}\t{row['residual']:.3f}\t{row['category']}\t{param:.2f}\t{row['title'][0:30]}" ) select_idx.append(index) has_category_residuals = residuals.loc[select_idx] r_min = has_category_residuals.min() r_max = has_category_residuals.max() r_avg = has_category_residuals.mean() print(f"residuals min: {r_min:.3f}, max: {r_max:.3f}, mean: {r_avg:.3f}") return params, t_stats, p_values