""" @author: luojunhui CREATE TABLE `account_avg_info_v2` ( `gh_id` varchar(32) NOT NULL COMMENT 'ghid', `position` int(11) NOT NULL COMMENT '位置', `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称', `fans` int(10) DEFAULT NULL COMMENT '粉丝量', `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值', `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值', `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt', `status` int(1) DEFAULT NULL COMMENT ' 状态', PRIMARY KEY (`gh_id`,`position`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC """ import json import time import schedule from datetime import datetime from pandas import DataFrame from tqdm import tqdm from applications import PQMySQL, DeNetMysql, Functions def filter_outlier_data(group, key='show_view_count'): """ :param group: :param key: :return: """ mean = group[key].mean() std = group[key].std() # 过滤二倍标准差的数据 filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)] # 过滤均值倍数大于5的数据 new_mean = filtered_group[key].mean() filtered_group = filtered_group[filtered_group[key] < new_mean * 5] return filtered_group class UpdateAvgDaily(object): """ 日常更新文章 """ pqClient = PQMySQL() deNetClient = DeNetMysql() @classmethod def getAccountList(cls): """ 获取账号 list :return: """ sql = f""" SELECT t1.`name`,t1.gh_id, t1.follower_count FROM `publish_account` t1 JOIN wx_statistics_group_source_account t2 ON t1.id = t2.account_id UNION SELECT t1.`name`, t1.gh_id, t1.follower_count FROM `publish_account` t1 where t1.`name` in ( '晚年家人', '历史长河流淌', '史趣探秘', '暖心一隅', '小阳看天下', '小惠爱厨房'); """ response = cls.deNetClient.select(sql) L = [] for item in response: temp = { "accountName": item[0], "ghId": item[1], "fans": item[2] } L.append(temp) return L @classmethod def getAccountIdDict(cls): """ 获取全部内部账号的id :return: """ gh_id_dict = {} for line in cls.account_list: gh_id = line['gh_id'] gh_id_dict[gh_id] = line return gh_id_dict @classmethod def insertIntoMysql(cls, data): """ 将数据插入到 Mysql 中 :param data: :return: """ sql = f""" INSERT INTO account_avg_info_v2 (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status) values (%s, %s, %s, %s, %s, %s, %s, %s); """ cls.pqClient.update( sql=sql, params=( data['gh_id'], data['position'], data['account_name'], data['fans'], data['avg_read'], data['avg_like'], data['update_time'], 1 ) ) @classmethod def getAllAvgRead(cls): """ :return: """ L = [] record_list = cls.getAccountList() dt_str = datetime.today().__str__().split(" ")[0] for item in tqdm(record_list): index_list = [i for i in range(1, 9)] for index in index_list: try: account_name = item['accountName'] avg_read, avg_like = cls.getArticleByFilter( account_name=account_name, index=index, min_time=int(time.time()) - 31 * 24 * 3600, max_time=int(time.time()) ) obj = { "account_name": account_name, "gh_id": item['ghId'], "fans": item.get('fans', 0), "position": index, "avg_read": avg_read if str(avg_read) != "nan" else 0, "avg_like": avg_like if str(avg_like) != "nan" else 0, "update_time": dt_str } cls.insertIntoMysql(obj) L.append(obj) except Exception as e: print(e) with open("new_account_avg_v3.json", "w", encoding="utf-8") as f: f.write(json.dumps(L, ensure_ascii=False, indent=4)) update_sql = f""" UPDATE account_avg_info_v2 SET status = %s where update_time != '{dt_str}'; """ cls.pqClient.update(sql=update_sql, params=0) @classmethod def getEachAvgRead(cls, account_name, index): """ :return: """ keys = [ "appMsgId", "title", "Type", "updateTime", "ItemIndex", "ContentUrl", "show_view_count", "show_like_count", ] sql = f""" SELECT {", ".join(keys)} FROM official_articles_v2 WHERE accountName = '{account_name}' and ItemIndex = {index};""" result = cls.pqClient.select(sql=sql) return DataFrame(result, columns=keys) @classmethod def getArticleByFilter( cls, account_name, index, min_time=None, max_time=None, msg_type=None, ): """ :param account_name: :param index: index ranges from 1 to 8 :param min_time: earliest time :param max_time: latest time :param msg_type: msg_type :return: """ if not msg_type: msg_type = "9" if not min_time: min_time = 0 if not max_time: # 2099年 max_time = 4088051123 articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index) filterDataFrame = articleDataFrame[ (articleDataFrame["Type"] == msg_type) & (min_time <= articleDataFrame["updateTime"]) & (articleDataFrame["updateTime"] <= max_time) ] # 过滤异常值 finalDF = filter_outlier_data(filterDataFrame) return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean() def updateAvgJob(): """ :return: """ S = UpdateAvgDaily() S.getAllAvgRead() if __name__ == "__main__": schedule.every().day.at("22:30").do(Functions().job_with_thread, updateAvgJob) while True: schedule.run_pending() time.sleep(1)