Explorar o código

Merge branch '2024-12-18-publish-video-use-new-db' of luojunhui/LongArticlesJob into master

luojunhui hai 4 meses
pai
achega
52cf17e16c

+ 6 - 0
applications/const.py

@@ -43,6 +43,12 @@ class updatePublishedMsgTaskConst:
     # 监测周期(秒)
     MONITOR_PERIOD = 60 * 60 * 24 * 7
 
+    # 新号抓文章周期
+    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
+
+    # 订阅号,抓取失败失败率报警阈值
+    SUBSCRIBE_FAIL_RATE_THRESHOLD = 0.3
+
 
 class updateAccountReadRateTaskConst:
     """

+ 168 - 0
applications/db/__init__.py

@@ -0,0 +1,168 @@
+"""
+@author: luojunhui
+"""
+
+import pymysql
+from contextlib import contextmanager
+
+from applications.exception import QueryError, TransactionError
+
+
+class DatabaseConnector:
+    """
+    Thin wrapper around one pymysql connection to a MySQL database.
+
+    The connection is opened explicitly via ``connect()`` / the ``with``
+    statement, or lazily by the query helpers. ``save`` and ``save_many``
+    commit on their own when at least one row is affected.
+    """
+
+    def __init__(self, db_config):
+        """
+        Store the connection settings; no connection is opened yet.
+
+        :param db_config: mapping with the required keys ``user``,
+            ``password`` and ``db`` and the optional keys ``host``, ``port``
+            and ``charset``.
+        """
+        self.db_config = db_config
+        self.connection = None
+
+    def connect(self):
+        """
+        Open the database connection and keep it on ``self.connection``.
+
+        :raises ConnectionError: if pymysql cannot connect.
+        """
+        try:
+            self.connection = pymysql.connect(
+                host=self.db_config.get('host', 'localhost'),
+                user=self.db_config['user'],
+                password=self.db_config['password'],
+                db=self.db_config['db'],
+                port=self.db_config.get('port', 3306),
+                charset=self.db_config.get('charset', 'utf8mb4')
+            )
+        except pymysql.MySQLError as e:
+            # Chain the driver error so its traceback is not lost.
+            raise ConnectionError(f"无法连接到数据库: {e}") from e
+
+    def close(self):
+        """
+        Close the connection if one is open; safe to call repeatedly.
+        """
+        if self.connection:
+            self.connection.close()
+            self.connection = None
+
+    def _ensure_connection(self):
+        """
+        Open the connection lazily when none is active.
+        """
+        if not self.connection:
+            self.connect()
+
+    def fetch(self, query, cursor_type=None, params=None):
+        """
+        Run a single query and return every row.
+
+        :param query: SQL statement to execute.
+        :param cursor_type: pymysql cursor class that decides the row format
+            (``None`` uses the connection default).
+        :param params: optional values bound to the ``%s`` placeholders of
+            ``query``; prefer this over formatting values into the SQL text.
+        :return: all rows returned by the query.
+        :raises QueryError: if the driver reports an error.
+        """
+        self._ensure_connection()
+
+        try:
+            with self.connection.cursor(cursor_type) as cursor:
+                cursor.execute(query, params)
+                return cursor.fetchall()
+        except pymysql.MySQLError as e:
+            self.rollback()
+            raise QueryError(e, query) from e
+
+    def save(self, query, params):
+        """
+        Run a single insert/update statement.
+
+        The statement is committed when it affects at least one row and
+        rolled back otherwise.
+
+        :param query: SQL statement with ``%s`` placeholders.
+        :param params: values bound to the placeholders.
+        :return: number of affected rows, ``0`` when nothing changed.
+        :raises QueryError: if the driver reports an error.
+        """
+        self._ensure_connection()
+
+        try:
+            with self.connection.cursor() as cursor:
+                affected_rows = cursor.execute(query, params)
+                if not affected_rows:
+                    self.rollback()
+                    return 0
+                self.commit()
+                return affected_rows
+        except pymysql.MySQLError as e:
+            self.rollback()
+            raise QueryError(e, query) from e
+
+    def save_many(self, query, params_list):
+        """
+        Run the same insert/update statement once per parameter set.
+
+        The batch is committed when it affects at least one row and rolled
+        back otherwise.
+
+        :param query: SQL statement with ``%s`` placeholders.
+        :param params_list: sequence of parameter sets.
+        :return: number of affected rows, ``0`` when nothing changed.
+        :raises QueryError: if the driver reports an error.
+        """
+        self._ensure_connection()
+
+        try:
+            with self.connection.cursor() as cursor:
+                affected_rows = cursor.executemany(query, params_list)
+                if not affected_rows:
+                    self.rollback()
+                    return 0
+                self.commit()
+                return affected_rows
+        except pymysql.MySQLError as e:
+            self.rollback()
+            raise QueryError(e, query) from e
+
+    def commit(self):
+        """
+        Commit the current transaction.
+
+        :raises TransactionError: if there is no connection or the commit
+            fails (the transaction is rolled back in the latter case).
+        """
+        if not self.connection:
+            raise TransactionError("没有活动的数据库连接。")
+        try:
+            self.connection.commit()
+        except pymysql.MySQLError as e:
+            self.connection.rollback()
+            raise TransactionError(f"提交事务失败: {e}") from e
+
+    def rollback(self):
+        """
+        Roll back the current transaction.
+
+        :raises TransactionError: if there is no connection or the rollback
+            fails.
+        """
+        if not self.connection:
+            raise TransactionError("没有活动的数据库连接。")
+        try:
+            self.connection.rollback()
+        except pymysql.MySQLError as e:
+            raise TransactionError(f"回滚事务失败: {e}") from e
+
+    @contextmanager
+    def transaction(self):
+        """
+        Context manager that commits on success and rolls back on error.
+
+        The managed body runs first; the commit happens only after it
+        finishes without raising. Any exception from the body triggers a
+        rollback and is re-raised unchanged.
+
+        :return: yields this connector.
+        """
+        self._ensure_connection()
+        try:
+            yield self
+        except Exception:
+            self.rollback()
+            raise
+        else:
+            # Kept out of the ``try`` so a failed commit is not rolled back
+            # a second time by the handler above.
+            self.commit()
+
+    def __enter__(self):
+        """
+        Open the connection when entering a ``with`` block.
+        """
+        self.connect()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Roll back if the ``with`` body raised, then always close.
+        """
+        try:
+            if exc_type:
+                self.rollback()
+        finally:
+            # Close even when the rollback itself fails, so the connection
+            # is never leaked.
+            self.close()

+ 5 - 0
applications/exception/__init__.py

@@ -0,0 +1,5 @@
+"""
+@author: luojunhui
+"""
+from .query_error import QueryError
+from .transaction_error import TransactionError

+ 27 - 0
applications/exception/query_error.py

@@ -0,0 +1,27 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import log
+
+
+class QueryError(Exception):
+    """
+    Raised when a database query fails.
+
+    Constructing the exception also writes the failure to the project log.
+    """
+
+    def __init__(self, error=None, sql=None):
+        """
+        :param error: underlying exception (or any object); stored as ``str(error)``.
+        :param sql: the SQL statement that failed, kept for debugging.
+        """
+        # The "message" field is a fixed label, whatever the actual cause was.
+        details = dict(error=str(error), sql=sql, message="sql语法错误")
+        log(task="sql_query", function="log_query_error", data=details)
+        # The exception text is the same payload rendered as indented JSON.
+        rendered = json.dumps(details, ensure_ascii=False, indent=4)
+        super().__init__(rendered)

+ 21 - 0
applications/exception/transaction_error.py

@@ -0,0 +1,21 @@
+"""
+@author: luojunhui
+事务异常
+"""
+
+from applications import log
+
+
+class TransactionError(Exception):
+    """
+    Raised when committing or rolling back a transaction fails.
+
+    Constructing the exception also writes the message to the project log.
+    """
+
+    def __init__(self, message):
+        """
+        :param message: human-readable description of the failure.
+        """
+        log(task="transaction", function="log_query_error", message=message)
+        super().__init__(message)

+ 14 - 12
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -11,9 +11,10 @@ from pymysql.cursors import DictCursor
 
 from applications import log
 from applications import PQAPI
-from applications import longArticlesMySQL
 from applications.const import WeixinVideoCrawlerConst
 from applications.api import generate_mini_program_title
+from applications.db import DatabaseConnector
+from config import long_articles_config
 
 const = WeixinVideoCrawlerConst()
 pq_functions = PQAPI()
@@ -25,7 +26,8 @@ class PublishVideosForAudit(object):
     """
 
     def __init__(self):
-        self.db = longArticlesMySQL()
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
 
     def get_publish_video_list(self) -> List[Dict]:
         """
@@ -40,7 +42,7 @@ class PublishVideosForAudit(object):
             WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS} and bad_status = {const.TITLE_DEFAULT_STATUS}
             LIMIT {rest_count};
             """
-        response = self.db.select(sql, cursor_type=DictCursor)
+        response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
 
     def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
@@ -57,8 +59,8 @@ class PublishVideosForAudit(object):
             SET audit_status = %s 
             WHERE audit_video_id = %s and audit_status = %s;
         """
-        affected_rows = self.db.update(
-            sql=update_sql,
+        affected_rows = self.db_client.save(
+            query=update_sql,
             params=(new_audit_status, video_id, ori_audit_status)
         )
         return affected_rows
@@ -74,7 +76,7 @@ class PublishVideosForAudit(object):
             WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
             AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
         """
-        response = self.db.select(select_sql, cursor_type=DictCursor)
+        response = self.db_client.fetch(select_sql, cursor_type=DictCursor)
         return response[0]['total_count']
 
     def publish_each_video(self, video_obj: Dict) -> Dict:
@@ -96,8 +98,8 @@ class PublishVideosForAudit(object):
                 SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
                 WHERE id = %s;
             """
-            affected_rows = self.db.update(
-                sql=update_sql,
+            affected_rows = self.db_client.save(
+                query=update_sql,
                 params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
             )
             if affected_rows:
@@ -121,7 +123,7 @@ class PublishVideosForAudit(object):
                     SET audit_status = %s 
                     WHERE id = %s and audit_status = %s;
                 """
-                self.db.update(update_sql, params=(const.VIDEO_AUDIT_FAIL_STATUS, video_obj['id'], const.VIDEO_AUDIT_INIT_STATUS))
+                self.db_client.save(update_sql, params=(const.VIDEO_AUDIT_FAIL_STATUS, video_obj['id'], const.VIDEO_AUDIT_INIT_STATUS))
 
             result = {
                 "status": "fail",
@@ -138,7 +140,7 @@ class PublishVideosForAudit(object):
         :return:
         """
         sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
-        response = self.db.select(sql, cursor_type=DictCursor)
+        response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
 
     def update_mini_program_title(self, video_id: int) -> bool:
@@ -148,14 +150,14 @@ class PublishVideosForAudit(object):
         select_sql = f"""
             SELECT article_title FROM publish_single_video_source WHERE audit_video_id = {video_id};
         """
-        title = self.db.select(select_sql, cursor_type=DictCursor)[0]['article_title']
+        title = self.db_client.fetch(select_sql, cursor_type=DictCursor)[0]['article_title']
 
         try:
             mini_program_title = generate_mini_program_title(title)
             update_sql = f"""
             UPDATE publish_single_video_source SET mini_program_title = %s WHERE audit_video_id = %s;
             """
-            self.db.update(update_sql, params=(mini_program_title, video_id))
+            self.db_client.save(update_sql, params=(mini_program_title, video_id))
             log(
                 task="publish_video_for_audit",
                 function="update_mini_program_title",

+ 34 - 1
config/__init__.py

@@ -46,4 +46,37 @@ class apolloConfig(object):
         :return:
         """
         response = self.apollo_connection.get_value(key)
-        return response
+        return response
+
+
+# Connection settings for the AIGC admin backend database.
+# NOTE(review): host, user and password are committed in plaintext here and in
+# the two configs below; consider loading them from the environment or a
+# secret store, and rotating the exposed credentials.
+denet_config = {
+    'host': 'rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
+    'port': 3306,
+    'user': 'crawler_admin',
+    'password': 'cyber#crawler_2023',
+    'db': 'aigc-admin-prod',
+    'charset': 'utf8mb4'
+}
+
+
+# Connection settings for the long-articles database.
+long_articles_config = {
+    'host': 'rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
+    'port': 3306,
+    'user': 'changwen_admin',
+    'password': 'changwen@123456',
+    'db': 'long_articles',
+    'charset': 'utf8mb4'
+}
+
+
+# Connection settings for the Piaoquan crawler database.
+piaoquan_crawler_config = {
+    'host': 'rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+    'port': 3306,
+    'user': 'crawler',
+    'password': 'crawler123456@',
+    'db': 'piaoquan-crawler',
+    'charset': 'utf8mb4'
+}

+ 2 - 1
flow_pool/exit_article_with_title.py

@@ -224,4 +224,5 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
+    main()
+

+ 217 - 261
updatePublishedMsgDaily.py

@@ -2,19 +2,26 @@
 @author: luojunhui
 @description: update daily information into official articles v2
 """
-
-import time
 import json
+import time
 import traceback
-
 import urllib.parse
-from tqdm import tqdm
-from datetime import datetime
 from argparse import ArgumentParser
+from datetime import datetime
+from typing import Dict, List, Tuple
 
-from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi, longArticlesMySQL, \
-    create_feishu_columns_sheet
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import aiditApi
+from applications import bot
+from applications import create_feishu_columns_sheet
+from applications import Functions
+from applications import log
+from applications import WeixinSpider
 from applications.const import updatePublishedMsgTaskConst
+from applications.db import DatabaseConnector
+from config import denet_config, long_articles_config, piaoquan_crawler_config
 
 ARTICLE_TABLE = "official_articles_v2"
 const = updatePublishedMsgTaskConst()
@@ -38,20 +45,23 @@ def generate_bot_columns():
     return columns
 
 
-def get_account_using_status():
+def get_account_status(db_client: DatabaseConnector) -> Dict:
     """
-    获取正在 using 的 ghid
+    获取账号的实验状态
     :return:
     """
-    sql = "SELECT gh_id FROM long_articles_publishing_accounts WHERE is_using = 1;"
-    gh_id_tuple = PQMySQL().select(sql)
-    gh_id_list = [
-        i[0] for i in gh_id_tuple
-    ]
-    return set(gh_id_list)
+    sql = f"""  
+            SELECT t1.account_id, t2.status
+            FROM wx_statistics_group_source_account t1
+            JOIN wx_statistics_group_source t2
+            ON t1.group_source_name = t2.account_source_name;
+            """
+    account_status_list = db_client.fetch(sql, cursor_type=DictCursor)
+    account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
+    return account_status_dict
 
 
-def get_accounts():
+def get_accounts(db_client: DatabaseConnector) -> List[Dict]:
     """
     从 aigc 数据库中获取目前处于发布状态的账号
     :return:
@@ -62,33 +72,28 @@ def get_accounts():
     "account_type": line[4], # 订阅号 or 服务号
     "account_auth": line[5]
     """
-    using_account_set = get_account_using_status()
     account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-    # only_auto_reply_accounts_set = aiditApi.get_only_auto_reply_accounts()
-    account_list = []
-    for item in account_list_with_out_using_status:
-        # if item['account_id'] in only_auto_reply_accounts_set:
-        #     continue
-        if item['ghId'] in using_account_set:
-            item['using_status'] = 1
-        else:
-            item['using_status'] = 0
-        account_list.append(item)
-    subscription_account = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    server_account = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-    return subscription_account, server_account
+    account_status_dict = get_account_status(db_client)
+    account_list = [
+        {
+            **item,
+            'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
+        }
+        for item in account_list_with_out_using_status
+    ]
+    return account_list
 
 
-def insert_each_msg(db_client, account_info, account_name, msg_list):
+def insert_each_msg(db_client: DatabaseConnector, account_info: Dict, msg_list: List[Dict]) -> None:
     """
     把消息数据更新到数据库中
     :param account_info:
     :param db_client:
-    :param account_name:
     :param msg_list:
     :return:
     """
     gh_id = account_info['ghId']
+    account_name = account_info['name']
     for info in msg_list:
         baseInfo = info.get("BaseInfo", {})
         appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
@@ -109,7 +114,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                 ItemShowType = article.get("ItemShowType", None)
                 IsOriginal = article.get("IsOriginal", None)
                 ShowDesc = article.get("ShowDesc", None)
-                show_stat = Functions().show_desc_to_sta(ShowDesc)
+                show_stat = functions.show_desc_to_sta(ShowDesc)
                 ori_content = article.get("ori_content", None)
                 show_view_count = show_stat.get("show_view_count", 0)
                 show_like_count = show_stat.get("show_like_count", 0)
@@ -142,7 +147,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                     show_pay_count,
                     wx_sn,
                     json.dumps(baseInfo, ensure_ascii=False),
-                    Functions().str_to_md5(title),
+                    functions.str_to_md5(title),
                     status
                 )
                 try:
@@ -152,7 +157,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
-                    db_client.update(sql=insert_sql, params=info_tuple)
+                    db_client.save(query=insert_sql, params=info_tuple)
                     log(
                         task="updatePublishedMsgDaily",
                         function="insert_each_msg",
@@ -169,8 +174,8 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         SET show_view_count = %s, show_like_count=%s
                         WHERE wx_sn = %s;
                         """
-                        db_client.update(sql=update_sql,
-                                         params=(show_view_count, show_like_count, wx_sn))
+                        db_client.save(query=update_sql,
+                                       params=(show_view_count, show_like_count, wx_sn))
                         log(
                             task="updatePublishedMsgDaily",
                             function="insert_each_msg",
@@ -192,31 +197,29 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         continue
 
 
-def update_each_account(db_client, account_info, account_name, latest_update_time, cursor=None):
+def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
     """
     更新每一个账号信息
     :param account_info:
-    :param account_name:
     :param cursor:
     :param latest_update_time: 最新更新时间
     :param db_client: 数据库连接信息
     :return: None
     """
     gh_id = account_info['ghId']
-    response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
-    msg_list = response.get("data", {}).get("data", {})
+    response = spider.update_msg_list(ghId=gh_id, index=cursor)
+    msg_list = response.get("data", {}).get("data", [])
     if msg_list:
         # do
         last_article_in_this_msg = msg_list[-1]
         last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
         last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-        resdata = WeixinSpider().get_account_by_url(last_url)
+        resdata = spider.get_account_by_url(last_url)
         check_id = resdata['data'].get('data', {}).get('wx_gh')
         if check_id == gh_id:
             insert_each_msg(
                 db_client=db_client,
                 account_info=account_info,
-                account_name=account_name,
                 msg_list=msg_list
             )
             if last_time_stamp_in_this_msg > latest_update_time:
@@ -224,7 +227,6 @@ def update_each_account(db_client, account_info, account_name, latest_update_tim
                 return update_each_account(
                     db_client=db_client,
                     account_info=account_info,
-                    account_name=account_name,
                     latest_update_time=latest_update_time,
                     cursor=next_cursor
                 )
@@ -245,57 +247,43 @@ def update_each_account(db_client, account_info, account_name, latest_update_tim
         return
 
 
-def check_account_info(db_client, gh_id, account_name):
+def check_account_info(db_client: DatabaseConnector, gh_id: str) -> int:
     """
-    通过 gh_id查询视频信息
-    :param account_name:
+    通过 gh_id查询账号信息的最新发布时间
     :param db_client:
     :param gh_id:
     :return:
     """
     sql = f"""
-        SELECT accountName, updateTime 
+        SELECT MAX(publish_timestamp)
         FROM {ARTICLE_TABLE}
-        WHERE ghId = '{gh_id}' 
-        ORDER BY updateTime DESC LIMIT 1;
+        WHERE ghId = '{gh_id}';
         """
-    result = db_client.select(sql)
+    result = db_client.fetch(sql)
     if result:
-        old_account_name, update_time = result[0]
-        return {
-            "account_name": old_account_name,
-            "update_time": update_time,
-            "account_type": "history"
-        }
+        return result[0][0]
     else:
-        return {
-            "account_name": account_name,
-            "update_time": int(time.time()) - 30 * 24 * 60 * 60,
-            "account_type": "new"
-        }
+        # 新号,抓取周期定位抓取时刻往前推30天
+        return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
 
 
-def update_single_account(db_client, account_info):
+def update_single_account(db_client: DatabaseConnector, account_info: Dict):
     """
-
-    :param account_info:
+    更新单个账号
     :param db_client:
+    :param account_info:
     :return:
     """
     gh_id = account_info['ghId']
-    account_name = account_info['name']
-    account_detail = check_account_info(db_client, gh_id, account_name)
-    account_name = account_detail['account_name']
-    update_time = account_detail['update_time']
+    max_publish_time = check_account_info(db_client, gh_id)
     update_each_account(
         db_client=db_client,
         account_info=account_info,
-        account_name=account_name,
-        latest_update_time=update_time
+        latest_update_time=max_publish_time
     )
 
 
-def check_single_account(db_client, account_item):
+def check_single_account(db_client: DatabaseConnector, account_item: Dict) -> bool:
     """
     校验每个账号是否更新
     :param db_client:
@@ -308,15 +296,12 @@ def check_single_account(db_client, account_item):
     today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
     today_timestamp = today_date_time.timestamp()
     sql = f"""
-            SELECT updateTime
+            SELECT max(updateTime)
             FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}'
-            ORDER BY updateTime
-            DESC
-            LIMIT 1;
+            WHERE ghId = '{gh_id}';
             """
     try:
-        latest_update_time = db_client.select(sql)[0][0]
+        latest_update_time = db_client.fetch(sql)[0][0]
         # 判断该账号当天发布的文章是否被收集
         if account_type in const.SUBSCRIBE_TYPE_SET:
             if int(latest_update_time) > int(today_timestamp):
@@ -329,146 +314,11 @@ def check_single_account(db_client, account_item):
             else:
                 return False
     except Exception as e:
-        print("updateTime Error -- {}".format(e))
+        print(e)
         return False
 
 
-def update_job():
-    """
-    更新任务
-    :return:
-    """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="更新文章任务连接数据库失败",
-            detail={
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-    sub_accounts, server_accounts = get_accounts()
-    s_count = 0
-    f_count = 0
-    for sub_item in tqdm(sub_accounts):
-        try:
-            update_single_account(db_client, sub_item)
-            s_count += 1
-            time.sleep(5)
-        except Exception as e:
-            f_count += 1
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_job",
-                message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                status="fail",
-            )
-    log(
-        task="updatePublishedMsgDaily",
-        function="update_job",
-        message="订阅号更新完成",
-        data={
-            "success": s_count,
-            "fail": f_count
-        }
-    )
-
-    if f_count / (s_count + f_count) > 0.3:
-        bot(
-            title="订阅号超过 30% 的账号更新失败",
-            detail={
-                "success": s_count,
-                "fail": f_count,
-                "failRate": f_count / (s_count + f_count)
-            }
-        )
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "订阅号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-    for sub_item in tqdm(server_accounts):
-        try:
-            update_single_account(db_client, sub_item)
-            time.sleep(5)
-        except Exception as e:
-            print(e)
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "服务号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-
-
-def check_job():
-    """
-    校验任务
-    :return:
-    """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="校验更新文章任务连接数据库失败",
-            detail={
-                "job": "check_job",
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-    sub_accounts, server_accounts = get_accounts()
-    fail_list = []
-    # account_list = sub_accounts + server_accounts
-    account_list = sub_accounts
-    # check and rework if fail
-    for sub_item in tqdm(account_list):
-        res = check_single_account(db_client, sub_item)
-        if not res:
-            update_single_account(db_client, sub_item)
-    # check whether success and bot if fails
-    for sub_item in tqdm(account_list):
-        res = check_single_account(db_client, sub_item)
-        if not res:
-            # 去掉三个不需要查看的字段
-            sub_item.pop('account_type', None)
-            sub_item.pop('account_auth', None)
-            sub_item.pop('account_id', None)
-            fail_list.append(sub_item)
-    if fail_list:
-        try:
-            bot(
-                title="日常报警, 存在账号更新失败",
-                detail={
-                    "columns": generate_bot_columns(),
-                    "rows": fail_list
-                },
-                table=True
-            )
-        except Exception as e:
-            print("Timeout Error: {}".format(e))
-    else:
-        bot(
-            title="校验完成通知",
-            mention=False,
-            detail={
-                "msg": "校验任务完成",
-                "finish_time": datetime.today().__str__()
-            }
-        )
-
-
-def get_articles(db_client):
+def get_articles(db_client: DatabaseConnector):
     """
 
     :return:
@@ -477,11 +327,11 @@ def get_articles(db_client):
     SELECT ContentUrl, wx_sn 
     FROM {ARTICLE_TABLE}
     WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
-    response = db_client.select(sql)
+    response = db_client.fetch(sql)
     return response
 
 
-def update_publish_timestamp(db_client, row):
+def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
     """
     更新发布时间戳 && minigram 信息
     :param db_client:
@@ -491,7 +341,7 @@ def update_publish_timestamp(db_client, row):
     url = row[0]
     wx_sn = row[1]
     try:
-        response = WeixinSpider().get_article_text(url)
+        response = spider.get_article_text(url)
         response_code = response['code']
 
         if response_code == const.ARTICLE_DELETE_CODE:
@@ -528,8 +378,8 @@ def update_publish_timestamp(db_client, row):
             SET publish_timestamp = %s, root_source_id_list = %s
             WHERE wx_sn = %s;
         """
-    db_client.update(
-        sql=update_sql,
+    db_client.save(
+        query=update_sql,
         params=(
             publish_timestamp_s,
             json.dumps(root_source_id_list, ensure_ascii=False),
@@ -541,24 +391,12 @@ def update_publish_timestamp(db_client, row):
         return None
 
 
-def get_article_detail_job():
+def get_article_detail_job(db_client: DatabaseConnector):
     """
     获取发布文章详情
     :return:
     """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="获取文章详情任务连接数据库失败",
-            detail={
-                "job": "get_article_detail_job",
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
+
     article_tuple = get_articles(db_client)
     for article in tqdm(article_tuple):
         try:
@@ -593,8 +431,8 @@ def get_article_detail_job():
         SET oav.publish_timestamp = vv.publish_timestamp
         WHERE oav.publish_timestamp <= %s;
     """
-    affected_rows = db_client.update(
-        sql=update_sql,
+    db_client.save(
+        query=update_sql,
         params=(0, 0)
     )
 
@@ -604,8 +442,8 @@ def get_article_detail_job():
         SET publish_timestamp = updateTime
         WHERE publish_timestamp < %s;
     """
-    db_client.update(
-        sql=update_sql_2,
+    db_client.save(
+        query=update_sql_2,
         params=0
     )
     if fail_list:
@@ -615,7 +453,7 @@ def get_article_detail_job():
         )
 
 
-def whether_title_unsafe(db_client, title):
+def whether_title_unsafe(db_client: DatabaseConnector, title: str):
     """
     检查文章标题是否已经存在违规记录
     :param db_client:
@@ -628,33 +466,132 @@ def whether_title_unsafe(db_client, title):
         FROM article_unsafe_title
         WHERE title_md5 = '{title_md5}';
     """
-    res = db_client.select(sql)
+    res = db_client.fetch(sql)
     if res:
         return True
     else:
         return False
 
 
-def monitor(run_date):
+def update_job(piaoquan_crawler_db_client, aigc_db_client):
     """
-    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
+    更新任务
     :return:
     """
-    try:
-        pq_client = PQMySQL()
-        lam_client = longArticlesMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
+    account_list = get_accounts(db_client=aigc_db_client)
+    # 订阅号
+    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+    success_count = 0
+    fail_count = 0
+    for sub_item in tqdm(subscription_accounts):
+        try:
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+            success_count += 1
+            time.sleep(5)
+        except Exception as e:
+            fail_count += 1
+            log(
+                task="updatePublishedMsgDaily",
+                function="update_job",
+                message="单个账号文章更新失败, 报错信息是: {}".format(e),
+                status="fail",
+            )
+    log(
+        task="updatePublishedMsgDaily",
+        function="update_job",
+        message="订阅号更新完成",
+        data={
+            "success": success_count,
+            "fail": fail_count
+        }
+    )
+
+    if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
         bot(
-            title="监控任务连接数据库失败",
+            title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
             detail={
-                "job": "monitor",
-                "error": str(e),
-                "msg": error_msg
+                "success": success_count,
+                "fail": fail_count,
+                "failRate": fail_count / (success_count + fail_count)
             }
         )
-        return
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "订阅号更新完成",
+            "finish_time": datetime.today().__str__()
+        },
+        mention=False
+    )
+    # 服务号
+    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
+    for sub_item in tqdm(server_accounts):
+        try:
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+            time.sleep(5)
+        except Exception as e:
+            print(e)
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "服务号更新完成",
+            "finish_time": datetime.today().__str__()
+        },
+        mention=False
+    )
 
+
+def check_job(piaoquan_crawler_db_client, aigc_db_client):
+    """
+    Check that every subscription account has been updated, retrying once.
+
+    Accounts that fail the first check are re-crawled; accounts that still
+    fail afterwards are reported through the bot as a table. If none fail,
+    a completion notice is sent instead.
+
+    :param piaoquan_crawler_db_client: connector used for the article checks
+        and the re-crawl.
+    :param aigc_db_client: connector passed to ``get_accounts``.
+    :return: None
+    """
+    account_list = get_accounts(db_client=aigc_db_client)
+    # Only subscription accounts are checked by this job.
+    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+
+    # First pass: re-crawl every account whose check fails.
+    for sub_item in tqdm(subscription_accounts):
+        if not check_single_account(piaoquan_crawler_db_client, sub_item):
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+
+    # Second pass: report the accounts that still fail. This iterates the
+    # same subscription accounts that were retried above, not the full
+    # account list, so accounts that never got a retry are not reported.
+    hidden_fields = ('account_type', 'account_auth', 'account_id')
+    fail_list = []
+    for sub_item in tqdm(subscription_accounts):
+        if not check_single_account(piaoquan_crawler_db_client, sub_item):
+            # Report a trimmed copy so the account dict itself is untouched.
+            fail_list.append({k: v for k, v in sub_item.items() if k not in hidden_fields})
+
+    if fail_list:
+        try:
+            bot(
+                title="更新当天发布文章,存在未更新的账号",
+                detail={
+                    "columns": generate_bot_columns(),
+                    "rows": fail_list
+                },
+                table=True
+            )
+        except Exception as e:
+            # Best effort: a failed alert must not crash the job.
+            print("Timeout Error: {}".format(e))
+    else:
+        bot(
+            title="更新当天发布文章,所有账号均更新成功",
+            mention=False,
+            detail={
+                "msg": "校验任务完成",
+                "finish_time": datetime.today().__str__()
+            }
+        )
+
+
+def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
+    """
+    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
+    :return:
+    """
     if not run_date:
         run_date = datetime.today().strftime("%Y-%m-%d")
 
@@ -664,13 +601,13 @@ def monitor(run_date):
         FROM {ARTICLE_TABLE}
         WHERE publish_timestamp >= {monitor_start_timestamp};
     """
-    article_list = pq_client.select(select_sql)
+    article_list = piaoquan_crawler_db_client.fetch(select_sql)
     for article in tqdm(article_list, desc="monitor article list"):
         gh_id = article[0]
         account_name = article[1]
         title = article[2]
         # 判断标题是否存在违规记录
-        if whether_title_unsafe(lam_client, title):
+        if whether_title_unsafe(long_articles_db_client, title):
             continue
         url = article[3]
         wx_sn = article[4]
@@ -726,27 +663,46 @@ def main():
     )
     args = parser.parse_args()
 
+    # 初始化数据库连接
+    try:
+        piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+        piaoquan_crawler_db_client.connect()
+        aigc_db_client = DatabaseConnector(denet_config)
+        aigc_db_client.connect()
+        long_articles_db_client = DatabaseConnector(long_articles_config)
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title="更新文章任务连接数据库失败",
+            detail={
+                "error": e,
+                "msg": error_msg
+            }
+        )
+        return
+
     if args.run_task:
         run_task = args.run_task
         match run_task:
             case "update":
-                update_job()
+                update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "check":
-                check_job()
+                check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "detail":
-                get_article_detail_job()
+                get_article_detail_job(db_client=piaoquan_crawler_db_client)
             case "monitor":
                 if args.run_date:
                     run_date = args.run_date
                 else:
                     run_date = None
-                monitor(run_date)
+                monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
+                        long_articles_db_client=long_articles_db_client, run_date=run_date)
             case _:
                 print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
     else:
-        update_job()
-        check_job()
-        get_article_detail_job()
+        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        get_article_detail_job(db_client=piaoquan_crawler_db_client)
 
 
 if __name__ == '__main__':