- """
- @author: luojunhui
- cal each account && position reading rate
- """
- import json
- from tqdm import tqdm
- from pandas import DataFrame
- from argparse import ArgumentParser
- from datetime import datetime, timedelta
- from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions
- from applications.const import updateAccountReadRateTaskConst
- from config import apolloConfig
- const = updateAccountReadRateTaskConst()
- config = apolloConfig()
- unauthorized_account = json.loads(config.getConfigValue("unauthed_account_fans"))
- functions = Functions()
- read_rate_table = "long_articles_read_rate"
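# The outlier filtering below is two-stage: first a two-standard-deviation band around the
# mean, then a cap at five times the recomputed mean, so a single viral article does not
# dominate an account's average read rate.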
def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from a grouped DataFrame.
    :param group: DataFrame holding the articles of one account/position
    :param key: column used for outlier detection
    :return: filtered DataFrame
    """
    mean = group[key].mean()
    std = group[key].std()
    # Keep rows within two standard deviations of the mean
    filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
    # Then drop rows larger than five times the recomputed mean
    new_mean = filtered_group[key].mean()
    filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
    return filtered_group


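# Fan counts are keyed twice, first by gh_id and then by date string, so the read-rate
# calculation can look up the fan count an account had on the day an article was published.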
def get_account_fans_by_dt(db_client) -> dict:
    """
    Get each account's fan count, broken down by date.
    :param db_client: DeNet MySQL client
    :return: {gh_id: {date_str: fans_count}}
    """
    sql = f"""
        SELECT
            t1.date_str,
            t1.fans_count,
            t2.gh_id
        FROM datastat_wx t1
        JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE
            t2.channel = 5
            AND t2.status = 1
            AND t1.date_str >= '2024-07-01'
        ORDER BY t1.date_str;
    """
    result = db_client.select(sql)
    fans_dict = {}
    for line in result:
        dt = line[0]
        fans = line[1]
        gh_id = line[2]
        if fans_dict.get(gh_id):
            fans_dict[gh_id][dt] = fans
        else:
            fans_dict[gh_id] = {dt: fans}
    return fans_dict


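# Note: the query below selects several grouping/source columns, but only `name` and
# `gh_id` are consumed downstream when building the account list.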
def get_publishing_accounts(db_client) -> list[dict]:
    """
    Get the accounts that are currently publishing.
    :param db_client: DeNet MySQL client
    :return: list of {"account_name": ..., "gh_id": ...}
    """
    sql = f"""
        SELECT DISTINCT
            t3.`name`,
            t3.gh_id,
            t3.follower_count,
            t6.account_source_name,
            t6.mode_type,
            t6.account_type,
            t6.`status`
        FROM
            publish_plan t1
            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
            JOIN publish_account t3 ON t2.account_id = t3.id
            LEFT JOIN publish_account_wx_type t4 ON t3.id = t4.account_id
            LEFT JOIN wx_statistics_group_source_account t5 ON t3.id = t5.account_id
            LEFT JOIN wx_statistics_group_source t6 ON t5.group_source_name = t6.account_source_name
        WHERE
            t1.plan_status = 1
            AND t3.channel = 5
            -- AND t3.follower_count > 0
        GROUP BY t3.id;
    """
    account_list = db_client.select(sql)
    result_list = [
        {
            "account_name": i[0],
            "gh_id": i[1]
        } for i in account_list
    ]
    return result_list


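# `gh_id_tuple` is interpolated directly into the SQL IN clause, so it must already be a
# well-formed tuple of gh_id strings; a one-element Python tuple renders with a trailing
# comma, which MySQL's IN list does not accept.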
def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
    """
    Get article details for the given accounts.
    :param db_client: PQ MySQL client
    :param gh_id_tuple: tuple of gh_id strings used in the IN clause
    :return: list of article detail dicts
    """
    sql = f"""
        SELECT
            ghId, accountName, ItemIndex, show_view_count, publish_timestamp
        FROM
            official_articles_v2
        WHERE
            ghId IN {gh_id_tuple} AND Type = '{const.BULK_PUBLISH_TYPE}';
    """
    result = db_client.select(sql)
    response_list = [
        {
            "ghId": i[0],
            "accountName": i[1],
            "ItemIndex": i[2],
            "show_view_count": i[3],
            "publish_timestamp": i[4]
        }
        for i in result
    ]
    return response_list


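# Read rate is defined as show_view_count divided by the fan count on the publish date.
# Accounts without a recorded fan count fall back to the Apollo-configured value, and only
# articles from accounts with more than 1000 fans are kept.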
def cal_account_read_rate(gh_id_tuple) -> DataFrame:
    """
    Compute the per-article read rate for each account and position.
    :param gh_id_tuple: tuple of gh_id strings to compute for
    :return: DataFrame with one row per qualifying article
    """
    pq_db = PQMySQL()
    de_db = DeNetMysql()
    response = []
    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
    account_article_detail = get_account_articles_detail(
        db_client=pq_db,
        gh_id_tuple=gh_id_tuple
    )
    for line in account_article_detail:
        gh_id = line['ghId']
        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
        if not fans:
            # Fall back to the Apollo-configured fan count for unauthorized accounts
            account_name = line['accountName']
            fans = int(unauthorized_account.get(account_name, 0))
        line['fans'] = fans
        if fans > 1000:
            line['readRate'] = line['show_view_count'] / fans
            response.append(line)
    return DataFrame(
        response,
        columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate']
    )


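# The average is taken over the window [dt - STATISTICS_PERIOD, dt] for a single account
# and article position, after outlier filtering; judging by the remark written on insert,
# STATISTICS_PERIOD is presumably 31 days.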
def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
    """
    Compute an account's average read rate for one article position.
    :param df: DataFrame produced by cal_account_read_rate
    :param gh_id: account gh_id
    :param index: article position (ItemIndex)
    :param dt: date string marking the end of the statistics window
    :return: average read rate plus window metadata
    """
    max_time = functions.str_to_timestamp(date_string=dt)
    min_time = max_time - const.STATISTICS_PERIOD
    # Keep only this account and position within the statistics window
    filter_df = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["publish_timestamp"])
        & (df["publish_timestamp"] <= max_time)
        & (df['ItemIndex'] == index)
    ]
    # Filter outliers with the two-standard-deviation rule
    final_df = filter_outlier_data(filter_df)
    return {
        "read_rate_avg": final_df['readRate'].mean(),
        "max_publish_time": final_df['publish_timestamp'].max(),
        "min_publish_time": final_df['publish_timestamp'].min(),
        "records": len(final_df)
    }


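# The day's average is compared with the most recent earlier dt_version for the same
# account and position; when the relative change exceeds RELATIVE_VALUE_THRESHOLD in
# either direction, an alert dict is returned for the bot notification.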
def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
    """
    Compare the day's read-rate average for one account and position with its previous value.
    :param db_client: long_articles MySQL client
    :param gh_id: account gh_id
    :param index: article position (ItemIndex)
    :param dt: date string of the current calculation
    :param avg_rate: read-rate average computed for the day
    :return: alert dict when the relative change exceeds the threshold, otherwise an empty dict
    """
    dt = int(dt.replace("-", ""))
    select_sql = f"""
        SELECT account_name, read_rate_avg
        FROM {read_rate_table}
        WHERE gh_id = '{gh_id}' AND position = {index} AND dt_version < {dt}
        ORDER BY dt_version DESC LIMIT 1;
    """
    result = db_client.select(select_sql)
    if result:
        account_name = result[0][0]
        previous_read_rate_avg = result[0][1]
        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
            return {}
        return {
            "账号名称": account_name,
            "位置": index,
            "当天阅读率均值": functions.float_to_percentage(avg_rate),
            "前一天阅读率均值": functions.float_to_percentage(previous_read_rate_avg),
            "相对变化率": functions.float_to_percentage(relative_value)
        }
    # No earlier record to compare against
    return {}


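# For every account and article position the day's average is computed, positions 1 and 2
# are additionally checked against their previous value, and the result is written with
# dt_version set to the day before dt (the averages describe the previous day's data).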
def update_single_day(dt, account_list, article_df, lam):
    """
    Update the read-rate table for a single day.
    :param dt: date string being processed
    :param account_list: accounts currently publishing
    :param article_df: DataFrame produced by cal_account_read_rate
    :param lam: long_articles MySQL client
    :return:
    """
    error_list = []
    insert_error_list = []
    update_timestamp = functions.str_to_timestamp(date_string=dt)
    # The averages are calculated on the following day, so shift the version date back by one day
    avg_date = functions.timestamp_to_str(
        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
        string_format='%Y-%m-%d'
    )
    for account in tqdm(account_list, desc=dt):
        for index in const.ARTICLE_INDEX_LIST:
            read_rate_detail = cal_avg_account_read_rate(
                df=article_df,
                gh_id=account['gh_id'],
                index=index,
                dt=dt
            )
            read_rate_avg = read_rate_detail['read_rate_avg']
            max_publish_time = read_rate_detail['max_publish_time']
            min_publish_time = read_rate_detail['min_publish_time']
            articles_count = read_rate_detail['records']
            if articles_count:
                if index in {1, 2}:
                    error_obj = check_each_position(
                        db_client=lam,
                        gh_id=account['gh_id'],
                        index=index,
                        dt=dt,
                        avg_rate=read_rate_avg
                    )
                    if error_obj:
                        error_list.append(error_obj)
                try:
                    if not read_rate_avg:
                        continue
                    insert_sql = f"""
                        INSERT INTO {read_rate_table}
                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    lam.update(
                        sql=insert_sql,
                        params=(
                            account['account_name'],
                            account['gh_id'],
                            index,
                            read_rate_avg,
                            "从 {} 开始往前计算 31 天".format(dt),
                            articles_count,
                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
                            avg_date.replace("-", ""),
                            0
                        )
                    )
                except Exception as e:
                    insert_error_list.append(e)
    if insert_error_list:
        bot(
            title="更新阅读率均值,存在sql 插入失败",
            detail=insert_error_list
        )
    if error_list:
        bot(
            title="更新阅读率均值,头次出现异常值通知",
            detail={
                "时间": dt,
                "异常列表": error_list
            }
        )
    if not error_list and not insert_error_list:
        bot(
            title="阅读率均值表,更新成功",
            detail={
                "日期": dt
            }
        )


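# Pipeline: fetch the publishing accounts, build the per-article read-rate DataFrame for
# those accounts, then write the day's per-position averages into the read-rate table.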
def main() -> None:
    """
    Entry point: compute and store read-rate averages for one day.
    :return:
    """
    parser = ArgumentParser()
    parser.add_argument(
        "--run-date",
        help="Run once for the given date in %%Y-%%m-%%d format. If not specified, run as a daily job."
    )
    args = parser.parse_args()
    if args.run_date:
        dt = args.run_date
    else:
        dt = datetime.today().strftime('%Y-%m-%d')
    lam = longArticlesMySQL()
    de = DeNetMysql()
    account_list = get_publishing_accounts(db_client=de)
    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
    update_single_day(dt, account_list, df, lam)


if __name__ == '__main__':
    main()