@@ -7,14 +7,21 @@ from tqdm import tqdm
from pandas import DataFrame
from argparse import ArgumentParser
from datetime import datetime
+from pymysql.cursors import DictCursor

-from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions, create_feishu_columns_sheet
-from applications.const import updateAccountReadRateTaskConst
-from config import apolloConfig
+from applications import bot, Functions, log
+from applications import create_feishu_columns_sheet
+from applications.db import DatabaseConnector
+from applications.const import UpdateAccountReadRateTaskConst
+from applications.utils import fetch_publishing_account_list
+from applications.utils import fetch_account_fans
+from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config

-const = updateAccountReadRateTaskConst()
+
+const = UpdateAccountReadRateTaskConst()
config = apolloConfig()
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
functions = Functions()
read_rate_table = "long_articles_read_rate"
@@ -37,75 +44,7 @@ def filter_outlier_data(group, key='show_view_count'):
    return filtered_group


-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT
-            t1.date_str,
-            t1.fans_count,
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE
-            t2.channel = 5
-            AND t2.status = 1
-            AND t1.date_str >= '2024-07-01'
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
-
-def get_publishing_accounts(db_client) -> list[dict]:
-    """
-    获取每日正在发布的账号
-    :return:
-    """
-    sql = f"""
-        SELECT DISTINCT
-            t3.`name`,
-            t3.gh_id,
-            t3.follower_count,
-            t6.account_source_name,
-            t6.mode_type,
-            t6.account_type,
-            t6.`status`
-        FROM
-            publish_plan t1
-            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-            JOIN publish_account t3 ON t2.account_id = t3.id
-            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-        WHERE
-            t1.plan_status = 1
-            AND t3.channel = 5
-            -- AND t3.follower_count > 0
-        GROUP BY t3.id;
-    """
-    account_list = db_client.select(sql)
-    result_list = [
-        {
-            "account_name": i[0],
-            "gh_id": i[1]
-        } for i in account_list
-    ]
-    return result_list
-
-
-def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
+def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
    """
    get articles details
    :return:
@@ -116,47 +55,37 @@ def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
        FROM
            official_articles_v2
        WHERE
-            ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}';
+            ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
    """
-    result = db_client.select(sql)
-    response_list = [
-        {
-            "ghId": i[0],
-            "accountName": i[1],
-            "ItemIndex": i[2],
-            "show_view_count": i[3],
-            "publish_timestamp": i[4]
-        }
-        for i in result
-    ]
+    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
    return response_list


-def cal_account_read_rate(gh_id_tuple) -> DataFrame:
+def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
    """
    计算账号位置的阅读率
    :return:
    """
-    pq_db = PQMySQL()
-    de_db = DeNetMysql()
    response = []
-    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
-    account_article_detail = get_account_articles_detail(
-        db_client=pq_db,
-        gh_id_tuple=gh_id_tuple
-    )
-    for line in account_article_detail:
+    for line in article_list:
        gh_id = line['ghId']
        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
-        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
+        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+        if not fans:
+            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
        if not fans:
-            fans = int(unauthorized_account.get(gh_id, 0))
+            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+            log(
+                task='cal_read_rate_avg_task',
+                function='cal_account_read_rate',
+                message='未获取到粉丝,使用备份粉丝表',
+                data=line
+            )
        line['fans'] = fans
-        if fans > 1000:
+        if fans > const.MIN_FANS:
            line['readRate'] = line['show_view_count'] / fans if fans else 0
            response.append(line)
-    return DataFrame(response,
-                     columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
+    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])


def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
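The DictCursor switch above removes the hand-written tuple-to-dict mapping in get_account_articles_detail. A minimal sketch of the assumed row shape, based only on this diff (the fetch signature and the sample values are illustrative, not taken from the library):

# Before: a default cursor yields positional tuples that callers must re-key by hand.
row = ("gh_0001", "demo-account", 1, 1500, 1730000000)
article = {
    "ghId": row[0],
    "accountName": row[1],
    "ItemIndex": row[2],
    "show_view_count": row[3],
    "publish_timestamp": row[4],
}

# After: db_client.fetch(query=sql, cursor_type=DictCursor) is assumed to return
# one dict per row, keyed by the selected column names, e.g.:
expected = {
    "ghId": "gh_0001",
    "accountName": "demo-account",
    "ItemIndex": 1,
    "show_view_count": 1500,
    "publish_timestamp": 1730000000,
}
assert article == expected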
@@ -168,7 +97,7 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
    min_time = max_time - const.STATISTICS_PERIOD

    # 通过
-    filterDataFrame = df[
+    filter_dataframe = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["publish_timestamp"])
        & (df["publish_timestamp"] <= max_time)
@@ -176,13 +105,13 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
        ]

    # 用二倍标准差过滤
-    finalDF = filter_outlier_data(filterDataFrame)
+    final_dataframe = filter_outlier_data(filter_dataframe)

    return {
-        "read_rate_avg": finalDF['readRate'].mean(),
-        "max_publish_time": finalDF['publish_timestamp'].max(),
-        "min_publish_time": finalDF['publish_timestamp'].min(),
-        "records": len(finalDF)
+        "read_rate_avg": final_dataframe['readRate'].mean(),
+        "max_publish_time": final_dataframe['publish_timestamp'].max(),
+        "min_publish_time": final_dataframe['publish_timestamp'].min(),
+        "records": len(final_dataframe)
    }
@@ -204,7 +133,7 @@ def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
        ORDER BY dt_version DESC limit 1;
    """
-    result = db_client.select(select_sql)
+    result = db_client.fetch(select_sql)
    if result:
        account_name = result[0][0]
        previous_read_rate_avg = result[0][1]
@@ -246,6 +175,9 @@ def update_single_day(dt, account_list, article_df, lam):
        string_format='%Y-%m-%d'
    )

+    # accounts that produced at least one read-rate record
+    processed_account_set = set()
+
    for account in tqdm(account_list, desc=dt):
        for index in const.ARTICLE_INDEX_LIST:
            read_rate_detail = cal_avg_account_read_rate(
@@ -259,7 +191,9 @@ def update_single_day(dt, account_list, article_df, lam):
            min_publish_time = read_rate_detail['min_publish_time']
            articles_count = read_rate_detail['records']
            if articles_count:
-                if index in {1, 2}:
+                processed_account_set.add(account['gh_id'])
+                # check read rate in position 1 and 2
+                if index in [1, 2]:
                    error_obj = check_each_position(
                        db_client=lam,
                        gh_id=account['gh_id'],
@@ -269,6 +203,7 @@ def update_single_day(dt, account_list, article_df, lam):
                    )
                    if error_obj:
                        error_list.append(error_obj)
+                # insert into database
                try:
                    if not read_rate_avg:
                        continue
@@ -278,8 +213,8 @@ def update_single_day(dt, account_list, article_df, lam):
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
-                    lam.update(
-                        sql=insert_sql,
+                    lam.save(
+                        query=insert_sql,
                        params=(
                            account['account_name'],
                            account['gh_id'],
@@ -294,14 +229,17 @@ def update_single_day(dt, account_list, article_df, lam):
                        )
                    )
                except Exception as e:
+                    print(e)
                    insert_error_list.append(str(e))

+    # send a bot alert if any sql insert failed
    if insert_error_list:
        bot(
            title="更新阅读率均值,存在sql 插入失败",
            detail=insert_error_list
        )

+    # send a bot alert for read-rate outliers
    if error_list:
        columns = [
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
@@ -314,7 +252,7 @@ def update_single_day(dt, account_list, article_df, lam):
                                        display_name="相对变化率")
        ]
        bot(
-            title="更新阅读率均值,头次出现异常值通知",
+            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "columns": columns,
                "rows": error_list
@@ -323,12 +261,14 @@ def update_single_day(dt, account_list, article_df, lam):
            mention=False
        )

+    # if no error, send success info
    if not error_list and not insert_error_list:
        bot(
-            title="阅读率均值表,更新成功",
+            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "日期": dt
-            }
+            },
+            mention=False
        )
@@ -347,12 +287,36 @@ def main() -> None:
    else:
        dt = datetime.today().strftime('%Y-%m-%d')

-    lam = longArticlesMySQL()
-    de = DeNetMysql()
-    account_list = get_publishing_accounts(db_client=de)
-    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
+    # init stat period
+    max_time = functions.str_to_timestamp(date_string=dt)
+    min_time = max_time - const.STATISTICS_PERIOD
+    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
+
+    # init database connectors
+    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+    long_articles_db_client.connect()
+
+    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
+    piaoquan_crawler_db_client.connect()
+
+    denet_db_client = DatabaseConnector(db_config=denet_config)
+    denet_db_client.connect()
+
+    # get account list
+    account_list = fetch_publishing_account_list(db_client=denet_db_client)
+
+    # get fans dict
+    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
+
+    # fetch article detail list from official_articles_v2
+    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
+    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
+
+    # calculate account read rate and build a dataframe
+    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)

-    update_single_day(dt, account_list, df, lam)
+    # update each day's data
+    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)


if __name__ == '__main__':
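For reference, a hedged sketch of the data shapes the refactored pipeline passes around: fetch_account_fans is assumed to return the same gh_id -> date -> fans mapping that the deleted get_account_fans_by_dt built, and the read-rate arithmetic below uses illustrative numbers only:

# Assumed shape of fans_dict (mirrors the removed get_account_fans_by_dt output):
# gh_id -> {date string -> fans count}.
fans_dict = {
    "gh_0001": {"2024-12-01": 12000, "2024-12-02": 12100},
}

# With an article row showing show_view_count = 1500 published on 2024-12-01,
# cal_account_read_rate records readRate = 1500 / 12000 = 0.125, as long as the
# fans count clears const.MIN_FANS (rows below that threshold get no readRate).
fans = fans_dict["gh_0001"]["2024-12-01"]
read_rate = 1500 / fans
assert read_rate == 0.125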