123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- """
- @author: luojunhui
- """
- import json
- import time
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from argparse import ArgumentParser
- from pymysql.cursors import DictCursor
- from applications.const import UpdateAccountReadAvgTaskConst
- from applications.db import DatabaseConnector
- from applications.utils import fetch_account_fans
- from applications.utils import fetch_publishing_account_list
- from config import apolloConfig
- from config import long_articles_config, denet_config, piaoquan_crawler_config
# Source table: per-account / per-position average read rate (long-articles DB).
read_rate_table = "long_articles_read_rate"
# Target table: per-account average read info, version 3 (piaoquan-crawler DB).
read_avg_table = "account_avg_info_v3"
# Apollo configuration client and task-wide constants.
config = apolloConfig()
const = UpdateAccountReadAvgTaskConst()
# gh_id -> fans for unauthorized accounts (fallback when aigc has no fans).
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
# gh_ids flagged with the TOULIU business type — presumably paid-traffic accounts; verify against const definitions.
touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
# gh_id -> fans backup mapping (second fallback after unauthorized_account).
backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
class UpdateAccountInfoVersion3(object):
    """
    Update each publishing account's average read count.

    For every account and article position, read_avg = fans * read_rate_avg.
    Rows are upserted into `account_avg_info_v3`, and rows from earlier
    dates for the same (gh_id, position) are marked as not-in-use.
    """

    def __init__(self):
        # init piaoquan crawler db client
        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_crawler_db_client.connect()
        # init long articles db client
        self.long_articles_db_client = DatabaseConnector(long_articles_config)
        self.long_articles_db_client.connect()
        # init aigc db client
        self.denet_db_client = DatabaseConnector(denet_config)
        self.denet_db_client.connect()

    def fetch_read_rate_avg_for_each_account(self, dt):
        """
        Fetch each account's average read rate per position from the
        long-articles database.

        :param dt: date string "YYYY-MM-DD"
        :return: dict mapping "{gh_id}_{position}" -> read_rate_avg
        """
        # table stores the date as an int version, e.g. 20240101
        dt = int(dt.replace("-", ""))
        sql = f"""
            select gh_id, position, read_rate_avg
            from {read_rate_table}
            where dt_version = {dt};
        """
        fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
        return {
            "{}_{}".format(item['gh_id'], item['position']): item['read_rate_avg']
            for item in fetch_response_list
        }

    def _resolve_fans(self, gh_id, dt, fans_dict):
        """
        Resolve the fans count for an account on a given date.

        Order of precedence: aigc fans dict, then the unauthorized-account
        config mapping, then the backup config mapping. May return a falsy
        value (const.DEFAULT_FANS) when no source knows the account.
        """
        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
        # use unauthorized account's fans if not found in aigc
        if not fans:
            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
        # use backup account's fans if not found in aigc
        if not fans:
            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
        return fans

    def _upsert_read_avg(self, account, index, dt, fans, read_rate_avg, business_type):
        """
        Insert one (gh_id, position, update_time) row into the read-avg
        table; if the insert fails (typically a duplicate key for a re-run
        on the same date), fall back to updating the existing row.
        """
        gh_id = account['gh_id']
        read_avg = fans * read_rate_avg
        insert_sql = f"""
            insert into {read_avg_table}
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            self.piaoquan_crawler_db_client.save(
                query=insert_sql,
                params=(
                    gh_id,
                    index,
                    dt,
                    account['account_name'],
                    fans,
                    read_avg,
                    const.DEFAULT_LIKE,
                    const.USING_STATUS,
                    account['account_type'],
                    account['mode_type'],
                    account['account_source'],
                    account['status'],
                    business_type,
                    read_rate_avg
                )
            )
        except Exception:
            # row already exists (or insert failed) -> refresh the mutable columns
            update_sql = f"""
                update {read_avg_table}
                set fans = %s, read_avg = %s, read_rate_avg = %s
                where gh_id = %s and position = %s and update_time = %s
            """
            try:
                self.piaoquan_crawler_db_client.save(
                    query=update_sql,
                    params=(
                        fans,
                        read_avg,
                        read_rate_avg,
                        gh_id,
                        index,
                        dt
                    )
                )
            except Exception as e:
                # best-effort: report which row failed instead of a bare error
                print(e)

    def _deactivate_stale_rows(self, gh_id, index, dt):
        """Mark rows from any other date for this (gh_id, position) as not in use."""
        update_status_sql = f"""
            update {read_avg_table}
            set status = %s
            where update_time != %s and gh_id = %s and position = %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_status_sql,
            params=(
                const.NOT_USING_STATUS, dt, gh_id, index
            )
        )

    def do_task_list(self, dt):
        """
        Run the full update for one date.

        :param dt: date string "YYYY-MM-DD"
        """
        # get fans dict from aigc
        fans_dict = fetch_account_fans(self.denet_db_client, dt)
        # get publishing account list from aigc
        account_list = fetch_publishing_account_list(self.denet_db_client)
        # fetch each account's read avg for each position
        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
        for account in tqdm(account_list, desc=dt):
            gh_id = account["gh_id"]
            business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
            fans = self._resolve_fans(gh_id, dt, fans_dict)
            if not fans:
                # no fans information from any source -> nothing to compute
                continue
            for index in const.ARTICLE_INDEX_LIST:
                read_rate_avg = read_rate_avg_dict.get("{}_{}".format(gh_id, index))
                if not read_rate_avg:
                    continue
                self._upsert_read_avg(account, index, dt, fans, read_rate_avg, business_type)
                # mark earlier dates' rows for this position as inactive
                self._deactivate_stale_rows(gh_id, index, dt)
def main():
    """
    Entry point.

    With --run-date, process that single date; otherwise process
    yesterday (the normal daily-job behavior).
    """
    parser = ArgumentParser()
    # NOTE: argparse %-formats help strings, so literal % must be doubled —
    # a bare %Y-%m-%d makes `--help` raise ValueError.
    parser.add_argument(
        "--run-date",
        help="Run only once for date in format of %%Y-%%m-%%d. "
             "If not specified, run as daily jobs.",
    )
    args = parser.parse_args()

    update_account_read_avg_task = UpdateAccountInfoVersion3()
    if args.run_date:
        update_account_read_avg_task.do_task_list(dt=args.run_date)
    else:
        # default: run for yesterday
        yesterday = datetime.now() - timedelta(days=1)
        update_account_read_avg_task.do_task_list(dt=yesterday.strftime('%Y-%m-%d'))


if __name__ == '__main__':
    main()
|