Przeglądaj źródła

Improve read-rate calculation (work in progress)

luojunhui 1 miesiąc temu
rodzic
commit
8eb7fa22c0

+ 2 - 0
applications/utils/__init__.py

@@ -10,3 +10,5 @@ from .download_video import download_toutiao_video
 from .item import Item
 from .save_to_db import insert_into_single_video_source_table
 from .upload import upload_to_oss
+from .fetch_info_from_aigc import fetch_account_fans
+from .fetch_info_from_aigc import fetch_publishing_account_list

+ 58 - 0
applications/utils/fetch_info_from_aigc.py

@@ -0,0 +1,58 @@
+"""
+fetch info from aigc database system
+"""
+from collections import defaultdict
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+
+
def fetch_publishing_account_list(db_client) -> List[Dict]:
    """
    Fetch the accounts currently attached to an active publish plan
    from the aigc database.

    :param db_client: database connector exposing
        ``fetch(query=..., cursor_type=...)``.
    :return: list of dict rows with keys: account_name, gh_id, fans,
        account_source, mode_type, account_type, status.
    """
    # Plain string — the original f-prefix had no placeholders and only
    # made the query look injectable.
    # NOTE(review): t3.channel = 5 presumably selects WeChat official
    # accounts (the joined tables are wx_* statistics) — confirm.
    fetch_sql = """
        SELECT DISTINCT
            t3.`name` as account_name,
            t3.gh_id as gh_id,
            t3.follower_count as fans,
            t6.account_source_name as account_source,
            t6.mode_type as mode_type,
            t6.account_type as account_type,
            t6.`status` as status
        FROM
            publish_plan t1
            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
            JOIN publish_account t3 ON t2.account_id = t3.id
            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
        WHERE
            t1.plan_status = 1
            AND t3.channel = 5
            GROUP BY t3.id;
    """
    # DictCursor makes each row a dict keyed by the SELECT aliases above.
    account_list = db_client.fetch(
        query=fetch_sql,
        cursor_type=DictCursor
    )
    return account_list
+
def fetch_account_fans(db_client, start_date: str) -> Dict:
    """
    Fetch per-day fan counts for each account from the aigc database.

    :param db_client: database connector exposing ``fetch(sql)`` that
        returns rows as (date_str, fans_count, gh_id) tuples.
    :param start_date: inclusive lower bound for ``date_str``
        ('%Y-%m-%d' format).
    :return: mapping of gh_id -> {date_str: fans_count}.
    """
    # NOTE(review): start_date is interpolated directly into the SQL.
    # Today it comes from internal date formatting, but a parameterized
    # query would be safer if db_client.fetch supports bind params — confirm.
    sql = f"""
        SELECT t1.date_str, t1.fans_count, t2.gh_id
        FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE t2.channel = 5
        AND t1.fans_count > 0
        AND t1.date_str >= '{start_date}' 
        ORDER BY t1.date_str;
        """
    result = db_client.fetch(sql)
    fans_dict = defaultdict(dict)
    for dt, fans, gh_id in result:
        # defaultdict already materializes the inner dict on first access;
        # the original setdefault call here was redundant.
        fans_dict[gh_id][dt] = fans
    return fans_dict
+

+ 91 - 141
cal_account_read_rate_avg_daily.py

@@ -7,16 +7,23 @@ from tqdm import tqdm
 from pandas import DataFrame
 from argparse import ArgumentParser
 from datetime import datetime
+from pymysql.cursors import DictCursor
 
-from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions, create_feishu_columns_sheet
+from applications import bot, Functions
+from applications import create_feishu_columns_sheet
+from applications.db import DatabaseConnector
 from applications.const import updateAccountReadRateTaskConst
-from config import apolloConfig
+from applications.utils import fetch_publishing_account_list
+from applications.utils import fetch_account_fans
+from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
+
 
 const = updateAccountReadRateTaskConst()
 config = apolloConfig()
 unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
+backup_account_detail = json.loads(config.getConfigValue("backup_account_detail"))
 functions = Functions()
-read_rate_table = "long_articles_read_rate"
+read_rate_table = "long_articles_read_rate_dev"
 
 
 def filter_outlier_data(group, key='show_view_count'):
@@ -37,75 +44,7 @@ def filter_outlier_data(group, key='show_view_count'):
     return filtered_group
 
 
-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT 
-            t1.date_str, 
-            t1.fans_count, 
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE 
-            t2.channel = 5 
-        AND t2.status = 1 
-        AND t1.date_str >= '2024-07-01' 
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
-
-def get_publishing_accounts(db_client) -> list[dict]:
-    """
-    获取每日正在发布的账号
-    :return:
-    """
-    sql = f"""
-    SELECT DISTINCT
-        t3.`name`,
-        t3.gh_id,
-        t3.follower_count,
-        t6.account_source_name,
-        t6.mode_type,
-        t6.account_type,
-        t6.`status`
-    FROM
-        publish_plan t1
-        JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-        JOIN publish_account t3 ON t2.account_id = t3.id
-        LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-        LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-        LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-    WHERE
-        t1.plan_status = 1
-        AND t3.channel = 5
-        -- AND t3.follower_count > 0
-        GROUP BY t3.id;
-    """
-    account_list = db_client.select(sql)
-    result_list = [
-        {
-            "account_name": i[0],
-            "gh_id": i[1]
-        } for i in account_list
-    ]
-    return result_list
-
-
-def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
+def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
     """
     get articles details
     :return:
@@ -116,47 +55,31 @@ def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
             FROM 
                 official_articles_v2
             WHERE 
-                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}';
+                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
             """
-    result = db_client.select(sql)
-    response_list = [
-        {
-            "ghId": i[0],
-            "accountName": i[1],
-            "ItemIndex": i[2],
-            "show_view_count": i[3],
-            "publish_timestamp": i[4]
-        }
-        for i in result
-    ]
+    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
     return response_list
 
 
-def cal_account_read_rate(gh_id_tuple) -> DataFrame:
def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
    """
    Compute the per-article read rate (show_view_count / fans on publish day).

    :param article_list: iterable of dict rows with keys ghId, accountName,
        ItemIndex, show_view_count, publish_timestamp. Each kept row is
        mutated in place: 'fans' and 'readRate' keys are added.
    :param fans_dict: mapping of gh_id -> {date_str: fans_count}.
    :return: DataFrame with columns ghId, accountName, ItemIndex,
        show_view_count, publish_timestamp, readRate. Rows whose resolved
        fan count is <= 1000 are dropped.
    """
    response = []
    for line in article_list:
        gh_id = line['ghId']
        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
        # Resolve fans for the publish day, falling back to the configured
        # unauthorized-account and backup-account tables in turn.
        fans = fans_dict.get(gh_id, {}).get(dt, 0)
        if not fans:
            fans = int(unauthorized_account.get(gh_id, 0))
        if not fans:
            fans = int(backup_account_detail.get(gh_id, 0))
        line['fans'] = fans
        if fans > 1000:
            # fans is guaranteed truthy here, so the original
            # "... if fans else 0" zero-division guard was dead code.
            line['readRate'] = line['show_view_count'] / fans
            response.append(line)
    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
 
 
 def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
@@ -168,7 +91,7 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
     min_time = max_time - const.STATISTICS_PERIOD
 
     # 通过
-    filterDataFrame = df[
+    filter_dataframe = df[
         (df["ghId"] == gh_id)
         & (min_time <= df["publish_timestamp"])
         & (df["publish_timestamp"] <= max_time)
@@ -176,13 +99,13 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
         ]
 
     # 用二倍标准差过滤
-    finalDF = filter_outlier_data(filterDataFrame)
+    final_dataframe = filter_outlier_data(filter_dataframe)
 
     return {
-        "read_rate_avg": finalDF['readRate'].mean(),
-        "max_publish_time": finalDF['publish_timestamp'].max(),
-        "min_publish_time": finalDF['publish_timestamp'].min(),
-        "records": len(finalDF)
+        "read_rate_avg": final_dataframe['readRate'].mean(),
+        "max_publish_time": final_dataframe['publish_timestamp'].max(),
+        "min_publish_time": final_dataframe['publish_timestamp'].min(),
+        "records": len(final_dataframe)
     }
 
 
@@ -246,6 +169,7 @@ def update_single_day(dt, account_list, article_df, lam):
         string_format='%Y-%m-%d'
     )
 
+    process_account_cnt = 0
     for account in tqdm(account_list, desc=dt):
         for index in const.ARTICLE_INDEX_LIST:
             read_rate_detail = cal_avg_account_read_rate(
@@ -259,6 +183,7 @@ def update_single_day(dt, account_list, article_df, lam):
             min_publish_time = read_rate_detail['min_publish_time']
             articles_count = read_rate_detail['records']
             if articles_count:
+                process_account_cnt += 1
                 if index in {1, 2}:
                     error_obj = check_each_position(
                         db_client=lam,
@@ -296,40 +221,41 @@ def update_single_day(dt, account_list, article_df, lam):
                 except Exception as e:
                     insert_error_list.append(str(e))
 
-    if insert_error_list:
-        bot(
-            title="更新阅读率均值,存在sql 插入失败",
-            detail=insert_error_list
-        )
-
-    if error_list:
-        columns = [
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
-                                        display_name="昨日阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
-                                        display_name="前天阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
-                                        display_name="相对变化率")
-        ]
-        bot(
-            title="更新阅读率均值,头次出现异常值通知",
-            detail={
-                "columns": columns,
-                "rows": error_list
-            },
-            table=True,
-            mention=False
-        )
-
-    if not error_list and not insert_error_list:
-        bot(
-            title="阅读率均值表,更新成功",
-            detail={
-                "日期": dt
-            }
-        )
+    print(process_account_cnt)
+    # if insert_error_list:
+    #     bot(
+    #         title="更新阅读率均值,存在sql 插入失败",
+    #         detail=insert_error_list
+    #     )
+    #
+    # if error_list:
+    #     columns = [
+    #         create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
+    #         create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
+    #         create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
+    #                                     display_name="昨日阅读率均值"),
+    #         create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
+    #                                     display_name="前天阅读率均值"),
+    #         create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
+    #                                     display_name="相对变化率")
+    #     ]
+    #     bot(
+    #         title="更新阅读率均值,头次出现异常值通知",
+    #         detail={
+    #             "columns": columns,
+    #             "rows": error_list
+    #         },
+    #         table=True,
+    #         mention=False
+    #     )
+    #
+    # if not error_list and not insert_error_list:
+    #     bot(
+    #         title="阅读率均值表,更新成功",
+    #         detail={
+    #             "日期": dt
+    #         }
+    #     )
 
 
 def main() -> None:
@@ -347,12 +273,36 @@ def main() -> None:
     else:
         dt = datetime.today().strftime('%Y-%m-%d')
 
-    lam = longArticlesMySQL()
-    de = DeNetMysql()
-    account_list = get_publishing_accounts(db_client=de)
-    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
+    # init stat period
+    max_time = functions.str_to_timestamp(date_string=dt)
+    min_time = max_time - const.STATISTICS_PERIOD
+    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
+
+    # init database connector
+    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+    long_articles_db_client.connect()
+
+    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
+    piaoquan_crawler_db_client.connect()
+
+    denet_db_client = DatabaseConnector(db_config=denet_config)
+    denet_db_client.connect()
+
+    # get account list
+    account_list = fetch_publishing_account_list(db_client=denet_db_client)
+
+    # get fans dict
+    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
+
+    # get data frame from official_articles_v2
+    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
+    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
+
+    # cal account read rate and make a dataframe
+    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
 
-    update_single_day(dt, account_list, df, lam)
+    # update each day's data
+    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
 
 
 if __name__ == '__main__':

+ 6 - 5
updateAccountV3.py

@@ -31,7 +31,8 @@ def get_account_fans_by_dt(db_client) -> dict:
         JOIN publish_account t2 ON t1.account_id = t2.id
         WHERE 
             t2.channel = 5 
-        AND t2.status = 1 
+#         AND t2.status = 1 
+        AND t1.fans_count > 0
         AND t1.date_str >= '2024-09-01' 
         ORDER BY t1.date_str;
     """
@@ -69,7 +70,7 @@ class UpdateAccountInfoVersion3(object):
             SELECT 
                 gh_id, position, read_rate_avg
             FROM
-                long_articles_read_rate
+                long_articles_read_rate_dev
             WHERE dt_version = {dt};
         """
 
@@ -144,7 +145,7 @@ class UpdateAccountInfoVersion3(object):
                         read_avg = fans * rate
                         print(rate, read_avg)
                         insert_sql = f"""
-                        INSERT INTO account_avg_info_v3
+                        INSERT INTO account_avg_info_v3_dev
                         (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
@@ -171,7 +172,7 @@ class UpdateAccountInfoVersion3(object):
                             )
                         except Exception as e:
                             updateSQL = f"""
-                            UPDATE account_avg_info_v3
+                            UPDATE account_avg_info_v3_dev
                             set fans = %s, read_avg = %s, read_rate_avg = %s
                             where gh_id = %s and position = %s and update_time = %s
                             """
@@ -192,7 +193,7 @@ class UpdateAccountInfoVersion3(object):
 
                         # 修改前一天的状态为 0
                         update_status_sql = f"""
-                        UPDATE account_avg_info_v3
+                        UPDATE account_avg_info_v3_dev
                         SET status = %s
                         where update_time != %s and gh_id = %s and position = %s;
                         """