"""
@author: luojunhui
Calculate the reading rate of each account at each article position.
"""
import math
from datetime import datetime, timezone, timedelta

from pandas import DataFrame
from tqdm import tqdm

from applications import DeNetMysql, PQMySQL, longArticlesMySQL

# Length of the look-back window used for the averages, in seconds (31 days).
STATISTICS_PERIOD = 31 * 24 * 60 * 60


def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from ``group`` based on the values in column ``key``.

    Two passes are applied: first only rows strictly within two standard
    deviations of the mean are kept, then rows whose value is at least five
    times the mean of the remaining rows are dropped.

    :param group: DataFrame to filter
    :param key: name of the numeric column the filter is based on
    :return: the filtered DataFrame
    """
    mean = group[key].mean()
    std = group[key].std()
    # Keep only the rows strictly inside mean +/- 2 * std.
    # NOTE(review): both comparisons are strict, so a single-row group
    # (std is NaN) or a group of identical values (std is 0) comes back
    # empty. Confirm whether that is intended before relaxing it.
    within_two_std = group[
        (group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)
    ]
    # Drop the rows that are at least five times the mean of what is left.
    new_mean = within_two_std[key].mean()
    return within_two_std[within_two_std[key] < new_mean * 5]


def timestamp_to_str(timestamp) -> str:
    """
    Format a Unix timestamp as a ``YYYY-MM-DD`` date in the local timezone.

    :param timestamp: Unix timestamp in seconds
    :return: the date string
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; building
    # an aware UTC datetime directly yields the same instant.
    dt_object = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
    return dt_object.strftime('%Y-%m-%d')


def str_to_timestamp(date_string) -> int:
    """
    Convert a ``YYYY-MM-DD`` date string into a Unix timestamp.

    The string is parsed as a naive datetime, so ``timestamp()`` interprets
    it as midnight in the local timezone.

    :param date_string: date formatted as ``YYYY-MM-DD``
    :return: Unix timestamp in whole seconds
    """
    date_obj = datetime.strptime(date_string, '%Y-%m-%d')
    # timestamp() returns a float; convert it to honour the declared int type.
    return int(date_obj.timestamp())


def get_account_fans_by_dt(db_client) -> dict:
    """
    Fetch the fan count of every account, keyed by date.

    :param db_client: client exposing ``select(sql)`` that returns rows of
        ``(date_str, fans_count, gh_id)``
    :return: ``{gh_id: {date_str: fans_count}}``
    """
    sql = """
        SELECT t1.date_str, t1.fans_count, t2.gh_id
        FROM datastat_wx t1
        JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE t2.channel = 5
            AND t2.status = 1
            AND t1.date_str >= '2024-07-01'
        ORDER BY t1.date_str;
    """
    fans_by_account = {}
    for date_str, fans_count, gh_id in db_client.select(sql):
        fans_by_account.setdefault(gh_id, {})[date_str] = fans_count
    return fans_by_account


def get_publishing_accounts(db_client) -> list[dict]:
    """
    Fetch the accounts returned by the publishing-plan query below.

    Only the first two selected columns (name and gh_id) are used.

    :param db_client: client exposing ``select(sql)``
    :return: list of ``{"account_name": ..., "gh_id": ...}`` dicts
    """
    sql = """
        SELECT DISTINCT
            t3.`name`,
            t3.gh_id,
            t3.follower_count,
            t6.account_source_name,
            t6.mode_type,
            t6.account_type,
            t6.`status`
        FROM publish_plan t1
        JOIN publish_plan_account t2 ON t1.id = t2.plan_id
        JOIN publish_account t3 ON t2.account_id = t3.id
        LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
        LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
        LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
        WHERE
            t1.plan_status = 1
            AND t3.channel = 5
            AND t3.follower_count > 0
        GROUP BY t3.id;
    """
    account_list = db_client.select(sql)
    return [
        {
            "account_name": row[0],
            "gh_id": row[1]
        }
        for row in account_list
    ]


def _sql_in_list(values) -> str:
    """Render ``values`` as a quoted, escaped SQL list such as ``('a', 'b')``."""
    quoted = (
        "'{}'".format(str(value).replace("\\", "\\\\").replace("'", "\\'"))
        for value in values
    )
    return "({})".format(", ".join(quoted))


def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
    """
    Fetch the article details of the given accounts.

    :param db_client: client exposing ``select(sql)``
    :param gh_id_tuple: gh_ids of the accounts to query
    :return: list of dicts with the keys ``ghId``, ``accountName``,
        ``updateTime``, ``ItemIndex`` and ``show_view_count``; empty when no
        gh_id is given
    """
    if not gh_id_tuple:
        # "IN ()" is not valid SQL, and there is nothing to look up anyway.
        return []

    # Interpolating the tuple's repr breaks for a single id ("('x',)") and
    # does no quoting, so the IN list is built explicitly.
    # NOTE(review): prefer bound parameters if db_client.select supports them.
    in_clause = _sql_in_list(gh_id_tuple)
    sql = f"""
        SELECT
            ghId, accountName, updateTime, ItemIndex, show_view_count
        FROM
            official_articles_v2
        WHERE
            ghId IN {in_clause} and Type = '9';
    """
    result = db_client.select(sql)
    return [
        {
            "ghId": row[0],
            "accountName": row[1],
            "updateTime": row[2],
            "ItemIndex": row[3],
            "show_view_count": row[4]
        }
        for row in result
    ]


def cal_account_read_rate(gh_id_tuple) -> DataFrame:
    """
    Compute the reading rate of every article of the given accounts.

    The reading rate is ``show_view_count`` divided by the account's fan
    count on the article's ``updateTime`` date.

    :param gh_id_tuple: gh_ids of the accounts to process
    :return: DataFrame with the columns ``ghId``, ``accountName``,
        ``updateTime``, ``ItemIndex``, ``show_view_count`` and ``readRate``
    """
    pq_db = PQMySQL()
    de_db = DeNetMysql()
    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
    account_article_detail = get_account_articles_detail(
        db_client=pq_db,
        gh_id_tuple=gh_id_tuple
    )
    response = []
    for line in account_article_detail:
        gh_id = line['ghId']
        dt = timestamp_to_str(line['updateTime'])
        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
        line['fans'] = fans
        # NOTE(review): the original indentation was lost; this assumes that
        # articles without a fan count for their date are excluded rather
        # than kept with a missing readRate. Confirm.
        if not fans:
            continue
        line['readRate'] = line['show_view_count'] / fans
        response.append(line)
    return DataFrame(
        response,
        columns=['ghId', 'accountName', 'updateTime', 'ItemIndex', 'show_view_count', 'readRate']
    )


def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
    """
    Compute the average reading rate of one account at one position.

    Only articles whose ``updateTime`` lies within ``STATISTICS_PERIOD``
    seconds before ``dt`` are considered, and outliers are removed with
    ``filter_outlier_data`` before averaging.

    :param df: DataFrame as produced by ``cal_account_read_rate``
    :param gh_id: account to select
    :param index: value of ``ItemIndex`` to select
    :param dt: end of the window, formatted as ``YYYY-MM-DD``
    :return: ``(mean readRate, max updateTime, min updateTime, row count)``;
        the first three are NaN when no row is left
    """
    max_time = str_to_timestamp(dt)
    min_time = max_time - STATISTICS_PERIOD
    in_window = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["updateTime"])
        & (df["updateTime"] <= max_time)
        & (df['ItemIndex'] == index)
    ]
    final_df = filter_outlier_data(in_window)
    return (
        final_df['readRate'].mean(),
        final_df['updateTime'].max(),
        final_df['updateTime'].min(),
        len(final_df)
    )


def update_single_day(dt, account_list, article_df, lam):
    """
    Compute and store the average reading rates for a single day.

    For every account and every position from 1 to 8 the average is computed
    with ``cal_avg_account_read_rate`` and inserted into
    ``long_articles_read_rate_dev``. Combinations without any article, or
    with an average of zero, are skipped. A failing insert is printed and
    does not stop the run.

    :param dt: day to compute, formatted as ``YYYY-MM-DD``
    :param account_list: dicts with the keys ``account_name`` and ``gh_id``
    :param article_df: DataFrame as produced by ``cal_account_read_rate``
    :param lam: client exposing ``update(sql=..., params=...)``
    :return: None
    """
    index_list = [1, 2, 3, 4, 5, 6, 7, 8]
    insert_sql = """
        INSERT INTO long_articles_read_rate_dev
        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    for account in tqdm(account_list):
        for index in index_list:
            avg_rate, max_time, min_time, a_count = cal_avg_account_read_rate(
                article_df, account['gh_id'], index, dt
            )
            # An empty window yields NaN for the average and both timestamps.
            # NaN never equals 0, so it has to be skipped explicitly instead
            # of failing later while formatting the timestamps.
            if a_count == 0 or math.isnan(avg_rate) or avg_rate == 0:
                continue
            params = (
                account['account_name'],
                account['gh_id'],
                index,
                avg_rate,
                "从 {} 开始往前计算 31 天".format(dt),
                a_count,
                timestamp_to_str(min_time),
                timestamp_to_str(max_time),
                dt.replace("-", ""),
                0
            )
            try:
                lam.update(sql=insert_sql, params=params)
            except Exception as e:
                # Best effort: report the failed row and carry on.
                print(e)


def main() -> None:
    """
    Compute the reading rates of all publishing accounts and store them.

    :return: None
    """
    lam = longArticlesMySQL()
    de = DeNetMysql()
    account_list = get_publishing_accounts(db_client=de)
    # Load every article of these accounts once and reuse it for each day.
    df = cal_account_read_rate(tuple(account['gh_id'] for account in account_list))
    # NOTE(review): the previous code built the full range from 2024-08-01 to
    # 2024-10-22 and then immediately replaced it with this single date, so
    # only this day has ever been processed. Confirm whether a backfill over
    # the whole range is wanted.
    date_strings = ['2024-09-11']
    for date_str in tqdm(date_strings):
        update_single_day(date_str, account_list, df, lam)


if __name__ == '__main__':
    main()