- """
- @author: luojunhui
- cal each account && position reading rate
- """
- import json
- from tqdm import tqdm
- from pandas import DataFrame
- from argparse import ArgumentParser
- from datetime import datetime
- from pymysql.cursors import DictCursor
- from applications import bot, Functions, log
- from applications import create_feishu_columns_sheet
- from applications.db import DatabaseConnector
- from applications.const import UpdateAccountReadRateTaskConst
- from applications.utils import fetch_publishing_account_list
- from applications.utils import fetch_account_fans
- from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
- const = UpdateAccountReadRateTaskConst()
- config = apolloConfig()
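- # fan-count fallbacks loaded from Apollo config, used when the daily fans snapshot has no entry for an account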
- unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
- backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
- functions = Functions()
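- # destination table for per-account, per-position read-rate averages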
- read_rate_table = "long_articles_read_rate"
- def filter_outlier_data(group, key='show_view_count'):
-     """
-     Drop outlier rows from a grouped DataFrame.
-     :param group: DataFrame slice for one account / position
-     :param key: column used for outlier detection
-     :return: filtered DataFrame
-     """
-     mean = group[key].mean()
-     std = group[key].std()
-     # keep rows within two standard deviations of the mean
-     filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
-     # then drop rows larger than five times the filtered mean
-     new_mean = filtered_group[key].mean()
-     filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
-     return filtered_group
- def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
-     """
-     Fetch article details (gh_id, account name, position, view count, publish time)
-     for the given accounts, restricted to bulk-published articles published after min_publish_timestamp.
-     :return: list of article rows as dicts
-     """
- sql = f"""
- SELECT
- ghId, accountName, ItemIndex, show_view_count, publish_timestamp
- FROM
- official_articles_v2
- WHERE
- ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
- """
- response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
- return response_list
- def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
-     """
-     Calculate the read rate of each article for every account and position.
-     :return: DataFrame with one row per article, including its read rate
-     """
- response = []
- for line in article_list:
- gh_id = line['ghId']
- dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
- fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
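-         # fall back to the unauthorized-account fans table, then to the backup fans table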
- if not fans:
- fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
- if not fans:
- fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
- log(
- task='cal_read_rate_avg_task',
- function='cal_account_read_rate',
- message='未获取到粉丝,使用备份粉丝表',
- data=line
- )
- line['fans'] = fans
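-         # only keep articles from accounts above the minimum fan threshold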
- if fans > const.MIN_FANS:
- line['readRate'] = line['show_view_count'] / fans if fans else 0
- response.append(line)
- return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
- def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
-     """
-     Calculate the average read rate of one account / position over the statistics window.
-     :return: dict with the average read rate, publish-time range and record count
-     """
-     max_time = functions.str_to_timestamp(date_string=dt)
-     min_time = max_time - const.STATISTICS_PERIOD
-     # keep this account's articles at the given position inside the statistics window
- filter_dataframe = df[
- (df["ghId"] == gh_id)
- & (min_time <= df["publish_timestamp"])
- & (df["publish_timestamp"] <= max_time)
- & (df['ItemIndex'] == index)
- ]
-     # drop outliers (two-standard-deviation filter plus 5x-mean cap)
- final_dataframe = filter_outlier_data(filter_dataframe)
- return {
- "read_rate_avg": final_dataframe['readRate'].mean(),
- "max_publish_time": final_dataframe['publish_timestamp'].max(),
- "min_publish_time": final_dataframe['publish_timestamp'].min(),
- "records": len(final_dataframe)
- }
- def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
-     """
-     Compare an account/position's average read rate against its most recent historical value.
-     :param avg_rate: average read rate computed for the current day
-     :param db_client: database connector
-     :param gh_id: account id
-     :param index: article position
-     :param dt: date string in %Y-%m-%d format
-     :return: alert payload dict, empty if the change stays within the threshold
-     """
- dt = int(dt.replace("-", ""))
- select_sql = f"""
- SELECT account_name, read_rate_avg
- FROM {read_rate_table}
- WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
- ORDER BY dt_version DESC limit 1;
- """
-     result = db_client.fetch(select_sql)
-     if not result:
-         return {}
-     account_name = result[0][0]
-     previous_read_rate_avg = result[0][1]
-     relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
-     if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
-         return {}
-     response = {
-         "account_name": account_name,
-         "position": index,
-         "read_rate_avg_yesterday": functions.float_to_percentage(avg_rate),
-         "read_rate_avg_the_day_before_yesterday": functions.float_to_percentage(previous_read_rate_avg),
-         "relative_change_rate": [
-             {
-                 "text": functions.float_to_percentage(relative_value),
-                 "color": "red" if relative_value < 0 else "green"
-             }
-         ]
-     }
-     return response
- def update_single_day(dt, account_list, article_df, lam):
-     """
-     Update read-rate averages for a single day.
-     :param article_df: DataFrame of per-article read rates
-     :param lam: long_articles database connector
-     :param account_list: publishing account list
-     :param dt: date string in %Y-%m-%d format
-     :return:
-     """
-     error_list = []
-     insert_error_list = []
-     update_timestamp = functions.str_to_timestamp(date_string=dt)
-     # averages are computed on the following day, so move the version date back by one day
-     avg_date = functions.timestamp_to_str(
-         timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
-         string_format='%Y-%m-%d'
-     )
-     processed_account_set = set()
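-     # compute and persist the average read rate for every account and article position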
- for account in tqdm(account_list, desc=dt):
- for index in const.ARTICLE_INDEX_LIST:
- read_rate_detail = cal_avg_account_read_rate(
- df=article_df,
- gh_id=account['gh_id'],
- index=index,
- dt=dt
- )
- read_rate_avg = read_rate_detail['read_rate_avg']
- max_publish_time = read_rate_detail['max_publish_time']
- min_publish_time = read_rate_detail['min_publish_time']
- articles_count = read_rate_detail['records']
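-             # an account counts as processed only if it has articles at this position within the window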
- if articles_count:
- processed_account_set.add(account['gh_id'])
- # check read rate in position 1 and 2
- if index in [1, 2]:
- error_obj = check_each_position(
- db_client=lam,
- gh_id=account['gh_id'],
- index=index,
- dt=dt,
- avg_rate=read_rate_avg
- )
- if error_obj:
- error_list.append(error_obj)
- # insert into database
- try:
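-                 # skip positions whose computed average read rate is zero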
- if not read_rate_avg:
- continue
- insert_sql = f"""
- INSERT INTO {read_rate_table}
- (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
- values
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- lam.save(
- query=insert_sql,
- params=(
- account['account_name'],
- account['gh_id'],
- index,
- read_rate_avg,
- "从 {} 开始往前计算 31 天".format(dt),
- articles_count,
- functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
- functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
- avg_date.replace("-", ""),
- 0
- )
- )
- except Exception as e:
- print(e)
- insert_error_list.append(str(e))
- # bot sql error
- if insert_error_list:
- bot(
- title="更新阅读率均值,存在sql 插入失败",
- detail=insert_error_list
- )
- # bot outliers
- if error_list:
- columns = [
- create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
- create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
- create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
- display_name="昨日阅读率均值"),
- create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
- display_name="前天阅读率均值"),
- create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
- display_name="相对变化率")
- ]
- bot(
- title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
- detail={
- "columns": columns,
- "rows": error_list
- },
- table=True,
- mention=False
- )
- # if no error, send success info
- if not error_list and not insert_error_list:
- bot(
- title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
- detail={
- "日期": dt
- },
- mention=False
- )
- def main() -> None:
-     """
-     Entry point: parse the run date and refresh the read-rate averages.
-     :return:
-     """
-     parser = ArgumentParser()
-     parser.add_argument("--run-date",
-                         help="Run only once for the given date, in %%Y-%%m-%%d format. \
-                              If not specified, run as a daily job.")
- args = parser.parse_args()
- if args.run_date:
- dt = args.run_date
- else:
- dt = datetime.today().strftime('%Y-%m-%d')
- # init stat period
- max_time = functions.str_to_timestamp(date_string=dt)
- min_time = max_time - const.STATISTICS_PERIOD
- min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
- # init database connector
- long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
- long_articles_db_client.connect()
- piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
- piaoquan_crawler_db_client.connect()
- denet_db_client = DatabaseConnector(db_config=denet_config)
- denet_db_client.connect()
- # get account list
- account_list = fetch_publishing_account_list(db_client=denet_db_client)
- # get fans dict
- fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
- # get data frame from official_articles_v2
- gh_id_tuple = tuple([i['gh_id'] for i in account_list])
- article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
- # cal account read rate and make a dataframe
- read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
- # update each day's data
- update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
- if __name__ == '__main__':
- main()