""" @author: luojunhui CREATE TABLE `account_avg_info_v2` ( `gh_id` varchar(32) NOT NULL COMMENT 'ghid', `position` int(11) NOT NULL COMMENT '位置', `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称', `fans` int(10) DEFAULT NULL COMMENT '粉丝量', `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值', `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值', `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt', `status` int(1) DEFAULT NULL COMMENT ' 状态', PRIMARY KEY (`gh_id`,`position`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC """ import json import time import schedule from datetime import datetime from pandas import DataFrame from tqdm import tqdm from applications import PQMySQL, DeNetMysql, Functions, log, bot def filter_outlier_data(group, key='show_view_count'): """ :param group: :param key: :return: """ mean = group[key].mean() std = group[key].std() # 过滤二倍标准差的数据 filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)] # 过滤均值倍数大于5的数据 new_mean = filtered_group[key].mean() filtered_group = filtered_group[filtered_group[key] < new_mean * 5] return filtered_group class UpdateAvgDaily(object): """ 日常更新文章 """ pqClient = PQMySQL() deNetClient = DeNetMysql() @classmethod def getAccountList(cls): """ get official accounts and its ghid, fans, and account_type :return: """ sql = f""" SELECT t1.`name`, t1.gh_id, t1.follower_count, t3.account_type, t3.account_source_name, t3.mode_type, t3.status FROM `publish_account` t1 JOIN wx_statistics_group_source_account t2 on t1.id = t2.account_id JOIN wx_statistics_group_source t3 on t2.group_source_name = t3.account_source_name; """ response = cls.deNetClient.select(sql) log( task="updateAccountAvgDaily", function="getAccountList", message="获取账号成功,一共获取: {} 个账号".format(len(response)) ) L = [] for item in response: temp = { "accountName": item[0], "ghId": item[1], "fans": item[2], "accountType": item[3], "accountSource": item[4], "accountMode": item[5], "accountStatus": item[6] } if temp["accountName"] in ['口琴', '二胡']: continue elif temp["accountType"] == '服务号': continue else: L.append(temp) log( task="updateAccountAvgDaily", function="getAccountList", message="过滤账号成功,过滤后一共获取: {} 个账号".format(len(L)) ) return L @classmethod def insertIntoMysql(cls, data): """ 将数据插入到 Mysql 中 :param data: :return: """ sql = f""" INSERT INTO account_avg_info_v2 (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status, account_type, account_mode, account_source, account_status) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ try: cls.pqClient.update( sql=sql, params=( data['gh_id'], data['position'], data['account_name'], data['fans'], data['avg_read'], data['avg_like'], data['update_time'], 1, data['account_type'], data['account_mode'], data['account_source'], data['account_status'] ) ) log( task="updateAccountAvgDaily", function="insertIntoMysql", message="数据插入成功", data=data ) except Exception as e: log( task="updateAccountAvgDaily", function="insertIntoMysql", message="数据插入失败, 失败原因是: {}".format(e), status="fail", data=data ) @classmethod def getAllAvgRead(cls): """ :return: """ L = [] record_list = cls.getAccountList() dt_str = datetime.today().__str__().split(" ")[0] for item in tqdm(record_list): index_list = [i for i in range(1, 9)] for index in index_list: try: account_name = item['accountName'] avg_read, avg_like = cls.getArticleByFilter( account_name=account_name, index=index, min_time=int(time.time()) - 31 * 24 * 3600, max_time=int(time.time()) ) obj = { "account_name": account_name, "gh_id": item['ghId'], "fans": item.get('fans', 0), "position": index, "avg_read": avg_read if str(avg_read) != "nan" else 0, "avg_like": avg_like if str(avg_like) != "nan" else 0, "update_time": dt_str, "account_type": item['accountType'], "account_mode": item['accountMode'], "account_source": item['accountSource'], "account_status": item['accountStatus'] } cls.insertIntoMysql(obj) L.append(obj) except Exception as e: log( task="updateAccountAvgDaily", function="getAllAvgRead", status="fail", message="更新单个账号单个位置的账号均值失败, 失败原因是: {}".format(e) ) with open("new_account_avg_v3.json", "w", encoding="utf-8") as f: f.write(json.dumps(L, ensure_ascii=False, indent=4)) log( task="updateAccountAvgDaily", function="getAllAvgRead", message="账号均值数据写入文件成功" ) update_sql = f""" UPDATE account_avg_info_v2 SET status = %s where update_time != '{dt_str}'; """ try: cls.pqClient.update(sql=update_sql, params=0) log( task="updateAccountAvgDaily", function="getAllAvgRead", message="修改非当日数据状态为 0 成功" ) except Exception as e: bot( title="账号均值表,更新非当日数据状态失败", detail={ "task": "updateAccountAvgDaily" } ) log( task="updateAccountAvgDaily", function="getAllAvgRead", status="fail", message="修改非当日数据状态为 0 失败, 报错为 {}".format(e) ) @classmethod def getEachAvgRead(cls, account_name, index): """ :return: """ keys = [ "appMsgId", "title", "Type", "updateTime", "ItemIndex", "ContentUrl", "show_view_count", "show_like_count", ] sql = f""" SELECT {", ".join(keys)} FROM official_articles_v2 WHERE accountName = '{account_name}' and ItemIndex = {index};""" result = cls.pqClient.select(sql=sql) return DataFrame(result, columns=keys) @classmethod def getArticleByFilter( cls, account_name, index, min_time=None, max_time=None, msg_type=None, ): """ :param account_name: :param index: index ranges from 1 to 8 :param min_time: earliest time :param max_time: latest time :param msg_type: msg_type :return: """ if not msg_type: msg_type = "9" if not min_time: min_time = 0 if not max_time: # 2099年 max_time = 4088051123 articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index) filterDataFrame = articleDataFrame[ (articleDataFrame["Type"] == msg_type) & (min_time <= articleDataFrame["updateTime"]) & (articleDataFrame["updateTime"] <= max_time) ] # 过滤异常值 finalDF = filter_outlier_data(filterDataFrame) return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean() def updateAvgJob(): """ :return: """ S = UpdateAvgDaily() S.getAllAvgRead() if __name__ == "__main__": schedule.every().day.at("22:00").do(Functions().job_with_thread, updateAvgJob) while True: schedule.run_pending() time.sleep(1) # log( # task="updateAccountAvgDaily", # function="main", # message="更新账号均值任务正常执行" # )