#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 StrayWarrior <i@straywarrior.com>
"""
Models for long article categories.
"""
- import pandas as pd
- from sklearn.model_selection import train_test_split
- from sklearn.linear_model import LogisticRegression, LinearRegression
- from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
- from sklearn.metrics import mean_squared_error, r2_score
- import statsmodels.api as sm
- from .consts import category_name_map, reverse_category_name_map
class CategoryRegressionV1:
    """OLS regression of the read rate on per-category indicator columns.

    The regressors are the columns named in ``self.features``; an intercept
    column is added at fit time and reported under the name ``bias``.
    """

    def __init__(self):
        # Regressor columns, in the order their coefficients are reported.
        self.features = [
            'CateOddities', 'CateFamily', 'CateHeartwarm',
            'CateHistory', 'CateHealth', 'CateLifeKnowledge', 'CateGossip',
            'CatePolitics', 'CateMilitary', 'CateMovie', 'CateSociety',
            'view_count_rate',
        ]

    def preprocess_data(self, df):
        """Add the indicator and target columns used by the regression.

        For every ``category -> column name`` pair in ``category_name_map``
        a 0/1 column is written that marks the rows whose ``category``
        equals that key. ``ClassY`` (``read_avg_rate > 1``) and
        ``RegressionY`` (a copy of ``read_avg_rate``) are added as targets.

        Args:
            df: Frame with ``category`` and ``read_avg_rate`` columns.
                It is modified in place.

        Returns:
            The same ``df`` object, with the extra columns.
        """
        for cate, colname in category_name_map.items():
            df[colname] = (df['category'] == cate).astype(int)
        df['ClassY'] = df['read_avg_rate'] > 1
        df['RegressionY'] = df['read_avg_rate']
        return df

    def build_and_print(self, df, account_name):
        """Fit the regression for one account and print one tab-separated row.

        Args:
            df: Preprocessed frame (see ``preprocess_data``).
            account_name: Value of ``account_name`` to restrict to, or
                ``None`` to use every row of ``df``.

        Returns:
            None. Nothing is printed when fewer than 5 rows are selected.
        """
        if account_name is None:
            sub_df = df
        else:
            sub_df = df[df['account_name'] == account_name]
        sample_count = len(sub_df)
        if sample_count < 5:
            return
        params, _t_stats, p_values = self.run_ols_linear_regression(sub_df)
        cells = [f'{account_name}', f'{sample_count}']
        for param, p_value in zip(params, p_values):
            cells.append(f'{param:.3f}')
            cells.append(f'{p_value:.3f}')
        print('\t'.join(cells))

    def build(self, df):
        """Print the header, then one row for all data and one per account.

        Args:
            df: Preprocessed frame containing an ``account_name`` column.
        """
        # Every parameter contributes a coefficient column and a p-value column.
        header_cells = [
            name + "\tp-" + name for name in self.get_param_names()
        ]
        print('account\tsamples\t{}'.format('\t'.join(header_cells)))
        self.build_and_print(df, None)
        for account_name in df['account_name'].unique():
            self.build_and_print(df, account_name)

    def get_param_names(self):
        """Return the parameter names in output order: ``bias``, then features."""
        return ['bias'] + self.features

    def run_ols_linear_regression(self, df, print_residual=False):
        """Fit ``RegressionY ~ const + features`` with statsmodels OLS.

        Args:
            df: Preprocessed frame holding the feature columns and
                ``RegressionY`` (plus ``title`` and ``category`` when
                ``print_residual`` is true).
            print_residual: When true, also print per-row residuals and a
                min/max/mean summary.

        Returns:
            Tuple ``(params, t_stats, p_values)`` as returned by the fitted
            statsmodels result; the intercept comes first.
        """
        X = df[self.features]  # feature columns
        y = df['RegressionY']  # target variable
        # The default has_constant='skip' adds no intercept when a feature is
        # already constant in this subset (e.g. every article of an account
        # falls in one category). That would shorten the parameter vector and
        # shift every value against the 'bias'-first header, so always add it.
        X = sm.add_constant(X, has_constant='add')
        model = sm.OLS(y, X).fit()
        params = model.params
        p_values = model.pvalues
        if print_residual:
            self._print_residuals(df, y, model.predict(X), params, p_values)
        return params, model.tvalues, p_values

    def _print_residuals(self, df, y, predicted, params, p_values):
        """Print rows whose category coefficient has p < 0.1, then a summary."""
        residuals = y - predicted
        report = df[['title', 'category']].copy()
        report['residual'] = residuals
        report['y'] = y
        for _, row in report.iterrows():
            param_name = category_name_map.get(row['category'], None)
            # Skip categories that are unmapped or not part of the fitted model.
            if not param_name or param_name not in params.index:
                continue
            # Look the coefficient up by label so the result does not depend
            # on the position of the intercept column.
            param = params[param_name]
            if p_values[param_name] < 0.1:
                print(f"{row['y']:.3f}\t{row['residual']:.3f}\t{row['category']}\t{param:.2f}\t{row['title']}")
        r_min = residuals.min()
        r_max = residuals.max()
        r_avg = residuals.mean()
        print(f"residuals min: {r_min:.3f}, max: {r_max:.3f}, mean: {r_avg:.3f}")
class CategoryLR:
    """Logistic regression of "read rate above 1" on category indicators.

    ``self.features`` lists the model inputs, including an explicit constant
    ``bias`` column that ``preprocess_data`` creates.
    """

    def __init__(self):
        # Model input columns, in the order their coefficients are reported.
        self.features = [
            'CateOddities', 'CateFamily', 'CateHeartwarm',
            'CateHistory', 'CateHealth', 'CateLifeKnowledge', 'CateGossip',
            'CatePolitics', 'CateMilitary', 'CateMovie', 'CateSociety',
            'view_count_rate', 'bias',
        ]

    def preprocess_data(self, df):
        """Add the indicator, target and constant columns used by the model.

        For every ``category -> column name`` pair in ``category_name_map``
        a 0/1 column is written that marks the rows whose ``category``
        equals that key. ``ClassY`` is ``read_avg_rate > 1`` and ``bias`` is
        a constant 1.0 column.

        Args:
            df: Frame with ``category`` and ``read_avg_rate`` columns.
                It is modified in place.

        Returns:
            The same ``df`` object, with the extra columns.
        """
        for cate, colname in category_name_map.items():
            df[colname] = (df['category'] == cate).astype(int)
        df['ClassY'] = df['read_avg_rate'] > 1
        df['bias'] = 1.0
        return df

    def build_and_print(self, df, account_name):
        """Fit the classifier for one account and print its coefficients.

        Args:
            df: Preprocessed frame (see ``preprocess_data``).
            account_name: Value of ``account_name`` to restrict to, or
                ``None`` to use every row of ``df``.

        Returns:
            None. Nothing is printed when fewer than 10 rows are selected
            or when all selected rows share the same ``ClassY`` value.
        """
        if account_name is None:
            sub_df = df
        else:
            sub_df = df[df['account_name'] == account_name]
        sample_count = len(sub_df)
        positive_count = len(sub_df.query('ClassY == 1'))
        negative_count = sample_count - positive_count
        # A classifier needs enough rows and at least one row of each class.
        if sample_count < 10 or positive_count == 0 or negative_count == 0:
            return
        params, _t_stats, _p_values = self.run_logistic_regression(sub_df)
        cells = [f'{account_name}', f'{sample_count}']
        cells.extend(f'{param:.3f}' for param in params)
        print('\t'.join(cells))

    def build(self, df):
        """Print the header row, then one coefficient row per account.

        Args:
            df: Preprocessed frame containing an ``account_name`` column.
        """
        print('account\tsamples\t{}'.format('\t'.join(self.features)))
        for account_name in df['account_name'].unique():
            self.build_and_print(df, account_name)

    def get_param_names(self):
        """Return the names matching the reported coefficients, in order.

        ``self.features`` already contains ``'bias'``, so it is returned
        as-is (as a copy) instead of prepending a second ``'bias'`` entry,
        which would give one more name than there are coefficients.
        """
        return list(self.features)

    def run_logistic_regression(self, df, verbose=False):
        """Fit a logistic regression of ``ClassY`` on the feature columns.

        The model is trained on a fixed 80% split of ``df``
        (``random_state=42``); the remaining 20% is only used for the
        optional evaluation output.

        Args:
            df: Preprocessed frame holding the feature columns and ``ClassY``.
            verbose: When true, print accuracy, confusion matrix and
                classification report for the held-out 20%.

        Returns:
            Tuple ``(coefficients, None, p_values)``. No t statistics are
            computed, and ``p_values`` is a list of zeros used as a
            placeholder so the tuple has the same shape as the OLS variant.
        """
        X = df[self.features]  # feature columns
        y = df['ClassY']  # target variable
        # Split the data into a training set and a test set.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42)
        # NOTE(review): for small, imbalanced accounts the split may leave a
        # single class in y_train, in which case fit() is expected to raise
        # ValueError - confirm whether such accounts should be skipped.
        # NOTE(review): LogisticRegression fits its own intercept by default,
        # so the coefficient of the constant 'bias' column is presumably not
        # the full intercept - confirm whether fit_intercept=False was meant.
        logreg = LogisticRegression()
        logreg.fit(X_train, y_train)
        if verbose:
            # Evaluate on the held-out rows only when asked to.
            y_pred = logreg.predict(X_test)
            print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
            print(f"Confusion Matrix: \n{confusion_matrix(y_test, y_pred)}")
            print(f"Classification Report: \n{classification_report(y_test, y_pred)}")
        coefficients = logreg.coef_[0]
        return coefficients, None, [0] * len(coefficients)
def main():
    """Load the read-rate sample workbook, filter it and print the OLS table.

    Reads ``20241101_read_rate_samples.xlsx`` from the current working
    directory, copies four sheet columns to names usable in ``query``,
    keeps the rows matching the filter and runs ``CategoryRegressionV1``
    on them.
    """
    df = pd.read_excel('20241101_read_rate_samples.xlsx')

    # New column name -> source column in the Excel sheet. The original
    # columns are kept; the copies are added next to them.
    column_aliases = {
        'read_avg': '阅读均值',
        'read_avg_rate': '阅读倍数',
        'dt': '日期',
        'similarity': 'Similarity',
    }
    for alias, source_column in column_aliases.items():
        df[alias] = df[source_column]

    # NOTE(review): comparing dt with 20240914 assumes the date column holds
    # integers in yyyymmdd form - confirm against the workbook.
    conditions = [
        'read_avg > 500',
        'read_avg_rate > 0',
        'read_avg_rate < 3',
        'dt > 20240914',
        'similarity > 0',
    ]
    selected = df.query(' and '.join(conditions)).copy()

    model = CategoryRegressionV1()
    prepared = model.preprocess_data(selected)
    model.build(prepared)
# Run the analysis only when this file is executed as the entry point,
# not when it is imported.
# NOTE(review): the file uses a relative import (``from .consts import ...``),
# so it presumably has to be launched as a module of its package
# (``python -m <package>.<module>``) rather than as a plain script - confirm.
if __name__ == '__main__':
    main()
|