123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- """
- 计算各个账号的阅读均值,以及阅读均值区间估计的上限
- """
- import json
- import numpy as np
- from tqdm import tqdm
- from scipy import stats
- from pymysql.cursors import DictCursor
- from applications.const import UpdateAccountReadAvgTaskConst
- from applications.db import DatabaseConnector
- from applications.utils import fetch_account_fans
- from applications.utils import fetch_publishing_account_list
- from config import apolloConfig
- from config import long_articles_config, denet_config, piaoquan_crawler_config
# Shared Apollo configuration client and the constants used by this task.
config = apolloConfig()
const = UpdateAccountReadAvgTaskConst()

# Fallback fan counts keyed by gh_id, consulted when the fans lookup built in
# do_task_list has no value for an account.
unauthorized_account = json.loads(
    config.getConfigValue("unauthorized_gh_id_fans")
)

# gh_ids whose records are written with business type const.TOULIU.
touliu_accounts = set(
    json.loads(config.getConfigValue("touliu_gh_id_list"))
)

# Second-level fallback fan counts keyed by gh_id.
backup_account_fans = json.loads(
    config.getConfigValue("backup_account_fans")
)
class AccountPositionReadAvgTask(object):
    """Compute and persist the read average of every account position.

    For each publishing account and article position the task multiplies the
    account's fan count by its read-rate average to obtain the read average,
    estimates the upper bound of the 95% confidence interval of the mean from
    the most recently stored read averages, and writes the result to
    ``const.ACCOUNT_READ_AVG_TABLE``.
    """

    def __init__(self):
        """Open the three database connections used by the task."""
        # init piaoquan crawler db client
        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_crawler_db_client.connect()

        # init long articles db client
        self.long_articles_db_client = DatabaseConnector(long_articles_config)
        self.long_articles_db_client.connect()

        # init aigc db client
        self.denet_db_client = DatabaseConnector(denet_config)
        self.denet_db_client.connect()

    def fetch_read_rate_avg_for_each_account(self, dt):
        """Load the read-rate averages stored for one date version.

        Args:
            dt: Date string such as ``"2025-04-20"``; dashes are removed and
                the remainder is converted to an integer ``dt_version``.

        Returns:
            dict mapping ``"<gh_id>_<position>"`` to ``read_rate_avg``.
        """
        dt_version = int(dt.replace("-", ""))
        # Bind the value as a query parameter, consistent with the other
        # queries in this class, instead of formatting it into the SQL text.
        sql = f"""
            select gh_id, position, read_rate_avg
            from {const.ACCOUNT_READ_RATE_TABLE}
            where dt_version = %s;
        """
        rows = self.long_articles_db_client.fetch(
            query=sql, params=(dt_version,), cursor_type=DictCursor
        )
        return {
            "{}_{}".format(row["gh_id"], row["position"]): row["read_rate_avg"]
            for row in rows
        }

    @staticmethod
    def _ci_upper_bound(samples):
        """Return the upper bound of the 95% t-interval of the sample mean.

        Args:
            samples: Iterable of numeric values (anything ``float()`` accepts).

        Returns:
            The upper bound as a built-in ``float``, or ``None`` when it is
            undefined (fewer than two samples) or not finite.
        """
        values = np.asarray(list(samples), dtype=float)
        n = values.size
        # The sample standard deviation (ddof=1) and the t quantile (df=n-1)
        # are undefined below two observations; report "no estimate" instead
        # of NaN, which cannot be stored through the MySQL driver.
        if n < 2:
            return None

        mean = values.mean()
        standard_error = values.std(ddof=1) / np.sqrt(n)
        # 0.975 quantile -> two-sided 95% confidence interval.
        t_value = stats.t.ppf(0.975, df=n - 1)
        upper = float(mean + t_value * standard_error)
        return upper if np.isfinite(upper) else None

    def cal_read_avg_ci(self, gh_id, position):
        """Compute the upper confidence bound of the read average.

        The estimate is based on the (at most) 30 most recent ``read_avg``
        rows stored for the given account and position.

        Args:
            gh_id: Account identifier.
            position: Article position.

        Returns:
            Upper bound of the 95% confidence interval as ``float``, or
            ``None`` when fewer than two usable history rows exist.
        """
        fetch_query = f"""
            select read_avg, update_time
            from {const.ACCOUNT_READ_AVG_TABLE}
            where gh_id = %s and position = %s
            order by update_time desc limit 30;
        """
        fetch_response_list = self.piaoquan_crawler_db_client.fetch(
            query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
        )
        # Tolerate an empty/None result and skip NULL read_avg values so a
        # sparse history yields "no estimate" rather than an exception.
        read_avg_list = [
            row["read_avg"]
            for row in (fetch_response_list or [])
            if row["read_avg"] is not None
        ]
        return self._ci_upper_bound(read_avg_list)

    def process_each_record(
        self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
    ):
        """Insert (or update) one account/position record for ``dt``.

        The record is inserted first; if the insert raises, the existing row
        for the same ``gh_id``/``position``/``update_time`` is updated instead.
        After a successful write, every row of that account and position with
        a different ``update_time`` is set to ``const.NOT_USING_STATUS``.

        Args:
            account: Mapping with the keys ``gh_id``, ``account_name``,
                ``account_type``, ``mode_type``, ``account_source``, ``status``.
            index: Article position.
            fans: Fan count used for the calculation.
            read_rate_avg: Read-rate average of the position.
            read_avg: Read average (``fans * read_rate_avg``).
            read_avg_ci_upper: Upper confidence bound, may be ``None``.
            dt: Date string stored as ``update_time``.
        """
        gh_id = account["gh_id"]
        business_type = (
            const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
        )

        # insert into database
        insert_sql = f"""
            insert into {const.ACCOUNT_READ_AVG_TABLE}
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
            account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            self.piaoquan_crawler_db_client.save(
                query=insert_sql,
                params=(
                    gh_id,
                    index,
                    dt,
                    account["account_name"],
                    fans,
                    read_avg,
                    const.DEFAULT_LIKE,
                    const.USING_STATUS,
                    account["account_type"],
                    account["mode_type"],
                    account["account_source"],
                    account["status"],
                    business_type,
                    read_rate_avg,
                    read_avg_ci_upper,
                ),
            )
        except Exception:
            # The insert failed (presumably because the row for this date
            # already exists - TODO confirm); refresh the existing row instead.
            update_sql = f"""
                update {const.ACCOUNT_READ_AVG_TABLE}
                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
                where gh_id = %s and position = %s and update_time = %s
            """
            try:
                self.piaoquan_crawler_db_client.save(
                    query=update_sql,
                    params=(
                        fans,
                        read_avg,
                        read_rate_avg,
                        read_avg_ci_upper,
                        gh_id,
                        index,
                        dt,
                    ),
                )
            except Exception as e:
                print(e)
                # Neither write succeeded: keep the previously active rows
                # untouched so the account/position is not left without any
                # record in use.
                return

        # Mark the rows of every other date as no longer in use.
        update_status_sql = f"""
            update {const.ACCOUNT_READ_AVG_TABLE}
            set status = %s
            where update_time != %s and gh_id = %s and position = %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_status_sql,
            params=(const.NOT_USING_STATUS, dt, gh_id, index),
        )

    def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt):
        """Compute and store the read averages of one account.

        Args:
            account: Account mapping; must contain ``gh_id`` plus the keys
                required by :meth:`process_each_record`.
            fans_dict: Mapping ``gh_id -> {dt: fans}``.
            read_rate_avg_dict: Mapping ``"<gh_id>_<position>" -> read_rate_avg``.
            dt: Date string the calculation is run for.
        """
        gh_id = account["gh_id"]
        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)

        # use unauthorized account's fans if not found in aigc
        if not fans:
            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))

        # use backup account's fans if not found in aigc
        if not fans:
            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))

        # Without a fan count no read average can be derived.
        if not fans:
            return

        for index in const.ARTICLE_INDEX_LIST:
            read_rate_avg = read_rate_avg_dict.get("{}_{}".format(gh_id, index))
            # Positions without a (non-zero) read-rate average are skipped.
            if not read_rate_avg:
                continue

            read_avg = fans * read_rate_avg
            read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
            self.process_each_record(
                account,
                index,
                fans,
                read_rate_avg,
                read_avg,
                read_avg_ci_upper,
                dt,
            )

    def do_task_list(self, dt):
        """Run the task for every publishing account.

        Args:
            dt: Date string such as ``"2025-04-20"``.
        """
        # get fans dict from aigc
        fans_dict = fetch_account_fans(self.denet_db_client, dt)

        # get publishing account list from aigc
        account_list = fetch_publishing_account_list(self.denet_db_client)

        # fetch each account's read rate avg for each position
        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)

        for account in tqdm(account_list, desc=dt):
            self.cal_read_avg_for_each_account(
                account, fans_dict, read_rate_avg_dict, dt
            )
if __name__ == "__main__":
    # Script entry point: run the task once for a fixed date.
    task = AccountPositionReadAvgTask()
    task.do_task_list("2025-04-20")
|