Jelajahi Sumber

Merge branch '2025-03-17-luojunhui-cal-read-rate' of luojunhui/LongArticlesJob into master

luojunhui 3 bulan lalu
induk
melakukan
6a36a605fa

+ 21 - 2
applications/const/__init__.py

@@ -88,7 +88,7 @@ class updatePublishedMsgTaskConst:
     SUBSCRIBE_FAIL_RATE_THRESHOLD = 0.3
 
 
-class updateAccountReadRateTaskConst:
+class UpdateAccountReadRateTaskConst:
     """
     更新账号阅读率常量配置
     """
@@ -104,8 +104,14 @@ class updateAccountReadRateTaskConst:
     # 文章位置
     ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]
 
+    # 默认粉丝
+    DEFAULT_FANS = 0
 
-class updateAccountReadAvgTaskConst:
+    # 最低粉丝量
+    MIN_FANS = 1000
+
+
+class UpdateAccountReadAvgTaskConst:
     """
     更新账号阅读均值常量配置
     """
@@ -124,6 +130,19 @@ class updateAccountReadAvgTaskConst:
     ARTICLES_DAILY = 1
     TOULIU = 2
 
+    # 默认粉丝
+    DEFAULT_FANS = 0
+
+    # index list
+    ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]
+
+    # 默认点赞
+    DEFAULT_LIKE = 0
+
+    # 状态
+    USING_STATUS = 1
+    NOT_USING_STATUS = 0
+
 
 class WeixinVideoCrawlerConst:
     """

+ 2 - 0
applications/utils/__init__.py

@@ -10,3 +10,5 @@ from .download_video import download_toutiao_video
 from .item import Item
 from .save_to_db import insert_into_single_video_source_table
 from .upload import upload_to_oss
+from .fetch_info_from_aigc import fetch_account_fans
+from .fetch_info_from_aigc import fetch_publishing_account_list

+ 58 - 0
applications/utils/fetch_info_from_aigc.py

@@ -0,0 +1,58 @@
+"""
+fetch info from aigc database system
+"""
+from collections import defaultdict
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+
+
+def fetch_publishing_account_list(db_client) -> List[Dict]:
+    """
+    fetch account_list from aigc database
+    """
+    fetch_sql = f"""
+        SELECT DISTINCT
+            t3.`name` as account_name,
+            t3.gh_id as gh_id,
+            t3.follower_count as fans,
+            t6.account_source_name as account_source,
+            t6.mode_type as mode_type,
+            t6.account_type as account_type,
+            t6.`status` as status
+        FROM
+            publish_plan t1
+            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
+            JOIN publish_account t3 ON t2.account_id = t3.id
+            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
+            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
+            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
+        WHERE
+            t1.plan_status = 1
+            AND t3.channel = 5
+            GROUP BY t3.id;
+    """
+    account_list = db_client.fetch(
+        query=fetch_sql,
+        cursor_type=DictCursor
+    )
+    return account_list
+
+def fetch_account_fans(db_client, start_date: str) -> Dict:
+    """
+    fetch account fans from aigc database
+    """
+    sql = f"""
+        SELECT t1.date_str, t1.fans_count, t2.gh_id
+        FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id
+        WHERE t2.channel = 5
+            AND t2.status = 1 
+            AND t1.date_str >= '{start_date}' 
+        ORDER BY t1.date_str;
+        """
+    result = db_client.fetch(sql)
+    fans_dict = defaultdict(dict)
+    for dt, fans, gh_id in result:
+        fans_dict.setdefault(gh_id, {})[dt] = fans
+    return fans_dict
+

+ 81 - 117
cal_account_read_rate_avg_daily.py

@@ -7,14 +7,21 @@ from tqdm import tqdm
 from pandas import DataFrame
 from argparse import ArgumentParser
 from datetime import datetime
+from pymysql.cursors import DictCursor
 
-from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions, create_feishu_columns_sheet
-from applications.const import updateAccountReadRateTaskConst
-from config import apolloConfig
+from applications import bot, Functions, log
+from applications import create_feishu_columns_sheet
+from applications.db import DatabaseConnector
+from applications.const import UpdateAccountReadRateTaskConst
+from applications.utils import fetch_publishing_account_list
+from applications.utils import fetch_account_fans
+from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
 
-const = updateAccountReadRateTaskConst()
+
+const = UpdateAccountReadRateTaskConst()
 config = apolloConfig()
 unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 functions = Functions()
 read_rate_table = "long_articles_read_rate"
 
@@ -37,75 +44,7 @@ def filter_outlier_data(group, key='show_view_count'):
     return filtered_group
 
 
-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT 
-            t1.date_str, 
-            t1.fans_count, 
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE 
-            t2.channel = 5 
-        AND t2.status = 1 
-        AND t1.date_str >= '2024-07-01' 
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
-
-def get_publishing_accounts(db_client) -> list[dict]:
-    """
-    获取每日正在发布的账号
-    :return:
-    """
-    sql = f"""
-    SELECT DISTINCT
-        t3.`name`,
-        t3.gh_id,
-        t3.follower_count,
-        t6.account_source_name,
-        t6.mode_type,
-        t6.account_type,
-        t6.`status`
-    FROM
-        publish_plan t1
-        JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-        JOIN publish_account t3 ON t2.account_id = t3.id
-        LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-        LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-        LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-    WHERE
-        t1.plan_status = 1
-        AND t3.channel = 5
-        -- AND t3.follower_count > 0
-        GROUP BY t3.id;
-    """
-    account_list = db_client.select(sql)
-    result_list = [
-        {
-            "account_name": i[0],
-            "gh_id": i[1]
-        } for i in account_list
-    ]
-    return result_list
-
-
-def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
+def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
     """
     get articles details
     :return:
@@ -116,47 +55,37 @@ def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
             FROM 
                 official_articles_v2
             WHERE 
-                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}';
+                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
             """
-    result = db_client.select(sql)
-    response_list = [
-        {
-            "ghId": i[0],
-            "accountName": i[1],
-            "ItemIndex": i[2],
-            "show_view_count": i[3],
-            "publish_timestamp": i[4]
-        }
-        for i in result
-    ]
+    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
     return response_list
 
 
-def cal_account_read_rate(gh_id_tuple) -> DataFrame:
+def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
     """
     计算账号位置的阅读率
     :return:
     """
-    pq_db = PQMySQL()
-    de_db = DeNetMysql()
     response = []
-    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
-    account_article_detail = get_account_articles_detail(
-        db_client=pq_db,
-        gh_id_tuple=gh_id_tuple
-    )
-    for line in account_article_detail:
+    for line in article_list:
         gh_id = line['ghId']
         dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
-        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
+        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+        if not fans:
+            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
         if not fans:
-            fans = int(unauthorized_account.get(gh_id, 0))
+            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+            log(
+                task='cal_read_rate_avg_task',
+                function='cal_account_read_rate',
+                message='未获取到粉丝,使用备份粉丝表',
+                data=line
+            )
         line['fans'] = fans
-        if fans > 1000:
+        if fans > const.MIN_FANS:
             line['readRate'] = line['show_view_count'] / fans if fans else 0
             response.append(line)
-    return DataFrame(response,
-                     columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
+    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
 
 
 def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
@@ -168,7 +97,7 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
     min_time = max_time - const.STATISTICS_PERIOD
 
     # 通过
-    filterDataFrame = df[
+    filter_dataframe = df[
         (df["ghId"] == gh_id)
         & (min_time <= df["publish_timestamp"])
         & (df["publish_timestamp"] <= max_time)
@@ -176,13 +105,13 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
         ]
 
     # 用二倍标准差过滤
-    finalDF = filter_outlier_data(filterDataFrame)
+    final_dataframe = filter_outlier_data(filter_dataframe)
 
     return {
-        "read_rate_avg": finalDF['readRate'].mean(),
-        "max_publish_time": finalDF['publish_timestamp'].max(),
-        "min_publish_time": finalDF['publish_timestamp'].min(),
-        "records": len(finalDF)
+        "read_rate_avg": final_dataframe['readRate'].mean(),
+        "max_publish_time": final_dataframe['publish_timestamp'].max(),
+        "min_publish_time": final_dataframe['publish_timestamp'].min(),
+        "records": len(final_dataframe)
     }
 
 
@@ -204,7 +133,7 @@ def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
         WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
         ORDER BY dt_version DESC limit 1;
     """
-    result = db_client.select(select_sql)
+    result = db_client.fetch(select_sql)
     if result:
         account_name = result[0][0]
         previous_read_rate_avg = result[0][1]
@@ -246,6 +175,9 @@ def update_single_day(dt, account_list, article_df, lam):
         string_format='%Y-%m-%d'
     )
 
+    # processed_account_set
+    processed_account_set = set()
+
     for account in tqdm(account_list, desc=dt):
         for index in const.ARTICLE_INDEX_LIST:
             read_rate_detail = cal_avg_account_read_rate(
@@ -259,7 +191,9 @@ def update_single_day(dt, account_list, article_df, lam):
             min_publish_time = read_rate_detail['min_publish_time']
             articles_count = read_rate_detail['records']
             if articles_count:
-                if index in {1, 2}:
+                processed_account_set.add(account['gh_id'])
+                # check read rate in position 1 and 2
+                if index in [1, 2]:
                     error_obj = check_each_position(
                         db_client=lam,
                         gh_id=account['gh_id'],
@@ -269,6 +203,7 @@ def update_single_day(dt, account_list, article_df, lam):
                     )
                     if error_obj:
                         error_list.append(error_obj)
+                # insert into database
                 try:
                     if not read_rate_avg:
                         continue
@@ -278,8 +213,8 @@ def update_single_day(dt, account_list, article_df, lam):
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                     """
-                    lam.update(
-                        sql=insert_sql,
+                    lam.save(
+                        query=insert_sql,
                         params=(
                             account['account_name'],
                             account['gh_id'],
@@ -294,14 +229,17 @@ def update_single_day(dt, account_list, article_df, lam):
                         )
                     )
                 except Exception as e:
+                    print(e)
                     insert_error_list.append(str(e))
 
+    # bot sql error
     if insert_error_list:
         bot(
             title="更新阅读率均值,存在sql 插入失败",
             detail=insert_error_list
         )
 
+    # bot outliers
     if error_list:
         columns = [
             create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
@@ -314,7 +252,7 @@ def update_single_day(dt, account_list, article_df, lam):
                                         display_name="相对变化率")
         ]
         bot(
-            title="更新阅读率均值,头次出现异常值通知",
+            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
             detail={
                 "columns": columns,
                 "rows": error_list
@@ -323,12 +261,14 @@ def update_single_day(dt, account_list, article_df, lam):
             mention=False
         )
 
+    # if no error, send success info
     if not error_list and not insert_error_list:
         bot(
-            title="阅读率均值表更新成功",
+            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
             detail={
                 "日期": dt
-            }
+            },
+            mention=False
         )
 
 
@@ -347,12 +287,36 @@ def main() -> None:
     else:
         dt = datetime.today().strftime('%Y-%m-%d')
 
-    lam = longArticlesMySQL()
-    de = DeNetMysql()
-    account_list = get_publishing_accounts(db_client=de)
-    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
+    # init stat period
+    max_time = functions.str_to_timestamp(date_string=dt)
+    min_time = max_time - const.STATISTICS_PERIOD
+    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
+
+    # init database connector
+    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+    long_articles_db_client.connect()
+
+    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
+    piaoquan_crawler_db_client.connect()
+
+    denet_db_client = DatabaseConnector(db_config=denet_config)
+    denet_db_client.connect()
+
+    # get account list
+    account_list = fetch_publishing_account_list(db_client=denet_db_client)
+
+    # get fans dict
+    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
+
+    # get data frame from official_articles_v2
+    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
+    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
+
+    # cal account read rate and make a dataframe
+    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
 
-    update_single_day(dt, account_list, df, lam)
+    # update each day's data
+    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
 
 
 if __name__ == '__main__':

+ 83 - 131
updateAccountV3.py

@@ -7,151 +7,104 @@ import time
 from tqdm import tqdm
 from datetime import datetime, timedelta
 from argparse import ArgumentParser
+from pymysql.cursors import DictCursor
 
-from applications import PQMySQL, DeNetMysql, longArticlesMySQL
-from applications.const import updateAccountReadAvgTaskConst
+from applications.const import UpdateAccountReadAvgTaskConst
+from applications.db import DatabaseConnector
+from applications.utils import fetch_account_fans
+from applications.utils import fetch_publishing_account_list
 from config import apolloConfig
+from config import long_articles_config, denet_config, piaoquan_crawler_config
 
+read_rate_table = "long_articles_read_rate"
+read_avg_table = "account_avg_info_v3"
 config = apolloConfig()
+const = UpdateAccountReadAvgTaskConst()
 unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
 touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
-
-
-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT 
-            t1.date_str, 
-            t1.fans_count, 
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE 
-            t2.channel = 5 
-        AND t2.status = 1 
-        AND t1.date_str >= '2024-09-01' 
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 
 class UpdateAccountInfoVersion3(object):
     """
-    更新账号信息 v3
+    更新账号的平均阅读率
     """
 
     def __init__(self):
-        self.const = updateAccountReadAvgTaskConst()
-        self.pq = PQMySQL()
-        self.de = DeNetMysql()
-        self.lam = longArticlesMySQL()
+        # init piaoquan crawler db client
+        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_crawler_db_client.connect()
+
+        # init long articles db client
+        self.long_articles_db_client = DatabaseConnector(long_articles_config)
+        self.long_articles_db_client.connect()
+
+        #  init aigc db client
+        self.denet_db_client = DatabaseConnector(denet_config)
+        self.denet_db_client.connect()
 
-    def get_account_position_read_rate(self, dt):
+    def fetch_read_rate_avg_for_each_account(self, dt):
         """
         从长文数据库获取账号阅读均值
         :return:
         """
         dt = int(dt.replace("-", ""))
         sql = f"""
-            SELECT 
-                gh_id, position, read_rate_avg
-            FROM
-                long_articles_read_rate
-            WHERE dt_version = {dt};
+            select gh_id, position, read_rate_avg
+            from {read_rate_table}
+            where dt_version = {dt};
         """
-
-        result = self.lam.select(sql)
+        fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
         account_read_rate_dict = {}
-        for item in result:
-            gh_id = item[0]
-            position = item[1]
-            rate = item[2]
-            key = "{}_{}".format(gh_id, position)
-            account_read_rate_dict[key] = rate
+        for item in fetch_response_list:
+            key = "{}_{}".format(item['gh_id'], item['position'])
+            account_read_rate_dict[key] = item['read_rate_avg']
         return account_read_rate_dict
 
-    def get_publishing_accounts(self):
-        """
-        获取每日正在发布的账号
-        :return:
-        """
-        sql = f"""
-        SELECT DISTINCT
-            t3.`name`,
-            t3.gh_id,
-            t3.follower_count,
-            t6.account_source_name,
-            t6.mode_type,
-            t6.account_type,
-            t6.`status`
-        FROM
-            publish_plan t1
-            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-            JOIN publish_account t3 ON t2.account_id = t3.id
-            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-        WHERE
-            t1.plan_status = 1
-            AND t3.channel = 5
-            GROUP BY t3.id;
-        """
-        account_list = self.de.select(sql)
-        result_list = [
-            {
-                "account_name": i[0],
-                "gh_id": i[1],
-                "fans": i[2],
-                "account_source_name": i[3],
-                "mode_type": i[4],
-                "account_type": i[5],
-                "status": i[6]
-            } for i in account_list
-        ]
-        return result_list
-
     def do_task_list(self, dt):
         """
         do it
         """
-        fans_dict = get_account_fans_by_dt(db_client=self.de)
-        account_list = self.get_publishing_accounts()
-        rate_dict = self.get_account_position_read_rate(dt)
+        # get fans dict from aigc
+        fans_dict = fetch_account_fans(self.denet_db_client, dt)
+
+        # get publishing account list from aigc
+        account_list = fetch_publishing_account_list(self.denet_db_client)
+
+        # fetch each account's read avg for each position
+        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
+
         for account in tqdm(account_list, desc=dt):
             gh_id = account["gh_id"]
-            business_type = self.const.TOULIU if gh_id in touliu_accounts else self.const.ARTICLES_DAILY
-            fans = fans_dict.get(gh_id, {}).get(dt, 0)
+            business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
+            fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+
+            # use unauthorized account's fans if not found in aigc
+            if not fans:
+                fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
+
+            # use backup account's fans if not found in aigc
             if not fans:
-                fans = int(unauthorized_account.get(gh_id, 0))
+                fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+
             if fans:
-                for index in range(1, 9):
+                for index in const.ARTICLE_INDEX_LIST:
                     gh_id_position = "{}_{}".format(gh_id, index)
-                    if rate_dict.get(gh_id_position):
-                        rate = rate_dict[gh_id_position]
-                        read_avg = fans * rate
-                        print(rate, read_avg)
+                    if read_rate_avg_dict.get(gh_id_position):
+                        # fetch read rate avg
+                        read_rate_avg = read_rate_avg_dict[gh_id_position]
+                        # cal read avg
+                        read_avg = fans * read_rate_avg
+
+                        # insert into database
                         insert_sql = f"""
-                        INSERT INTO account_avg_info_v3
-                        (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            insert into {read_avg_table}
+                            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
                         try:
-                            self.pq.update(
-                                sql=insert_sql,
+                            self.piaoquan_crawler_db_client.save(
+                                query=insert_sql,
                                 params=(
                                     gh_id,
                                     index,
@@ -159,29 +112,29 @@ class UpdateAccountInfoVersion3(object):
                                     account['account_name'],
                                     fans,
                                     read_avg,
-                                    0,
-                                    1,
+                                    const.DEFAULT_LIKE,
+                                    const.USING_STATUS,
                                     account['account_type'],
                                     account['mode_type'],
-                                    account['account_source_name'],
+                                    account['account_source'],
                                     account['status'],
                                     business_type,
-                                    rate
+                                    read_rate_avg
                                 )
                             )
                         except Exception as e:
-                            updateSQL = f"""
-                            UPDATE account_avg_info_v3
-                            set fans = %s, read_avg = %s, read_rate_avg = %s
-                            where gh_id = %s and position = %s and update_time = %s
+                            update_sql = f"""
+                                update {read_avg_table}
+                                set fans = %s, read_avg = %s, read_rate_avg = %s
+                                where gh_id = %s and position = %s and update_time = %s
                             """
                             try:
-                                affected_rows = self.pq.update(
-                                    sql=updateSQL,
+                                self.piaoquan_crawler_db_client.save(
+                                    query=update_sql,
                                     params=(
                                         fans,
                                         read_avg,
-                                        rate,
+                                        read_rate_avg,
                                         account['gh_id'],
                                         index,
                                         dt
@@ -192,17 +145,16 @@ class UpdateAccountInfoVersion3(object):
 
                         # 修改前一天的状态为 0
                         update_status_sql = f"""
-                        UPDATE account_avg_info_v3
-                        SET status = %s
-                        where update_time != %s and gh_id = %s and position = %s;
+                            update {read_avg_table}
+                            set status = %s
+                            where update_time != %s and gh_id = %s and position = %s;
                         """
-                        rows_affected = self.pq.update(
-                            sql=update_status_sql,
+                        self.piaoquan_crawler_db_client.save(
+                            query=update_status_sql,
                             params=(
-                                0, dt, account['gh_id'], index
+                                const.NOT_USING_STATUS, dt, account['gh_id'], index
                             )
                         )
-                        print("修改成功")
 
 
 def main():
@@ -215,15 +167,15 @@ def main():
                         help="Run only once for date in format of %Y-%m-%d. \
                                 If no specified, run as daily jobs.")
     args = parser.parse_args()
-    Up = UpdateAccountInfoVersion3()
+    update_account_read_avg_task = UpdateAccountInfoVersion3()
     if args.run_date:
-        Up.do_task_list(dt=args.run_date)
+        update_account_read_avg_task.do_task_list(dt=args.run_date)
     else:
         dt_object = datetime.fromtimestamp(int(time.time()))
         one_day = timedelta(days=1)
         yesterday = dt_object - one_day
         yesterday_str = yesterday.strftime('%Y-%m-%d')
-        Up.do_task_list(dt=yesterday_str)
+        update_account_read_avg_task.do_task_list(dt=yesterday_str)
 
 
 if __name__ == '__main__':