""" @author: luojunhui """ import json import time from tqdm import tqdm from datetime import datetime, timedelta from argparse import ArgumentParser from pymysql.cursors import DictCursor from applications.const import UpdateAccountReadAvgTaskConst from applications.db import DatabaseConnector from applications.utils import fetch_account_fans from applications.utils import fetch_publishing_account_list from config import apolloConfig from config import long_articles_config, denet_config, piaoquan_crawler_config read_rate_table = "long_articles_read_rate" read_avg_table = "account_avg_info_v3" config = apolloConfig() const = UpdateAccountReadAvgTaskConst() unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans")) touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list"))) backup_account_fans = json.loads(config.getConfigValue("backup_account_fans")) class UpdateAccountInfoVersion3(object): """ 更新账号的平均阅读率 """ def __init__(self): # init piaoquan crawler db client self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config) self.piaoquan_crawler_db_client.connect() # init long articles db client self.long_articles_db_client = DatabaseConnector(long_articles_config) self.long_articles_db_client.connect() # init aigc db client self.denet_db_client = DatabaseConnector(denet_config) self.denet_db_client.connect() def fetch_read_rate_avg_for_each_account(self, dt): """ 从长文数据库获取账号阅读均值 :return: """ dt = int(dt.replace("-", "")) sql = f""" select gh_id, position, read_rate_avg from {read_rate_table} where dt_version = {dt}; """ fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor) account_read_rate_dict = {} for item in fetch_response_list: key = "{}_{}".format(item['gh_id'], item['position']) account_read_rate_dict[key] = item['read_rate_avg'] return account_read_rate_dict def do_task_list(self, dt): """ do it """ # get fans dict from aigc fans_dict = fetch_account_fans(self.denet_db_client, dt) # get publishing account list from aigc account_list = fetch_publishing_account_list(self.denet_db_client) # fetch each account's read avg for each position read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt) for account in tqdm(account_list, desc=dt): gh_id = account["gh_id"] business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS) # use unauthorized account's fans if not found in aigc if not fans: fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS)) # use backup account's fans if not found in aigc if not fans: fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS)) if fans: for index in const.ARTICLE_INDEX_LIST: gh_id_position = "{}_{}".format(gh_id, index) if read_rate_avg_dict.get(gh_id_position): # fetch read rate avg read_rate_avg = read_rate_avg_dict[gh_id_position] # cal read avg read_avg = fans * read_rate_avg # insert into database insert_sql = f""" insert into {read_avg_table} (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ try: self.piaoquan_crawler_db_client.save( query=insert_sql, params=( gh_id, index, dt, account['account_name'], fans, read_avg, const.DEFAULT_LIKE, const.USING_STATUS, account['account_type'], account['mode_type'], account['account_source'], account['status'], business_type, read_rate_avg ) ) except Exception as e: update_sql = f""" update {read_avg_table} set fans = %s, read_avg = %s, read_rate_avg = %s where gh_id = %s and position = %s and update_time = %s """ try: self.piaoquan_crawler_db_client.save( query=update_sql, params=( fans, read_avg, read_rate_avg, account['gh_id'], index, dt ) ) except Exception as e: print(e) # 修改前一天的状态为 0 update_status_sql = f""" update {read_avg_table} set status = %s where update_time != %s and gh_id = %s and position = %s; """ self.piaoquan_crawler_db_client.save( query=update_status_sql, params=( const.NOT_USING_STATUS, dt, account['gh_id'], index ) ) def main(): """ main job :return: """ parser = ArgumentParser() parser.add_argument("--run-date", help="Run only once for date in format of %Y-%m-%d. \ If no specified, run as daily jobs.") args = parser.parse_args() update_account_read_avg_task = UpdateAccountInfoVersion3() if args.run_date: update_account_read_avg_task.do_task_list(dt=args.run_date) else: dt_object = datetime.fromtimestamp(int(time.time())) one_day = timedelta(days=1) yesterday = dt_object - one_day yesterday_str = yesterday.strftime('%Y-%m-%d') update_account_read_avg_task.do_task_list(dt=yesterday_str) if __name__ == '__main__': main()