# models.py
  1. # encoding: utf-8
  2. from __future__ import annotations
  3. import numpy as np
  4. import statsmodels.api as sm
  5. from pandas import DataFrame
  6. from applications.config import CATEGORY_FEATURES, CATEGORY_MAP
  7. class CategoryRegression:
  8. """品类回归模型"""
  9. def __init__(self, features=None, category_map=None):
  10. self.features = features or CATEGORY_FEATURES
  11. self.category_map = category_map or CATEGORY_MAP
  12. @staticmethod
  13. def clip_func(x):
  14. """
  15. 阅读率均值倍数调整
  16. """
  17. return x if x < 1.4 else 0.7 * np.log(x) + 1.165
  18. def preprocess_data(self, raw_dataframe: DataFrame) -> DataFrame:
  19. """预处理数据"""
  20. df = raw_dataframe.copy()
  21. # 按品类打 one-hot
  22. for category in self.category_map:
  23. colname = self.category_map[category]
  24. df[colname] = (df["category"] == category).astype(int)
  25. # 次条阅读量校正
  26. df_idx1 = df[df["index"] == 1][["dt", "gh_id", "read_avg_rate"]]
  27. merged_dataframe = df.merge(
  28. df_idx1, how="left", on=["dt", "gh_id"], suffixes=("", "1")
  29. )
  30. debias_selection = merged_dataframe.query(
  31. "index != 1 and read_avg_rate1 < 0.7 and read_avg_rate < 0.7"
  32. )
  33. output_dataframe = merged_dataframe.drop(debias_selection.index)
  34. # clip 成分位+log 形态
  35. def _clip_series(s):
  36. mask = s < 1.4
  37. res = s.copy()
  38. res[~mask] = 0.7 * np.log(res[~mask]) + 1.165
  39. return res
  40. output_dataframe["read_avg_rate"] = _clip_series(
  41. output_dataframe["read_avg_rate"]
  42. )
  43. output_dataframe["view_count_rate"] = _clip_series(
  44. output_dataframe["view_count_rate"]
  45. )
  46. # 首发间隔带一点线性衰减
  47. output_dataframe["days_decrease"] = (
  48. output_dataframe["first_pub_interval"] * (-0.2 / 120)
  49. )
  50. # 回归目标
  51. output_dataframe["RegressionY"] = output_dataframe["read_avg_rate"]
  52. return output_dataframe
  53. def _build_and_print_by_account(
  54. self, raw_dataframe: DataFrame, account_name: str | None
  55. ) -> None:
  56. if account_name:
  57. sub_df = raw_dataframe[raw_dataframe["account_name"] == account_name]
  58. else:
  59. sub_df = raw_dataframe
  60. if len(sub_df) < 5:
  61. return
  62. sample_count = len(sub_df)
  63. param_names, params, t_stats, p_values = self.run_ols_linear_regression(
  64. sub_df
  65. )
  66. row = f"{account_name}\t{sample_count}"
  67. for param, p_value in zip(params, p_values):
  68. row += f"\t{param:.3f}\t{p_value:.3f}"
  69. print(row)
  70. def build_and_print_matrix(self, raw_dataframe: DataFrame) -> None:
  71. p_value_column_names = "\t".join(
  72. [name + "\tp-" + name for name in ["bias"] + self.features]
  73. )
  74. print("account\tsamples\t{}".format(p_value_column_names))
  75. for account_name in raw_dataframe["account_name"].unique():
  76. self._build_and_print_by_account(raw_dataframe, account_name)
  77. def get_param_names(self):
  78. # 仅用于打印 header,真实参数名以后直接从 model 里取
  79. return ["bias"] + self.features
  80. def run_ols_linear_regression(
  81. self,
  82. raw_dataframe: DataFrame,
  83. weights=None,
  84. print_residual: bool = False,
  85. print_p_value_threshold: float = 0.1,
  86. ):
  87. """
  88. 统一的回归入口:
  89. - 支持可选的加权回归 (WLS)
  90. - 返回: param_names, params, t_stats, p_values
  91. """
  92. if raw_dataframe.empty:
  93. return [], np.array([]), np.array([]), np.array([])
  94. X = raw_dataframe[self.features]
  95. y = raw_dataframe["RegressionY"]
  96. # 丢掉 NaN 行,避免回归报错
  97. mask = X.notna().all(axis=1) & y.notna()
  98. X = X[mask]
  99. y = y[mask]
  100. if len(X) < 2:
  101. return [], np.array([]), np.array([]), np.array([])
  102. X = sm.add_constant(X, has_constant="add")
  103. try:
  104. if weights is not None:
  105. w = np.asarray(weights)[mask.to_numpy()]
  106. model = sm.WLS(y, X, weights=w).fit()
  107. else:
  108. model = sm.OLS(y, X).fit()
  109. except Exception as exc:
  110. # 某个账号数据有问题时,打个日志/print,跳过即可
  111. print(f"[WARN] OLS/WLS failed: {exc}")
  112. return [], np.array([]), np.array([]), np.array([])
  113. param_names = list(model.params.index)
  114. params = model.params.to_numpy()
  115. t_stats = model.tvalues.to_numpy()
  116. p_values = model.pvalues.to_numpy()
  117. if print_residual:
  118. predict_y = model.predict(X)
  119. residuals = y - predict_y
  120. new_x = raw_dataframe.loc[mask, ["title", "category"]].copy()
  121. new_x["residual"] = residuals
  122. new_x["y"] = y
  123. select_idx = []
  124. for index, row in new_x.iterrows():
  125. param_name = self.category_map.get(row["category"], None)
  126. if not param_name or param_name not in param_names:
  127. continue
  128. param_index = param_names.index(param_name)
  129. param = params[param_index]
  130. p_value = p_values[param_index]
  131. if p_value < print_p_value_threshold:
  132. print(
  133. f"{row['y']:.3f}\t{row['residual']:.3f}\t"
  134. f"{row['category']}\t{param:.2f}\t"
  135. f"{row['title'][0:30]}"
  136. )
  137. select_idx.append(index)
  138. if select_idx:
  139. has_category_residuals = residuals.loc[select_idx]
  140. r_min = has_category_residuals.min()
  141. r_max = has_category_residuals.max()
  142. r_avg = has_category_residuals.mean()
  143. print(
  144. f"residuals min: {r_min:.3f}, "
  145. f"max: {r_max:.3f}, mean: {r_avg:.3f}"
  146. )
  147. return param_names, params, t_stats, p_values