""" @author: luojunhui """ import json import threading from datetime import datetime from pandas import DataFrame from tqdm import tqdm from applications import PQMySQL def filter_outlier_data(group, key='show_view_count'): """ :param group: :param key: :return: """ mean = group[key].mean() std = group[key].std() # 过滤二倍标准差的数据 filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)] # 过滤均值倍数大于5的数据 new_mean = filtered_group[key].mean() filtered_group = filtered_group[filtered_group[key] < new_mean * 5] return filtered_group class UpdateMsgDaily(object): """ 日常更新文章 """ db_client = PQMySQL() with open("config/accountInfoV2.json", encoding="utf-8") as f: account_list = json.loads(f.read()) # subscription_account = [i for i in account_list if i['type'] == '订阅号'] # server_account = [i for i in account_list if i['type'] == '服务号'] @classmethod def getAccountIdDict(cls): """ 获取全部内部账号的id :return: """ gh_id_dict = {} for line in cls.account_list: gh_id = line['gh_id'] gh_id_dict[gh_id] = line return gh_id_dict @classmethod def getAllAvgRead(cls): """ :return: """ L = [] record_list = cls.account_list for item in tqdm(record_list): index_list = [i for i in range(1, 9)] for index in index_list: account_name = item['name'] print(account_name, index) avg_read, avg_like = cls.getArticleByFilter( account_name=account_name, index=index, min_time=1716480000, max_time=1721836800 ) obj = { "account_name": account_name, "gh_id": item['ghId'], "fans": item.get('follower_count', 0), "position": index, "avg_read": avg_read, "avg_like": avg_like } L.append(obj) with open("new_account_avg_v2.json", "w", encoding="utf-8") as f: f.write(json.dumps(L, ensure_ascii=False, indent=4)) @classmethod def getEachAvgRead(cls, account_name, index): """ :return: """ keys = [ "appMsgId", "title", "Type", "updateTime", "ItemIndex", "ContentUrl", "show_view_count", "show_like_count", ] sql = f""" SELECT {", ".join(keys)} FROM official_articles_v2 WHERE accountName = '{account_name}' and ItemIndex = {index};""" result = cls.db_client.select(sql=sql) return DataFrame(result, columns=keys) @classmethod def getArticleByFilter( cls, account_name, index, min_time=None, max_time=None, msg_type=None, ): """ :param account_name: :param index: index ranges from 1 to 8 :param min_time: earliest time :param max_time: latest time :param msg_type: msg_type :return: """ if not msg_type: msg_type = "9" if not min_time: min_time = 0 if not max_time: # 2099年 max_time = 4088051123 articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index) filterDataFrame = articleDataFrame[ (articleDataFrame["Type"] == msg_type) & (min_time <= articleDataFrame["updateTime"]) & (articleDataFrame["updateTime"] <= max_time) ] # 过滤异常值 finalDF = filter_outlier_data(filterDataFrame) return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean() def job_with_thread(job_func): """ 每个任务放到单个线程中 :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() if __name__ == "__main__": UMD = UpdateMsgDaily() UMD.getAllAvgRead()