
db_connector improve

luojunhui, 4 months ago
parent commit 2318a31102
2 changed files with 139 additions and 176 deletions
  1. applications/const.py (+6, -0)
  2. updatePublishedMsgDaily.py (+133, -176)

+ 6 - 0
applications/const.py

@@ -43,6 +43,12 @@ class updatePublishedMsgTaskConst:
     # Monitoring period (seconds)
     MONITOR_PERIOD = 60 * 60 * 24 * 7
 
+    # Crawl window for newly added accounts
+    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
+
+    # Subscription accounts: alert threshold for crawl failure rate
+    SUBSCRIBE_FAIL_RATE_THRESHOLD = 0.3
+
 
 class updateAccountReadRateTaskConst:
     """

+ 133 - 176
updatePublishedMsgDaily.py

@@ -2,19 +2,26 @@
 @author: luojunhui
 @description: update daily information into official articles v2
 """
-
-import time
 import json
+import time
 import traceback
-
 import urllib.parse
-from tqdm import tqdm
-from datetime import datetime
 from argparse import ArgumentParser
+from datetime import datetime
+from typing import Dict, List
 
-from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi, longArticlesMySQL, \
-    create_feishu_columns_sheet
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import aiditApi
+from applications import bot
+from applications import create_feishu_columns_sheet
+from applications import Functions
+from applications import log
+from applications import WeixinSpider
 from applications.const import updatePublishedMsgTaskConst
+from applications.db import DatabaseConnector
+from config import denet_config, long_articles_config, piaoquan_crawler_config
 
 ARTICLE_TABLE = "official_articles_v2"
 const = updatePublishedMsgTaskConst()
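The import changes swap the old PQMySQL / longArticlesMySQL helpers for applications.db.DatabaseConnector, configured from config entries (piaoquan_crawler_config, denet_config, long_articles_config). Judging purely from how the connector is used in this diff — connect() before use, fetch() for reads (optionally with cursor_type=DictCursor), save() for writes — a minimal usage sketch looks as follows; the exact signatures are inferred from this commit, not confirmed against the library:

from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from config import piaoquan_crawler_config

db_client = DatabaseConnector(piaoquan_crawler_config)
db_client.connect()

# reads: fetch() replaces the old PQMySQL().select(); rows come back as tuples,
# or as dicts when cursor_type=DictCursor is passed
rows = db_client.fetch("SELECT ghId, accountName FROM official_articles_v2 LIMIT 1;")
dict_rows = db_client.fetch(
    "SELECT ghId, accountName FROM official_articles_v2 LIMIT 1;",
    cursor_type=DictCursor,
)

# writes: save() replaces the old PQMySQL().update()
db_client.save(
    sql="UPDATE official_articles_v2 SET show_view_count = %s WHERE wx_sn = %s;",
    params=(100, "example-wx-sn"),
)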
@@ -38,20 +45,23 @@ def generate_bot_columns():
     return columns
 
 
-def get_account_using_status():
+def get_account_status(aigc_db_client: DatabaseConnector) -> Dict:
     """
-    Get the gh_ids that are currently in use
+    Get each account's experiment status
     :return:
     """
-    sql = "SELECT gh_id FROM long_articles_publishing_accounts WHERE is_using = 1;"
-    gh_id_tuple = PQMySQL().select(sql)
-    gh_id_list = [
-        i[0] for i in gh_id_tuple
-    ]
-    return set(gh_id_list)
+    sql = """
+            SELECT t1.account_id, t2.status
+            FROM wx_statistics_group_source_account t1
+            JOIN wx_statistics_group_source t2
+            ON t1.group_source_name = t2.account_source_name;
+            """
+    account_status_list = aigc_db_client.fetch(sql, cursor_type=DictCursor)
+    account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
+    return account_status_dict
 
 
-def get_accounts():
+def get_accounts(aigc_db_client: DatabaseConnector) -> List[Dict]:
     """
     Fetch the accounts currently in publishing status from the aigc database
     :return:
@@ -62,21 +72,16 @@ def get_accounts():
     "account_type": line[4], # 订阅号 or 服务号
     "account_auth": line[5]
     """
-    using_account_set = get_account_using_status()
     account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-    # only_auto_reply_accounts_set = aiditApi.get_only_auto_reply_accounts()
-    account_list = []
-    for item in account_list_with_out_using_status:
-        # if item['account_id'] in only_auto_reply_accounts_set:
-        #     continue
-        if item['ghId'] in using_account_set:
-            item['using_status'] = 1
-        else:
-            item['using_status'] = 0
-        account_list.append(item)
-    subscription_account = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    server_account = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-    return subscription_account, server_account
+    account_status_dict = get_account_status(aigc_db_client)
+    account_list = [
+        {
+            **item,
+            'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
+        }
+        for item in account_list_with_out_using_status
+    ]
+    return account_list
 
 
 def insert_each_msg(db_client, account_info, account_name, msg_list):
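get_accounts now derives using_status from the aigc experiment tables rather than from long_articles_publishing_accounts: an account whose group-source status is '实验' (experiment) gets using_status = 0, anything else gets 1. A small illustration of the mapping — the account ids and the non-experiment status value below are made up:

account_status_dict = {"AID_001": "实验", "AID_002": "正式"}  # hypothetical statuses

accounts_from_aigc = [
    {"account_id": "AID_001", "ghId": "gh_aaa", "name": "Account A"},
    {"account_id": "AID_002", "ghId": "gh_bbb", "name": "Account B"},
]

account_list = [
    {**item, "using_status": 0 if account_status_dict.get(item["account_id"]) == "实验" else 1}
    for item in accounts_from_aigc
]
# Account A -> using_status 0 (experiment), Account B -> using_status 1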
@@ -89,6 +94,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
     :return:
     """
     gh_id = account_info['ghId']
+    account_name = account_info['name']
     for info in msg_list:
         baseInfo = info.get("BaseInfo", {})
         appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
@@ -109,7 +115,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                 ItemShowType = article.get("ItemShowType", None)
                 IsOriginal = article.get("IsOriginal", None)
                 ShowDesc = article.get("ShowDesc", None)
-                show_stat = Functions().show_desc_to_sta(ShowDesc)
+                show_stat = functions.show_desc_to_sta(ShowDesc)
                 ori_content = article.get("ori_content", None)
                 show_view_count = show_stat.get("show_view_count", 0)
                 show_like_count = show_stat.get("show_like_count", 0)
@@ -142,7 +148,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                     show_pay_count,
                     wx_sn,
                     json.dumps(baseInfo, ensure_ascii=False),
-                    Functions().str_to_md5(title),
+                    functions.str_to_md5(title),
                     status
                 )
                 try:
@@ -152,7 +158,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
-                    db_client.update(sql=insert_sql, params=info_tuple)
+                    db_client.save(sql=insert_sql, params=info_tuple)
                     log(
                         task="updatePublishedMsgDaily",
                         function="insert_each_msg",
@@ -169,8 +175,8 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         SET show_view_count = %s, show_like_count=%s
                         WHERE wx_sn = %s;
                         """
-                        db_client.update(sql=update_sql,
-                                         params=(show_view_count, show_like_count, wx_sn))
+                        db_client.save(sql=update_sql,
+                                       params=(show_view_count, show_like_count, wx_sn))
                         log(
                             task="updatePublishedMsgDaily",
                             function="insert_each_msg",
@@ -192,31 +198,29 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         continue
 
 
-def update_each_account(db_client, account_info, account_name, latest_update_time, cursor=None):
+def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
     """
     Update each account's information
     :param account_info:
-    :param account_name:
     :param cursor:
     :param latest_update_time: latest update timestamp
     :param db_client: database connection
     :return: None
     """
     gh_id = account_info['ghId']
-    response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
+    response = spider.update_msg_list(ghId=gh_id, index=cursor)
     msg_list = response.get("data", {}).get("data", {})
     if msg_list:
         # do
         last_article_in_this_msg = msg_list[-1]
         last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
         last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-        resdata = WeixinSpider().get_account_by_url(last_url)
+        resdata = spider.get_account_by_url(last_url)
         check_id = resdata['data'].get('data', {}).get('wx_gh')
         if check_id == gh_id:
             insert_each_msg(
                 db_client=db_client,
                 account_info=account_info,
-                account_name=account_name,
                 msg_list=msg_list
             )
             if last_time_stamp_in_this_msg > latest_update_time:
@@ -224,7 +228,6 @@ def update_each_account(db_client, account_info, account_name, latest_update_tim
                 return update_each_account(
                     db_client=db_client,
                     account_info=account_info,
-                    account_name=account_name,
                     latest_update_time=latest_update_time,
                     cursor=next_cursor
                 )
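The recursion above pages through an account's message history newest-first and stops once it reaches messages no newer than latest_update_time. A rough iterative equivalent, useful for seeing the termination condition at a glance (fetch_page is a hypothetical stand-in for spider.update_msg_list):

def iter_new_msg_pages(gh_id, latest_update_time, fetch_page):
    """Yield message pages until the crawl catches up with what is already stored."""
    cursor = None
    while True:
        response = fetch_page(gh_id, cursor)          # ~ spider.update_msg_list(ghId=..., index=...)
        msg_list = response.get("data", {}).get("data", [])
        if not msg_list:
            break
        yield msg_list
        last_update = msg_list[-1]["AppMsg"]["BaseInfo"]["UpdateTime"]
        if last_update <= latest_update_time:
            break                                     # oldest message on this page is already stored
        cursor = response.get("data", {}).get("next_cursor")  # cursor field name assumed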
@@ -245,53 +248,39 @@ def update_each_account(db_client, account_info, account_name, latest_update_tim
         return
 
 
-def check_account_info(db_client, gh_id, account_name):
+def check_account_info(piaoquan_crawler_db_client: DatabaseConnector, gh_id: str) -> int:
     """
-    Query video info by gh_id
-    :param account_name:
-    :param db_client:
+    Query the latest publish timestamp of the account by gh_id
+    :param piaoquan_crawler_db_client:
     :param gh_id:
     :return:
     """
     sql = f"""
-        SELECT accountName, updateTime 
+        SELECT MAX(publish_timestamp)
         FROM {ARTICLE_TABLE}
-        WHERE ghId = '{gh_id}' 
-        ORDER BY updateTime DESC LIMIT 1;
+        WHERE ghId = '{gh_id}';
         """
-    result = db_client.select(sql)
+    result = piaoquan_crawler_db_client.fetch(sql)
-    if result:
+    if result and result[0][0]:
-        old_account_name, update_time = result[0]
-        return {
-            "account_name": old_account_name,
-            "update_time": update_time,
-            "account_type": "history"
-        }
+        return result[0][0]
     else:
-        return {
-            "account_name": account_name,
-            "update_time": int(time.time()) - 30 * 24 * 60 * 60,
-            "account_type": "new"
-        }
+        # New account: start the crawl window 30 days before the current time
+        return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
 
 
-def update_single_account(db_client, account_info):
+def update_single_account(piaoquan_crawler_db_client: DatabaseConnector, account_info: Dict):
     """
-
+    Update a single account
+    :param piaoquan_crawler_db_client:
     :param account_info:
-    :param db_client:
     :return:
     """
     gh_id = account_info['ghId']
-    account_name = account_info['name']
-    account_detail = check_account_info(db_client, gh_id, account_name)
-    account_name = account_detail['account_name']
-    update_time = account_detail['update_time']
+    max_publish_time = check_account_info(piaoquan_crawler_db_client, gh_id)
     update_each_account(
-        db_client=db_client,
+        db_client=piaoquan_crawler_db_client,
         account_info=account_info,
-        account_name=account_name,
-        latest_update_time=update_time
+        latest_update_time=max_publish_time
     )
 
 
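check_account_info now returns a single timestamp rather than a dict: the newest publish_timestamp recorded for the account, or, for an account with no history, now minus NEW_ACCOUNT_CRAWL_PERIOD (30 days). Note that MAX() over zero rows yields a single row containing NULL rather than an empty result set, so a bare truthiness check on the rows alone would not catch the no-history case. A compact sketch of the intended fallback (resolve_crawl_start is an illustrative helper, not part of the codebase):

import time

NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30  # mirrors the constant added in const.py

def resolve_crawl_start(max_publish_timestamp, now=None):
    """Timestamp that update_each_account should crawl back to."""
    now = now or int(time.time())
    if max_publish_timestamp:               # account already has published articles on record
        return max_publish_timestamp
    return now - NEW_ACCOUNT_CRAWL_PERIOD   # new account: 30-day window

assert resolve_crawl_start(1700000000) == 1700000000
assert resolve_crawl_start(None, now=1700000000) == 1700000000 - 2592000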
@@ -308,15 +297,12 @@ def check_single_account(db_client, account_item):
     today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
     today_timestamp = today_date_time.timestamp()
     sql = f"""
-            SELECT updateTime
+            SELECT max(updateTime)
             FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}'
-            ORDER BY updateTime
-            DESC
-            LIMIT 1;
+            WHERE ghId = '{gh_id}';
             """
     try:
-        latest_update_time = db_client.select(sql)[0][0]
+        latest_update_time = db_client.fetch(sql)[0][0]
         # Check whether the articles this account published today have been collected
         if account_type in const.SUBSCRIBE_TYPE_SET:
             if int(latest_update_time) > int(today_timestamp):
@@ -329,37 +315,27 @@ def check_single_account(db_client, account_item):
             else:
                 return False
     except Exception as e:
-        print("updateTime Error -- {}".format(e))
+        print(e)
         return False
 
 
-def update_job():
+def update_job(piaoquan_crawler_db_client, aigc_db_client):
     """
     Update task
     :return:
     """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="更新文章任务连接数据库失败",
-            detail={
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-    sub_accounts, server_accounts = get_accounts()
-    s_count = 0
-    f_count = 0
-    for sub_item in tqdm(sub_accounts):
+    account_list = get_accounts(aigc_db_client=aigc_db_client)
+    # subscription accounts
+    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+    success_count = 0
+    fail_count = 0
+    for sub_item in tqdm(subscription_accounts):
         try:
-            update_single_account(db_client, sub_item)
-            s_count += 1
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+            success_count += 1
             time.sleep(5)
         except Exception as e:
-            f_count += 1
+            fail_count += 1
             log(
                 task="updatePublishedMsgDaily",
                 function="update_job",
@@ -371,18 +347,18 @@ def update_job():
         function="update_job",
         message="订阅号更新完成",
         data={
-            "success": s_count,
-            "fail": f_count
+            "success": success_count,
+            "fail": fail_count
         }
     )
 
-    if f_count / (s_count + f_count) > 0.3:
+    if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
         bot(
             title="订阅号超过 30% 的账号更新失败",
             detail={
-                "success": s_count,
-                "fail": f_count,
-                "failRate": f_count / (s_count + f_count)
+                "success": success_count,
+                "fail": fail_count,
+                "failRate": fail_count / (success_count + fail_count)
             }
         )
     bot(
@@ -393,9 +369,11 @@ def update_job():
         },
         mention=False
     )
+    # service accounts
+    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
     for sub_item in tqdm(server_accounts):
         try:
-            update_single_account(db_client, sub_item)
+            update_single_account(piaoquan_crawler_db_client, sub_item)
             time.sleep(5)
         except Exception as e:
             print(e)
@@ -409,36 +387,23 @@ def update_job():
     )
 
 
-def check_job():
+def check_job(piaoquan_crawler_db_client, aigc_db_client):
     """
     Verification task
     :return:
     """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="校验更新文章任务连接数据库失败",
-            detail={
-                "job": "check_job",
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-    sub_accounts, server_accounts = get_accounts()
+    account_list = get_accounts(aigc_db_client=aigc_db_client)
+    # subscription accounts
+    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
     fail_list = []
-    # account_list = sub_accounts + server_accounts
-    account_list = sub_accounts
     # check each account, and re-run the update for any that failed
-    for sub_item in tqdm(account_list):
-        res = check_single_account(db_client, sub_item)
+    for sub_item in tqdm(subscription_accounts):
+        res = check_single_account(piaoquan_crawler_db_client, sub_item)
         if not res:
-            update_single_account(db_client, sub_item)
+            update_single_account(piaoquan_crawler_db_client, sub_item)
     # check again and alert via bot for accounts that still failed
-    for sub_item in tqdm(account_list):
+    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(db_client, sub_item)
+        res = check_single_account(piaoquan_crawler_db_client, sub_item)
         if not res:
             # drop three fields that are not needed in the alert
             sub_item.pop('account_type', None)
@@ -448,7 +413,7 @@ def check_job():
     if fail_list:
         try:
             bot(
-                title="日常报警, 存在账号更新失败",
+                title="更新当天发布文章,存在未更新的账号",
                 detail={
                     "columns": generate_bot_columns(),
                     "rows": fail_list
@@ -459,7 +424,7 @@ def check_job():
             print("Timeout Error: {}".format(e))
     else:
         bot(
-            title="校验完成通知",
+            title="更新当天发布文章,所有账号均更新成功",
             mention=False,
             detail={
                 "msg": "校验任务完成",
@@ -477,7 +442,7 @@ def get_articles(db_client):
     SELECT ContentUrl, wx_sn 
     FROM {ARTICLE_TABLE}
     WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
-    response = db_client.select(sql)
+    response = db_client.fetch(sql)
     return response
 
 
@@ -491,7 +456,7 @@ def update_publish_timestamp(db_client, row):
     url = row[0]
     wx_sn = row[1]
     try:
-        response = WeixinSpider().get_article_text(url)
+        response = spider.get_article_text(url)
         response_code = response['code']
 
         if response_code == const.ARTICLE_DELETE_CODE:
@@ -528,7 +493,7 @@ def update_publish_timestamp(db_client, row):
             SET publish_timestamp = %s, root_source_id_list = %s
             WHERE wx_sn = %s;
         """
-    db_client.update(
+    db_client.save(
         sql=update_sql,
         params=(
             publish_timestamp_s,
@@ -541,39 +506,27 @@ def update_publish_timestamp(db_client, row):
         return None
 
 
-def get_article_detail_job():
+def get_article_detail_job(piaoquan_crawler_db_client):
     """
     Fetch details of published articles
     :return:
     """
-    try:
-        db_client = PQMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="获取文章详情任务连接数据库失败",
-            detail={
-                "job": "get_article_detail_job",
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-    article_tuple = get_articles(db_client)
+
+    article_tuple = get_articles(piaoquan_crawler_db_client)
     for article in tqdm(article_tuple):
         try:
-            update_publish_timestamp(db_client=db_client, row=article)
+            update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
         except Exception as e:
             print(e)
             error_msg = traceback.format_exc()
             print(error_msg)
     # re-check articles still marked as request-failed (-1) or default (0)
-    process_failed_articles = get_articles(db_client)
+    process_failed_articles = get_articles(piaoquan_crawler_db_client)
     fail_list = []
     if process_failed_articles:
         for article in tqdm(process_failed_articles):
             try:
-                res = update_publish_timestamp(db_client=db_client, row=article)
+                res = update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
                 fail_list.append({"wx_sn": res[1], "url": res[0]})
             except Exception as e:
                 print(e)
@@ -593,7 +546,7 @@ def get_article_detail_job():
         SET oav.publish_timestamp = vv.publish_timestamp
         WHERE oav.publish_timestamp <= %s;
     """
-    affected_rows = db_client.update(
+    piaoquan_crawler_db_client.save(
         sql=update_sql,
         params=(0, 0)
     )
@@ -604,7 +557,7 @@ def get_article_detail_job():
         SET publish_timestamp = updateTime
         WHERE publish_timestamp < %s;
     """
-    db_client.update(
+    piaoquan_crawler_db_client.save(
         sql=update_sql_2,
         params=0
     )
@@ -628,33 +581,18 @@ def whether_title_unsafe(db_client, title):
         FROM article_unsafe_title
         WHERE title_md5 = '{title_md5}';
     """
-    res = db_client.select(sql)
+    res = db_client.fetch(sql)
     if res:
         return True
     else:
         return False
 
 
-def monitor(run_date):
+def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
     """
     Monitoring task: over a 7-day window, check whether published articles have been flagged as violations, and alert if any are found
     :return:
     """
-    try:
-        pq_client = PQMySQL()
-        lam_client = longArticlesMySQL()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="监控任务连接数据库失败",
-            detail={
-                "job": "monitor",
-                "error": str(e),
-                "msg": error_msg
-            }
-        )
-        return
-
     if not run_date:
         run_date = datetime.today().strftime("%Y-%m-%d")
 
@@ -664,13 +602,13 @@ def monitor(run_date):
         FROM {ARTICLE_TABLE}
         WHERE publish_timestamp >= {monitor_start_timestamp};
     """
-    article_list = pq_client.select(select_sql)
+    article_list = piaoquan_crawler_db_client.fetch(select_sql)
     for article in tqdm(article_list, desc="monitor article list"):
         gh_id = article[0]
         account_name = article[1]
         title = article[2]
         # check whether the title already has a violation record
-        if whether_title_unsafe(lam_client, title):
+        if whether_title_unsafe(long_articles_db_client, title):
             continue
         url = article[3]
         wx_sn = article[4]
@@ -726,27 +664,46 @@ def main():
     )
     args = parser.parse_args()
 
+    # initialize database connections
+    try:
+        piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+        piaoquan_crawler_db_client.connect()
+        aigc_db_client = DatabaseConnector(denet_config)
+        aigc_db_client.connect()
+        long_articles_db_client = DatabaseConnector(long_articles_config)
+        long_articles_db_client.connect()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title="更新文章任务连接数据库失败",
+            detail={
+                "error": e,
+                "msg": error_msg
+            }
+        )
+        return
+
     if args.run_task:
         run_task = args.run_task
         match run_task:
             case "update":
-                update_job()
+                update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "check":
-                check_job()
+                check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "detail":
-                get_article_detail_job()
+                get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)
             case "monitor":
                 if args.run_date:
                     run_date = args.run_date
                 else:
                     run_date = None
-                monitor(run_date)
+                monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
+                        long_articles_db_client=long_articles_db_client, run_date=run_date)
             case _:
                 print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
     else:
-        update_job()
-        check_job()
-        get_article_detail_job()
+        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)
 
 
 if __name__ == '__main__':