""" @author: luojunhui cal each account && position reading rate """ import json from tqdm import tqdm from pandas import DataFrame from argparse import ArgumentParser from datetime import datetime from pymysql.cursors import DictCursor from applications import bot, Functions, log from applications import create_feishu_columns_sheet from applications.db import DatabaseConnector from applications.const import UpdateAccountReadRateTaskConst from applications.utils import fetch_publishing_account_list from applications.utils import fetch_account_fans from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config const = UpdateAccountReadRateTaskConst() config = apolloConfig() unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans")) backup_account_fans = json.loads(config.getConfigValue("backup_account_fans")) functions = Functions() read_rate_table = "long_articles_read_rate" def filter_outlier_data(group, key='show_view_count'): """ :param group: :param key: :return: """ mean = group[key].mean() std = group[key].std() # 过滤二倍标准差的数据 filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)] # 过滤均值倍数大于5的数据 new_mean = filtered_group[key].mean() # print("阅读均值", new_mean) filtered_group = filtered_group[filtered_group[key] < new_mean * 5] return filtered_group def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]: """ get articles details :return: """ sql = f""" SELECT ghId, accountName, ItemIndex, show_view_count, publish_timestamp FROM official_articles_v2 WHERE ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp}; """ response_list = db_client.fetch(query=sql, cursor_type=DictCursor) return response_list def cal_account_read_rate(article_list, fans_dict) -> DataFrame: """ 计算账号位置的阅读率 :return: """ response = [] for line in article_list: gh_id = line['ghId'] dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d') fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS) if not fans: fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS)) if not fans: fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS)) log( task='cal_read_rate_avg_task', function='cal_account_read_rate', message='未获取到粉丝,使用备份粉丝表', data=line ) line['fans'] = fans if fans > const.MIN_FANS: line['readRate'] = line['show_view_count'] / fans if fans else 0 response.append(line) return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate']) def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict: """ 计算账号的阅读率均值 :return: """ max_time = functions.str_to_timestamp(date_string=dt) min_time = max_time - const.STATISTICS_PERIOD # 通过 filter_dataframe = df[ (df["ghId"] == gh_id) & (min_time <= df["publish_timestamp"]) & (df["publish_timestamp"] <= max_time) & (df['ItemIndex'] == index) ] # 用二倍标准差过滤 final_dataframe = filter_outlier_data(filter_dataframe) return { "read_rate_avg": final_dataframe['readRate'].mean(), "max_publish_time": final_dataframe['publish_timestamp'].max(), "min_publish_time": final_dataframe['publish_timestamp'].min(), "records": len(final_dataframe) } def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict: """ 检验某个具体账号的具体文章的阅读率均值和前段日子的比较 :param avg_rate: 当天计算出的阅读率均值 :param db_client: 数据库连接 :param gh_id: 账号 id :param index: 账号 index :param dt: :return: """ dt = int(dt.replace("-", "")) select_sql = f""" SELECT 

def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
    """
    Calculate the average read rate of one account/position over the statistics period.
    :return:
    """
    max_time = functions.str_to_timestamp(date_string=dt)
    min_time = max_time - const.STATISTICS_PERIOD

    # filter by account, position and publish-time window
    filter_dataframe = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["publish_timestamp"])
        & (df["publish_timestamp"] <= max_time)
        & (df['ItemIndex'] == index)
    ]

    # drop outliers with the two-stage rule (2 * std, then 5x mean)
    final_dataframe = filter_outlier_data(filter_dataframe)

    return {
        "read_rate_avg": final_dataframe['readRate'].mean(),
        "max_publish_time": final_dataframe['publish_timestamp'].max(),
        "min_publish_time": final_dataframe['publish_timestamp'].min(),
        "records": len(final_dataframe)
    }


def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
    """
    Compare the average read rate of a given account/position with its most recent
    historical value and report the change if it exceeds the threshold.
    :param avg_rate: average read rate calculated for the current day
    :param db_client: database connector
    :param gh_id: account gh_id
    :param index: article position index
    :param dt: date string in %Y-%m-%d format
    :return: an alert row for the Feishu bot, or an empty dict if the change is within the threshold
    """
    dt = int(dt.replace("-", ""))
    select_sql = f"""
        SELECT account_name, read_rate_avg
        FROM {read_rate_table}
        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
        ORDER BY dt_version DESC limit 1;
    """
    result = db_client.fetch(select_sql)
    if result:
        account_name = result[0][0]
        previous_read_rate_avg = result[0][1]
        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
            return {}
        else:
            response = {
                "account_name": account_name,
                "position": index,
                "read_rate_avg_yesterday": Functions().float_to_percentage(avg_rate),
                "read_rate_avg_the_day_before_yesterday": Functions().float_to_percentage(previous_read_rate_avg),
                "relative_change_rate": [
                    {
                        "text": Functions().float_to_percentage(relative_value),
                        "color": "red" if relative_value < 0 else "green"
                    }
                ]
            }
            return response
    return {}
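# A hedged sketch of the alert rule above. Assuming const.RELATIVE_VALUE_THRESHOLD is 0.3
# (the real value lives in UpdateAccountReadRateTaskConst), a drop from a previous average
# of 0.10 to 0.06 gives
#   relative_value = (0.06 - 0.10) / 0.10 = -0.4,
# which falls outside [-0.3, 0.3], so a red-colored row is pushed to the Feishu bot.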

def update_single_day(dt, account_list, article_df, lam):
    """
    Update read-rate data for a single day.
    :param article_df: DataFrame produced by cal_account_read_rate
    :param lam: long_articles database connector
    :param account_list: publishing account list
    :param dt: date string in %Y-%m-%d format
    :return:
    """
    error_list = []
    insert_error_list = []
    update_timestamp = functions.str_to_timestamp(date_string=dt)

    # averages are calculated on the following day, so shift the version date back by one day
    avg_date = functions.timestamp_to_str(
        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
        string_format='%Y-%m-%d'
    )

    # accounts that produced at least one record
    processed_account_set = set()

    for account in tqdm(account_list, desc=dt):
        for index in const.ARTICLE_INDEX_LIST:
            read_rate_detail = cal_avg_account_read_rate(
                df=article_df,
                gh_id=account['gh_id'],
                index=index,
                dt=dt
            )
            read_rate_avg = read_rate_detail['read_rate_avg']
            max_publish_time = read_rate_detail['max_publish_time']
            min_publish_time = read_rate_detail['min_publish_time']
            articles_count = read_rate_detail['records']
            if articles_count:
                processed_account_set.add(account['gh_id'])

                # check read rate in positions 1 and 2
                if index in [1, 2]:
                    error_obj = check_each_position(
                        db_client=lam,
                        gh_id=account['gh_id'],
                        index=index,
                        dt=dt,
                        avg_rate=read_rate_avg
                    )
                    if error_obj:
                        error_list.append(error_obj)

                # insert into database
                try:
                    if not read_rate_avg:
                        continue
                    insert_sql = f"""
                        INSERT INTO {read_rate_table}
                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    lam.save(
                        query=insert_sql,
                        params=(
                            account['account_name'],
                            account['gh_id'],
                            index,
                            read_rate_avg,
                            "从 {} 开始往前计算 31 天".format(dt),
                            articles_count,
                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
                            avg_date.replace("-", ""),
                            0
                        )
                    )
                except Exception as e:
                    print(e)
                    insert_error_list.append(str(e))

    # bot sql error
    if insert_error_list:
        bot(
            title="更新阅读率均值,存在sql 插入失败",
            detail=insert_error_list
        )

    # bot outliers
    if error_list:
        columns = [
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name",
                                        display_name="账号名称"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position",
                                        display_name="文章位置"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
                                        display_name="昨日阅读率均值"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
                                        display_name="前天阅读率均值"),
            create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
                                        display_name="相对变化率")
        ]
        bot(
            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "columns": columns,
                "rows": error_list
            },
            table=True,
            mention=False
        )

    # if no error, send success info
    if not error_list and not insert_error_list:
        bot(
            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "日期": dt
            },
            mention=False
        )


def main() -> None:
    """
    main function
    :return:
    """
    parser = ArgumentParser()
    parser.add_argument("--run-date",
                        help="Run only once for the date given in %%Y-%%m-%%d format. "
                             "If not specified, run as the daily job.")
    args = parser.parse_args()

    if args.run_date:
        dt = args.run_date
    else:
        dt = datetime.today().strftime('%Y-%m-%d')

    # init stat period
    max_time = functions.str_to_timestamp(date_string=dt)
    min_time = max_time - const.STATISTICS_PERIOD
    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')

    # init database connectors
    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
    long_articles_db_client.connect()

    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
    piaoquan_crawler_db_client.connect()

    denet_db_client = DatabaseConnector(db_config=denet_config)
    denet_db_client.connect()

    # get account list
    account_list = fetch_publishing_account_list(db_client=denet_db_client)

    # get fans dict
    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)

    # get article data from official_articles_v2
    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
    article_list = get_account_articles_detail(
        db_client=piaoquan_crawler_db_client,
        gh_id_tuple=gh_id_tuple,
        min_publish_timestamp=min_time
    )

    # calculate account read rate and build a DataFrame
    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)

    # update each day's data
    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)


if __name__ == '__main__':
    main()
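# Example invocation (the script filename is illustrative; use the actual module path):
#   python update_account_read_rate.py                        # daily job, uses today's date
#   python update_account_read_rate.py --run-date 2024-05-01  # re-run for a single historical day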