""" 计算各个账号的阅读均值,以及阅读均值区间估计的上限 """ import json import numpy as np from tqdm import tqdm from scipy import stats from pymysql.cursors import DictCursor from applications.const import UpdateAccountReadAvgTaskConst from applications.db import DatabaseConnector from applications.utils import fetch_account_fans from applications.utils import fetch_publishing_account_list from config import apolloConfig from config import long_articles_config, denet_config, piaoquan_crawler_config config = apolloConfig() const = UpdateAccountReadAvgTaskConst() unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans")) touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list"))) backup_account_fans = json.loads(config.getConfigValue("backup_account_fans")) class AccountPositionReadAvgTask(object): def __init__(self): # init piaoquan crawler db client self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config) self.piaoquan_crawler_db_client.connect() # init long articles db client self.long_articles_db_client = DatabaseConnector(long_articles_config) self.long_articles_db_client.connect() # init aigc db client self.denet_db_client = DatabaseConnector(denet_config) self.denet_db_client.connect() def fetch_read_rate_avg_for_each_account(self, dt): dt = int(dt.replace("-", "")) sql = f""" select gh_id, position, read_rate_avg from {const.ACCOUNT_READ_RATE_TABLE} where dt_version = {dt}; """ fetch_response_list = self.long_articles_db_client.fetch( query=sql, cursor_type=DictCursor ) account_read_rate_dict = {} for item in fetch_response_list: key = "{}_{}".format(item["gh_id"], item["position"]) account_read_rate_dict[key] = item["read_rate_avg"] return account_read_rate_dict def cal_read_avg_ci(self, gh_id, position): """ 计算阅读均值的置信区间 """ fetch_query = f""" select read_avg, update_time from {const.ACCOUNT_READ_AVG_TABLE} where gh_id = %s and position = %s order by update_time desc limit 30; """ fetch_response_list = self.piaoquan_crawler_db_client.fetch( query=fetch_query, params=(gh_id, position), cursor_type=DictCursor ) read_avg_list = [i["read_avg"] for i in fetch_response_list] n = len(read_avg_list) mean = np.mean(read_avg_list) std = np.std(read_avg_list, ddof=1) se = std / np.sqrt(n) t = stats.t.ppf(0.975, df=n - 1) upper_t = mean + t * se return upper_t def process_each_record( self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt ): gh_id = account["gh_id"] business_type = ( const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY ) # insert into database insert_sql = f""" insert into {const.ACCOUNT_READ_AVG_TABLE} (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ try: self.piaoquan_crawler_db_client.save( query=insert_sql, params=( gh_id, index, dt, account["account_name"], fans, read_avg, const.DEFAULT_LIKE, const.USING_STATUS, account["account_type"], account["mode_type"], account["account_source"], account["status"], business_type, read_rate_avg, read_avg_ci_upper, ), ) except Exception as e: update_sql = f""" update {const.ACCOUNT_READ_AVG_TABLE} set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s where gh_id = %s and position = %s and update_time = %s """ try: self.piaoquan_crawler_db_client.save( query=update_sql, params=( fans, read_avg, read_rate_avg, read_avg_ci_upper, account["gh_id"], index, dt, ), ) except Exception as e: print(e) # 修改前一天的状态为 0 update_status_sql = f""" update {const.ACCOUNT_READ_AVG_TABLE} set status = %s where update_time != %s and gh_id = %s and position = %s; """ self.piaoquan_crawler_db_client.save( query=update_status_sql, params=(const.NOT_USING_STATUS, dt, account["gh_id"], index), ) def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt): gh_id = account["gh_id"] fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS) # use unauthorized account's fans if not found in aigc if not fans: fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS)) # use backup account's fans if not found in aigc if not fans: fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS)) if fans: for index in const.ARTICLE_INDEX_LIST: gh_id_position = "{}_{}".format(gh_id, index) if read_rate_avg_dict.get(gh_id_position): # fetch read rate avg read_rate_avg = read_rate_avg_dict[gh_id_position] # cal read avg read_avg = fans * read_rate_avg # cal read avg ci upper read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index) # insert into database self.process_each_record( account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt, ) def do_task_list(self, dt): """ do it """ # get fans dict from aigc fans_dict = fetch_account_fans(self.denet_db_client, dt) # get publishing account list from aigc account_list = fetch_publishing_account_list(self.denet_db_client) # fetch each account's read avg for each position read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt) for account in tqdm(account_list, desc=dt): self.cal_read_avg_for_each_account( account, fans_dict, read_rate_avg_dict, dt )