#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2024 StrayWarrior

"""
Models for long article categories.
"""

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.metrics import mean_squared_error, r2_score
import statsmodels.api as sm

from .consts import category_name_map, reverse_category_name_map


def _add_category_columns(df):
    """Add one 0/1 indicator column per entry of ``category_name_map``.

    For every ``(category, column_name)`` pair in ``category_name_map`` the
    column ``column_name`` is set to 1 where ``df['category']`` equals the
    category and 0 elsewhere. Also adds the boolean ``ClassY`` column, which
    is True where ``read_avg_rate`` is greater than 1.

    The frame is modified in place and returned.
    """
    for cate, colname in category_name_map.items():
        df[colname] = (df['category'] == cate).astype(int)
    df['ClassY'] = df['read_avg_rate'] > 1
    return df


def _select_account(df, account_name):
    """Return the rows of ``df`` for ``account_name``, or ``df`` itself for None."""
    if account_name is None:
        return df
    return df[df['account_name'] == account_name]


class CategoryRegressionV1:
    """OLS regression of ``read_avg_rate`` on category indicators.

    The model is fitted with statsmodels on the columns listed in
    ``self.features`` plus an intercept (reported as ``bias``).
    """

    def __init__(self):
        self.features = [
            'CateOddities',
            'CateFamily',
            'CateHeartwarm',
            'CateHistory',
            'CateHealth',
            'CateLifeKnowledge',
            'CateGossip',
            'CatePolitics',
            'CateMilitary',
            'CateMovie',
            'CateSociety',
            'view_count_rate',
        ]

    def preprocess_data(self, df):
        """Add the indicator, ``ClassY`` and ``RegressionY`` columns to ``df``.

        ``df`` must contain ``category`` and ``read_avg_rate`` columns. It is
        modified in place and also returned.
        """
        df = _add_category_columns(df)
        df['RegressionY'] = df['read_avg_rate']
        return df

    def build_and_print(self, df, account_name):
        """Fit the OLS model for one account and print one tab-separated row.

        Args:
            df: Preprocessed frame (see ``preprocess_data``).
            account_name: Account to filter on, or None to use every row.

        Nothing is printed when fewer than 5 samples are available.
        """
        sub_df = _select_account(df, account_name)
        sample_count = len(sub_df)
        if sample_count < 5:
            return
        params, _t_stats, p_values = self.run_ols_linear_regression(sub_df)
        cells = [f'{account_name}', f'{sample_count}']
        for param, p_value in zip(params, p_values):
            cells.append(f'{param:.3f}')
            cells.append(f'{p_value:.3f}')
        print('\t'.join(cells))

    def build(self, df):
        """Print a header, the all-accounts row, then one row per account."""
        header_cells = []
        for name in self.get_param_names():
            header_cells.extend([name, 'p-' + name])
        print('account\tsamples\t{}'.format('\t'.join(header_cells)))
        self.build_and_print(df, None)
        for account_name in df['account_name'].unique():
            self.build_and_print(df, account_name)

    def get_param_names(self):
        """Return the parameter names in fitted order: intercept first."""
        return ['bias'] + self.features

    def run_ols_linear_regression(self, df, print_residual=False):
        """Fit ``RegressionY ~ const + features`` with statsmodels OLS.

        Args:
            df: Preprocessed frame containing ``self.features`` and
                ``RegressionY`` (plus ``title`` and ``category`` when
                ``print_residual`` is True).
            print_residual: When True, print per-article residuals for
                categories whose coefficient has a p-value below 0.1, followed
                by the min / max / mean residual.

        Returns:
            ``(params, t_stats, p_values)`` as reported by the fitted model,
            with the intercept first.
        """
        y = df['RegressionY']
        # has_constant='add' forces the intercept column even when a feature
        # happens to be constant in this sub-sample. The default ('skip')
        # would silently drop the intercept in that case and shift every
        # parameter by one position relative to get_param_names().
        X = sm.add_constant(df[self.features], has_constant='add')
        model = sm.OLS(y, X).fit()
        params = model.params
        t_stats = model.tvalues
        p_values = model.pvalues

        if print_residual:
            residuals = y - model.predict(X)
            report = df[['title', 'category']].copy()
            report['residual'] = residuals
            report['y'] = y
            for _, row in report.iterrows():
                param_name = category_name_map.get(row['category'], None)
                # Skip categories without a mapped column or without a
                # coefficient in this model.
                if not param_name or param_name not in params.index:
                    continue
                # Look the coefficient up by column label rather than by
                # position so it cannot drift out of alignment.
                param = params[param_name]
                p_value = p_values[param_name]
                if p_value < 0.1:
                    print(f"{row['y']:.3f}\t{row['residual']:.3f}\t{row['category']}\t{param:.2f}\t{row['title']}")
            r_min = residuals.min()
            r_max = residuals.max()
            r_avg = residuals.mean()
            print(f"residuals min: {r_min:.3f}, max: {r_max:.3f}, mean: {r_avg:.3f}")

        return params, t_stats, p_values


class CategoryLR:
    """Logistic regression of ``ClassY`` on category indicators.

    ``self.features`` ends with an explicit ``bias`` column that
    ``preprocess_data`` fills with 1.0.
    """

    def __init__(self):
        self.features = [
            'CateOddities',
            'CateFamily',
            'CateHeartwarm',
            'CateHistory',
            'CateHealth',
            'CateLifeKnowledge',
            'CateGossip',
            'CatePolitics',
            'CateMilitary',
            'CateMovie',
            'CateSociety',
            'view_count_rate',
            'bias',
        ]

    def preprocess_data(self, df):
        """Add the indicator, ``ClassY`` and constant ``bias`` columns to ``df``.

        ``df`` must contain ``category`` and ``read_avg_rate`` columns. It is
        modified in place and also returned.
        """
        df = _add_category_columns(df)
        df['bias'] = 1.0
        return df

    def build_and_print(self, df, account_name):
        """Fit the classifier for one account and print its coefficients.

        Args:
            df: Preprocessed frame (see ``preprocess_data``).
            account_name: Account to filter on, or None to use every row.

        Nothing is printed when there are fewer than 10 samples or when all
        samples fall in the same class.
        """
        sub_df = _select_account(df, account_name)
        sample_count = len(sub_df)
        positive_count = int((sub_df['ClassY'] == 1).sum())
        single_class = positive_count == 0 or positive_count == sample_count
        if sample_count < 10 or single_class:
            return
        params, _t_stats, _p_values = self.run_logistic_regression(sub_df)
        cells = [f'{account_name}', f'{sample_count}']
        cells.extend(f'{param:.3f}' for param in params)
        print('\t'.join(cells))

    def build(self, df):
        """Print a header row, then one coefficient row per account.

        Unlike ``CategoryRegressionV1.build``, no all-accounts row is printed.
        """
        header = '\t'.join(self.get_param_names())
        print('account\tsamples\t{}'.format(header))
        for account_name in df['account_name'].unique():
            self.build_and_print(df, account_name)

    def get_param_names(self):
        """Return the parameter names in the order of the fitted coefficients.

        ``self.features`` already contains ``bias`` (as its last entry), so the
        names are exactly the feature names; prepending another ``bias`` would
        produce one name too many and misalign every label.
        """
        return list(self.features)

    def run_logistic_regression(self, df, print_report=False):
        """Fit a logistic regression of ``ClassY`` on ``self.features``.

        The model is trained on a fixed 80% split (``random_state=42``); the
        remaining 20% is only used when ``print_report`` is True.

        Args:
            df: Preprocessed frame containing ``self.features`` and ``ClassY``.
            print_report: When True, print accuracy, confusion matrix and
                classification report computed on the held-out split.

        Returns:
            ``(coefficients, None, p_values)`` where ``coefficients`` follows
            the order of ``self.features`` and ``p_values`` is a list of zeros
            of the same length (no p-values are computed here).
        """
        X = df[self.features]
        y = df['ClassY']

        # Split into training and test sets.
        # NOTE(review): the caller only guarantees that the full sample holds
        # both classes; for small samples the training split could still end
        # up with a single class, in which case fit() raises. Confirm whether
        # a stratified split is wanted.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42)

        # Create and train the logistic regression model.
        # NOTE(review): LogisticRegression fits its own intercept by default,
        # so the explicit 'bias' column looks redundant and its coefficient is
        # probably not the full intercept. Confirm the intended setup.
        logreg = LogisticRegression()
        logreg.fit(X_train, y_train)

        if print_report:
            # Evaluate on the held-out split.
            y_pred = logreg.predict(X_test)
            accuracy = accuracy_score(y_test, y_pred)
            conf_matrix = confusion_matrix(y_test, y_pred)
            class_report = classification_report(y_test, y_pred)
            print(f"Accuracy: {accuracy}")
            print(f"Confusion Matrix: \n{conf_matrix}")
            print(f"Classification Report: \n{class_report}")

        coefficients = logreg.coef_[0]
        return coefficients, None, [0] * len(coefficients)


def main():
    """Load the sample spreadsheet, filter it and print the OLS report."""
    df = pd.read_excel('20241101_read_rate_samples.xlsx')

    # Expose the spreadsheet columns under ASCII names so they can be used in
    # the query expression below.
    df['read_avg'] = df['阅读均值']
    df['read_avg_rate'] = df['阅读倍数']
    df['dt'] = df['日期']
    df['similarity'] = df['Similarity']

    # NOTE(review): dt is compared against an integer, so it is presumably
    # stored as a yyyymmdd number. Confirm against the spreadsheet.
    filter_condition = (
        'read_avg > 500 '
        'and read_avg_rate > 0 and read_avg_rate < 3 '
        'and dt > 20240914 and similarity > 0'
    )
    # Copy so that preprocess_data does not write into a filtered view.
    df = df.query(filter_condition).copy()

    m_cate = CategoryRegressionV1()
    df = m_cate.preprocess_data(df)
    m_cate.build(df)


if __name__ == '__main__':
    main()