123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- """
- @author: luojunhui
- cal each account && position reading rate
- """
- import json
- from tqdm import tqdm
- from pandas import DataFrame
- from argparse import ArgumentParser
- from datetime import datetime, timezone, timedelta
- from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot
# Look-back window used when averaging read rates: 31 days, in seconds.
STATISTICS_PERIOD = 31 * 24 * 60 * 60
# One day in seconds; used to shift the stored dt_version back a day.
ONE_DAY_IN_SECONDS = 60 * 60 * 24
def float_to_percentage(value, decimals=3) -> str:
    """
    Render a fraction as a percentage string, e.g. 0.123 -> '12.3%'.

    :param value: fraction to convert
    :param decimals: decimal places kept after scaling by 100
    :return: percentage string with a trailing '%'
    """
    scaled = round(value * 100, decimals)
    return f"{scaled}%"
def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from a DataFrame by two passes on ``key``.

    First keeps rows strictly within two standard deviations of the mean,
    then drops rows exceeding five times the mean of the surviving rows.

    :param group: pandas DataFrame containing the column ``key``
    :param key: column to filter on
    :return: filtered DataFrame
    """
    mu = group[key].mean()
    sigma = group[key].std()
    lower, upper = mu - 2 * sigma, mu + 2 * sigma
    # Pass 1: strict two-sigma band around the raw mean.
    within_band = group[(group[key] > lower) & (group[key] < upper)]
    # Pass 2: cap at five times the recomputed (post-band) mean.
    cap = within_band[key].mean() * 5
    return within_band[within_band[key] < cap]
def timestamp_to_str(timestamp) -> str:
    """
    Convert a Unix timestamp to a 'YYYY-MM-DD' string in the local timezone.

    :param timestamp: seconds since the epoch
    :return: local-timezone date string
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; building an
    # aware UTC datetime directly is the documented equivalent, and .astimezone()
    # then converts to the local timezone exactly as before.
    dt_object = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
    return dt_object.strftime('%Y-%m-%d')
def str_to_timestamp(date_string) -> int:
    """
    Parse a 'YYYY-MM-DD' string into a Unix timestamp (local midnight).

    :param date_string: date in '%Y-%m-%d' format
    :return: integer seconds since the epoch
    """
    midnight = datetime.strptime(date_string, '%Y-%m-%d')
    # Naive datetime -> timestamp() interprets it in the local timezone.
    return int(midnight.timestamp())
def get_account_fans_by_dt(db_client) -> tuple[dict, dict]:
    """
    Fetch per-account follower counts, split by date.

    :param db_client: database client exposing ``select(sql)``
    :return: (fans_by_date, latest_fans) where fans_by_date maps
             gh_id -> {date_str: fans_count} and latest_fans maps
             gh_id -> last non-zero fans_count seen (rows are date-ordered)
    """
    sql = f"""
        SELECT
            t1.date_str,
            t1.fans_count,
            t2.gh_id
        FROM datastat_wx t1
        JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE
            t2.channel = 5
            AND t2.status = 1
            AND t1.date_str >= '2024-07-01'
        ORDER BY t1.date_str;
    """
    fans_by_date = {}
    latest_fans = {}
    for date_str, fans_count, gh_id in db_client.select(sql):
        fans_by_date.setdefault(gh_id, {})[date_str] = fans_count
        # Zero counts are recorded per-date but never overwrite the flat map,
        # so latest_fans keeps the most recent non-zero value.
        if fans_count:
            latest_fans[gh_id] = fans_count
    return fans_by_date, latest_fans
def get_publishing_accounts(db_client) -> list[dict]:
    """
    List accounts currently attached to an active publish plan.

    :param db_client: database client exposing ``select(sql)``
    :return: list of {"account_name": ..., "gh_id": ...} dicts
    """
    sql = f"""
        SELECT DISTINCT
            t3.`name`,
            t3.gh_id,
            t3.follower_count,
            t6.account_source_name,
            t6.mode_type,
            t6.account_type,
            t6.`status`
        FROM
            publish_plan t1
            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
            JOIN publish_account t3 ON t2.account_id = t3.id
            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
        WHERE
            t1.plan_status = 1
            AND t3.channel = 5
            AND t3.follower_count > 0
        GROUP BY t3.id;
    """
    # Only the name and gh_id are consumed downstream; the extra columns are
    # selected for ad-hoc inspection.
    return [
        {"account_name": row[0], "gh_id": row[1]}
        for row in db_client.select(sql)
    ]
def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
    """
    Fetch per-article stats (publish time, position, view count) for accounts.

    :param db_client: database client exposing ``select(sql)``
    :param gh_id_tuple: tuple of gh_id strings to query
    :return: list of dicts, one per article row
    """
    # Build the IN (...) list explicitly: interpolating str(tuple) breaks for a
    # single-element tuple, which renders as ('x',) -- invalid MySQL syntax.
    # NOTE(review): values are interpolated, not parameterized; acceptable only
    # because gh_ids come from our own publish_account table -- do not pass
    # external input here.
    in_clause = ", ".join("'{}'".format(gh_id) for gh_id in gh_id_tuple)
    sql = f"""
        SELECT
            ghId, accountName, updateTime, ItemIndex, show_view_count
        FROM
            official_articles_v2
        WHERE
            ghId IN ({in_clause}) and Type = '9';
    """
    result = db_client.select(sql)
    response_list = [
        {
            "ghId": i[0],
            "accountName": i[1],
            "updateTime": i[2],
            "ItemIndex": i[3],
            "show_view_count": i[4]
        }
        for i in result
    ]
    return response_list
def cal_account_read_rate(gh_id_tuple) -> DataFrame:
    """
    Build a DataFrame of per-article read rates for the given accounts.

    Read rate = show_view_count / fans on the publish date (falling back to the
    latest known fan count when no same-day snapshot exists). Articles with no
    fan data at all are dropped.

    :param gh_id_tuple: tuple of gh_ids to include
    :return: DataFrame with columns ghId / accountName / updateTime /
             ItemIndex / show_view_count / readRate
    """
    pq_db = PQMySQL()
    de_db = DeNetMysql()
    fans_by_date, fans_fallback = get_account_fans_by_dt(db_client=de_db)
    articles = get_account_articles_detail(db_client=pq_db, gh_id_tuple=gh_id_tuple)
    rows = []
    for article in articles:
        gh_id = article['ghId']
        publish_date = timestamp_to_str(article['updateTime'])
        fans = fans_by_date.get(gh_id, {}).get(publish_date, 0)
        if fans == 0:
            # No snapshot for that date: fall back to the latest known count.
            fans = fans_fallback.get(gh_id, 0)
        article['fans'] = fans
        if fans:
            article['readRate'] = article['show_view_count'] / fans
            rows.append(article)
    return DataFrame(
        rows,
        columns=['ghId', 'accountName', 'updateTime', 'ItemIndex', 'show_view_count', 'readRate']
    )
def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
    """
    Mean read rate for one account/position over the 31 days ending at ``dt``.

    :param df: DataFrame produced by cal_account_read_rate
    :param gh_id: account id
    :param index: article position (1-8)
    :param dt: end date string 'YYYY-MM-DD'
    :return: (mean read rate, latest publish ts, earliest publish ts, sample size)
    """
    window_end = str_to_timestamp(dt)
    window_start = window_end - STATISTICS_PERIOD
    mask = (
        (df["ghId"] == gh_id)
        & (df["updateTime"] >= window_start)
        & (df["updateTime"] <= window_end)
        & (df['ItemIndex'] == index)
    )
    # Outliers are removed before averaging (two-sigma band + 5x-mean cap).
    sample = filter_outlier_data(df[mask])
    return (
        sample['readRate'].mean(),
        sample['updateTime'].max(),
        sample['updateTime'].min(),
        len(sample),
    )
def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
    """
    Compare today's average read rate against the most recent stored value.

    :param avg_rate: average read rate computed for the current day
    :param db_client: long-articles database client (``select(sql)``)
    :param gh_id: account id
    :param index: article position
    :param dt: date string 'YYYY-MM-DD'
    :return: alert payload dict when the relative change exceeds the threshold,
             otherwise an empty dict
    """
    RELATIVE_VALUE_THRESHOLD = 0.1
    dt_version = int(dt.replace("-", ""))
    # NOTE(review): values are interpolated directly into SQL; acceptable for
    # internal ids, but switch to parameterized queries if inputs can ever
    # come from outside.
    select_sql = f"""
        SELECT account_name, read_rate_avg
        FROM long_articles_read_rate
        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt_version}
        ORDER BY dt_version DESC limit 1;
    """
    result = db_client.select(select_sql)
    if not result:
        # No historical row to compare against. The original fell through and
        # returned None implicitly; return {} for a consistent falsy type.
        return {}
    account_name = result[0][0]
    previous_read_rate_avg = result[0][1]
    if not previous_read_rate_avg:
        # Guard: a zero/NULL stored average would raise ZeroDivisionError.
        return {}
    relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
    if abs(relative_value) <= RELATIVE_VALUE_THRESHOLD:
        return {}
    return {
        "账号名称": account_name,
        "位置": index,
        "当天阅读率均值": float_to_percentage(avg_rate),
        "前一天阅读率均值": float_to_percentage(previous_read_rate_avg),
        "相对变化率": float_to_percentage(relative_value)
    }
def update_single_day(dt, account_list, article_df, lam):
    """
    Compute and persist the 31-day average read rate for every account and
    position for a single day, then notify via bot (anomalies, insert
    failures, or success).

    :param article_df: per-article read-rate DataFrame (from cal_account_read_rate)
    :param lam: long-articles MySQL client, used for both the anomaly lookup
                and the insert
    :param account_list: accounts currently publishing (dicts with gh_id / account_name)
    :param dt: date string 'YYYY-MM-DD' the job runs for
    :return:
    """
    # All possible article positions within a publish batch.
    index_list = [1, 2, 3, 4, 5, 6, 7, 8]
    error_list = []
    insert_error_list = []
    update_timestamp = str_to_timestamp(dt)
    # The averages are computed the day after the data they cover, so the
    # stored dt_version is shifted back one day.
    avg_date = timestamp_to_str(update_timestamp - ONE_DAY_IN_SECONDS)
    for account in tqdm(account_list):
        for index in index_list:
            avg_rate, max_time, min_time, articles_count = cal_avg_account_read_rate(article_df, account['gh_id'], index, dt)
            if articles_count > 0:
                # Only headline positions 1 and 2 are checked for anomalous
                # swings versus the previously stored average.
                if index in {1, 2}:
                    error_obj = check_each_position(
                        db_client=lam,
                        gh_id=account['gh_id'],
                        index=index,
                        dt=dt,
                        avg_rate=avg_rate
                    )
                    if error_obj:
                        error_list.append(error_obj)
                try:
                    # Skip degenerate averages; a stored zero rate would
                    # poison later relative-change comparisons.
                    if avg_rate == 0:
                        continue
                    insert_sql = f"""
                        INSERT INTO long_articles_read_rate
                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    lam.update(
                        sql=insert_sql,
                        params=(
                            account['account_name'],
                            account['gh_id'],
                            index,
                            avg_rate,
                            "从 {} 开始往前计算 31 天".format(dt),
                            articles_count,
                            timestamp_to_str(min_time),
                            timestamp_to_str(max_time),
                            avg_date.replace("-", ""),
                            0
                        )
                    )
                except Exception as e:
                    # Collect failures; reported in one bot message below
                    # instead of aborting the whole day's run.
                    insert_error_list.append(e)
    if insert_error_list:
        bot(
            title="更新阅读率均值,存在sql 插入失败",
            detail=insert_error_list
        )
    if error_list:
        bot(
            title="更新阅读率均值,头次出现异常值通知",
            detail={
                "时间": dt,
                "异常列表": error_list
            }
        )
    if not error_list and not insert_error_list:
        bot(
            title="阅读率均值表,更新成功",
            detail={
                "日期": dt
            }
        )
def main() -> None:
    """
    Entry point: compute read-rate averages for one day and store them.

    Accepts an optional --run-date argument ('YYYY-MM-DD'); defaults to today.
    :return:
    """
    parser = ArgumentParser()
    parser.add_argument(
        "--run-date",
        # argparse %-formats help text when rendering --help, so literal
        # percent signs must be doubled; a single %Y-%m-%d makes --help crash
        # with "unsupported format character".
        help="Run only once for date in format of %%Y-%%m-%%d. \
             If no specified, run as daily jobs.")
    args = parser.parse_args()
    if args.run_date:
        dt = args.run_date
    else:
        dt = datetime.today().strftime('%Y-%m-%d')
    lam = longArticlesMySQL()
    de = DeNetMysql()
    account_list = get_publishing_accounts(db_client=de)
    df = cal_account_read_rate(tuple(i['gh_id'] for i in account_list))
    update_single_day(dt, account_list, df, lam)
# Script entry point: run the daily (or --run-date) job.
if __name__ == '__main__':
    main()
|