Przeglądaj źródła

db_connector improve

luojunhui 4 miesięcy temu
rodzic
commit
c8a6e07a05
1 zmienionych plików z 148 dodań i 149 usunięć
  1. 148 149
      updatePublishedMsgDaily.py

+ 148 - 149
updatePublishedMsgDaily.py

@@ -8,7 +8,7 @@ import traceback
 import urllib.parse
 from argparse import ArgumentParser
 from datetime import datetime
-from typing import Dict, List
+from typing import Dict, List, Tuple
 
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
@@ -45,7 +45,7 @@ def generate_bot_columns():
     return columns
 
 
-def get_account_status(aigc_db_client: DatabaseConnector(denet_config)) -> Dict:
+def get_account_status(db_client: DatabaseConnector) -> Dict:
     """
     获取账号的实验状态
     :return:
@@ -56,12 +56,12 @@ def get_account_status(aigc_db_client: DatabaseConnector(denet_config)) -> Dict:
             JOIN wx_statistics_group_source t2
             ON t1.group_source_name = t2.account_source_name;
             """
-    account_status_list = aigc_db_client.fetch(sql, cursor_type=DictCursor)
+    account_status_list = db_client.fetch(sql, cursor_type=DictCursor)
     account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
     return account_status_dict
 
 
-def get_accounts(aigc_db_client: DatabaseConnector(denet_config)) -> List[Dict]:
+def get_accounts(db_client: DatabaseConnector) -> List[Dict]:
     """
     从 aigc 数据库中获取目前处于发布状态的账号
     :return:
@@ -73,7 +73,7 @@ def get_accounts(aigc_db_client: DatabaseConnector(denet_config)) -> List[Dict]:
     "account_auth": line[5]
     """
     account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-    account_status_dict = get_account_status(aigc_db_client)
+    account_status_dict = get_account_status(db_client)
     account_list = [
         {
             **item,
@@ -84,17 +84,16 @@ def get_accounts(aigc_db_client: DatabaseConnector(denet_config)) -> List[Dict]:
     return account_list
 
 
-def insert_each_msg(db_client, account_info, account_name, msg_list):
+def insert_each_msg(db_client: DatabaseConnector, account_info: Dict, msg_list: List[Dict]) -> None:
     """
     把消息数据更新到数据库中
     :param account_info:
     :param db_client:
-    :param account_name:
     :param msg_list:
     :return:
     """
     gh_id = account_info['ghId']
-    account_name = account_name['name']
+    account_name = account_info['name']
     for info in msg_list:
         baseInfo = info.get("BaseInfo", {})
         appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
@@ -158,7 +157,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
-                    db_client.save(sql=insert_sql, params=info_tuple)
+                    db_client.save(query=insert_sql, params=info_tuple)
                     log(
                         task="updatePublishedMsgDaily",
                         function="insert_each_msg",
@@ -175,7 +174,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         SET show_view_count = %s, show_like_count=%s
                         WHERE wx_sn = %s;
                         """
-                        db_client.save(sql=update_sql,
+                        db_client.save(query=update_sql,
                                        params=(show_view_count, show_like_count, wx_sn))
                         log(
                             task="updatePublishedMsgDaily",
@@ -198,7 +197,7 @@ def insert_each_msg(db_client, account_info, account_name, msg_list):
                         continue
 
 
-def update_each_account(db_client: DatabaseConnector(piaoquan_crawler_config), account_info: Dict, latest_update_time: int, cursor=None):
+def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
     """
     更新每一个账号信息
     :param account_info:
@@ -209,7 +208,7 @@ def update_each_account(db_client: DatabaseConnector(piaoquan_crawler_config), a
     """
     gh_id = account_info['ghId']
     response = spider.update_msg_list(ghId=gh_id, index=cursor)
-    msg_list = response.get("data", {}).get("data", {})
+    msg_list = response.get("data", {}).get("data", [])
     if msg_list:
         # do
         last_article_in_this_msg = msg_list[-1]
@@ -248,10 +247,10 @@ def update_each_account(db_client: DatabaseConnector(piaoquan_crawler_config), a
         return
 
 
-def check_account_info(piaoquan_crawler_db_client: DatabaseConnector(piaoquan_crawler_config), gh_id: str) -> int:
+def check_account_info(db_client: DatabaseConnector, gh_id: str) -> int:
     """
     通过 gh_id查询账号信息的最新发布时间
-    :param piaoquan_crawler_db_client:
+    :param db_client:
     :param gh_id:
     :return:
     """
@@ -260,7 +259,7 @@ def check_account_info(piaoquan_crawler_db_client: DatabaseConnector(piaoquan_cr
         FROM {ARTICLE_TABLE}
         WHERE ghId = '{gh_id}';
         """
-    result = piaoquan_crawler_db_client.fetch(sql)
+    result = db_client.fetch(sql)
     if result:
         return result[0][0]
     else:
@@ -268,23 +267,23 @@ def check_account_info(piaoquan_crawler_db_client: DatabaseConnector(piaoquan_cr
         return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
 
 
-def update_single_account(piaoquan_crawler_db_client: DatabaseConnector(piaoquan_crawler_config), account_info: Dict):
+def update_single_account(db_client: DatabaseConnector, account_info: Dict):
     """
     更新单个账号
-    :param piaoquan_crawler_db_client:
+    :param db_client:
     :param account_info:
     :return:
     """
     gh_id = account_info['ghId']
-    max_publish_time = check_account_info(piaoquan_crawler_db_client, gh_id)
+    max_publish_time = check_account_info(db_client, gh_id)
     update_each_account(
-        db_client=piaoquan_crawler_db_client,
+        db_client=db_client,
         account_info=account_info,
         latest_update_time=max_publish_time
     )
 
 
-def check_single_account(db_client, account_item):
+def check_single_account(db_client: DatabaseConnector, account_item: Dict) -> bool:
     """
     校验每个账号是否更新
     :param db_client:
@@ -319,121 +318,7 @@ def check_single_account(db_client, account_item):
         return False
 
 
-def update_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    更新任务
-    :return:
-    """
-    account_list = get_accounts(aigc_db_client=aigc_db_client)
-    # 订阅号
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    success_count = 0
-    fail_count = 0
-    for sub_item in tqdm(subscription_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            success_count += 1
-            time.sleep(5)
-        except Exception as e:
-            fail_count += 1
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_job",
-                message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                status="fail",
-            )
-    log(
-        task="updatePublishedMsgDaily",
-        function="update_job",
-        message="订阅号更新完成",
-        data={
-            "success": success_count,
-            "fail": fail_count
-        }
-    )
-
-    if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
-        bot(
-            title="订阅号超过 30% 的账号更新失败",
-            detail={
-                "success": success_count,
-                "fail": fail_count,
-                "failRate": fail_count / (success_count + fail_count)
-            }
-        )
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "订阅号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-    # 服务号
-    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-    for sub_item in tqdm(server_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            time.sleep(5)
-        except Exception as e:
-            print(e)
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "服务号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-
-
-def check_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    校验任务
-    :return:
-    """
-    account_list = get_accounts(aigc_db_client=aigc_db_client)
-    # 订阅号
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    fail_list = []
-    # check and rework if fail
-    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-    # check whether success and bot if fails
-    for sub_item in tqdm(account_list):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            # 去掉三个不需要查看的字段
-            sub_item.pop('account_type', None)
-            sub_item.pop('account_auth', None)
-            sub_item.pop('account_id', None)
-            fail_list.append(sub_item)
-    if fail_list:
-        try:
-            bot(
-                title="更新当天发布文章,存在未更新的账号",
-                detail={
-                    "columns": generate_bot_columns(),
-                    "rows": fail_list
-                },
-                table=True
-            )
-        except Exception as e:
-            print("Timeout Error: {}".format(e))
-    else:
-        bot(
-            title="更新当天发布文章,所有账号均更新成功",
-            mention=False,
-            detail={
-                "msg": "校验任务完成",
-                "finish_time": datetime.today().__str__()
-            }
-        )
-
-
-def get_articles(db_client):
+def get_articles(db_client: DatabaseConnector):
     """
 
     :return:
@@ -446,7 +331,7 @@ def get_articles(db_client):
     return response
 
 
-def update_publish_timestamp(db_client, row):
+def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
     """
     更新发布时间戳 && minigram 信息
     :param db_client:
@@ -494,7 +379,7 @@ def update_publish_timestamp(db_client, row):
             WHERE wx_sn = %s;
         """
     db_client.save(
-        sql=update_sql,
+        query=update_sql,
         params=(
             publish_timestamp_s,
             json.dumps(root_source_id_list, ensure_ascii=False),
@@ -506,27 +391,27 @@ def update_publish_timestamp(db_client, row):
         return None
 
 
-def get_article_detail_job(piaoquan_crawler_db_client):
+def get_article_detail_job(db_client: DatabaseConnector):
     """
     获取发布文章详情
     :return:
     """
 
-    article_tuple = get_articles(piaoquan_crawler_db_client)
+    article_tuple = get_articles(db_client)
     for article in tqdm(article_tuple):
         try:
-            update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
+            update_publish_timestamp(db_client=db_client, row=article)
         except Exception as e:
             print(e)
             error_msg = traceback.format_exc()
             print(error_msg)
     # check 一遍存在请求失败-1 && 0 的文章
-    process_failed_articles = get_articles(piaoquan_crawler_db_client)
+    process_failed_articles = get_articles(db_client)
     fail_list = []
     if process_failed_articles:
         for article in tqdm(process_failed_articles):
             try:
-                res = update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
+                res = update_publish_timestamp(db_client=db_client, row=article)
                 fail_list.append({"wx_sn": res[1], "url": res[0]})
             except Exception as e:
                 print(e)
@@ -546,8 +431,8 @@ def get_article_detail_job(piaoquan_crawler_db_client):
         SET oav.publish_timestamp = vv.publish_timestamp
         WHERE oav.publish_timestamp <= %s;
     """
-    piaoquan_crawler_db_client.save(
-        sql=update_sql,
+    db_client.save(
+        query=update_sql,
         params=(0, 0)
     )
 
@@ -557,8 +442,8 @@ def get_article_detail_job(piaoquan_crawler_db_client):
         SET publish_timestamp = updateTime
         WHERE publish_timestamp < %s;
     """
-    piaoquan_crawler_db_client.save(
-        sql=update_sql_2,
+    db_client.save(
+        query=update_sql_2,
         params=0
     )
     if fail_list:
@@ -568,7 +453,7 @@ def get_article_detail_job(piaoquan_crawler_db_client):
         )
 
 
-def whether_title_unsafe(db_client, title):
+def whether_title_unsafe(db_client: DatabaseConnector, title: str):
     """
     检查文章标题是否已经存在违规记录
     :param db_client:
@@ -588,6 +473,120 @@ def whether_title_unsafe(db_client, title):
         return False
 
 
+def update_job(piaoquan_crawler_db_client, aigc_db_client):
+    """
+    Daily update job: refresh the published messages of every account.
+
+    Subscription accounts are processed first. Their success / failure counts are
+    logged, and an alert is sent through bot() when the failure rate is above
+    const.SUBSCRIBE_FAIL_RATE_THRESHOLD. Service accounts are processed afterwards.
+    A completion notice is sent through bot() after each group.
+
+    :param piaoquan_crawler_db_client: client handed to update_single_account for each account
+    :param aigc_db_client: client handed to get_accounts to load the account list
+    :return: None
+    """
+    account_list = get_accounts(db_client=aigc_db_client)
+    # subscription accounts
+    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+    success_count = 0
+    fail_count = 0
+    for sub_item in tqdm(subscription_accounts):
+        try:
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+            success_count += 1
+            time.sleep(5)
+        except Exception as e:
+            # best effort: one failing account must not stop the remaining ones
+            fail_count += 1
+            log(
+                task="updatePublishedMsgDaily",
+                function="update_job",
+                message="单个账号文章更新失败, 报错信息是: {}".format(e),
+                status="fail",
+            )
+    log(
+        task="updatePublishedMsgDaily",
+        function="update_job",
+        message="订阅号更新完成",
+        data={
+            "success": success_count,
+            "fail": fail_count
+        }
+    )
+
+    # When there are no subscription accounts the total is 0; use a rate of 0.0
+    # instead of dividing by zero.
+    total_count = success_count + fail_count
+    fail_rate = fail_count / total_count if total_count else 0.0
+    if fail_rate > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
+        bot(
+            title="订阅号超过 30% 的账号更新失败",
+            detail={
+                "success": success_count,
+                "fail": fail_count,
+                "failRate": fail_rate
+            }
+        )
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "订阅号更新完成",
+            "finish_time": str(datetime.today())
+        },
+        mention=False
+    )
+    # service accounts
+    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
+    for sub_item in tqdm(server_accounts):
+        try:
+            update_single_account(piaoquan_crawler_db_client, sub_item)
+            time.sleep(5)
+        except Exception as e:
+            # report through log() the same way the subscription loop does
+            log(
+                task="updatePublishedMsgDaily",
+                function="update_job",
+                message="单个账号文章更新失败, 报错信息是: {}".format(e),
+                status="fail",
+            )
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "服务号更新完成",
+            "finish_time": str(datetime.today())
+        },
+        mention=False
+    )
+
+
+def check_job(piaoquan_crawler_db_client, aigc_db_client):
+    """
+    Verification job: make sure every account has been updated.
+
+    First pass: each subscription account that fails check_single_account is
+    updated again through update_single_account. Second pass: every account is
+    checked once more and the ones still failing are reported through bot();
+    when none fail, a success notice is sent instead.
+
+    :param piaoquan_crawler_db_client: client handed to check_single_account / update_single_account
+    :param aigc_db_client: client handed to get_accounts to load the account list
+    :return: None
+    """
+    all_accounts = get_accounts(db_client=aigc_db_client)
+    # subscription accounts
+    subscription_accounts = [acc for acc in all_accounts if acc['account_type'] in const.SUBSCRIBE_TYPE_SET]
+    # first pass: redo the update for every subscription account that fails the check
+    for account in tqdm(subscription_accounts):
+        if not check_single_account(piaoquan_crawler_db_client, account):
+            update_single_account(piaoquan_crawler_db_client, account)
+    # second pass: collect every account that still fails the check
+    still_failing = []
+    for account in tqdm(all_accounts):
+        if check_single_account(piaoquan_crawler_db_client, account):
+            continue
+        # drop the three fields that are not needed in the report
+        for field in ('account_type', 'account_auth', 'account_id'):
+            account.pop(field, None)
+        still_failing.append(account)
+    if not still_failing:
+        bot(
+            title="更新当天发布文章,所有账号均更新成功",
+            mention=False,
+            detail={
+                "msg": "校验任务完成",
+                "finish_time": str(datetime.today())
+            }
+        )
+        return
+    try:
+        bot(
+            title="更新当天发布文章,存在未更新的账号",
+            detail={
+                "columns": generate_bot_columns(),
+                "rows": still_failing
+            },
+            table=True
+        )
+    except Exception as e:
+        print("Timeout Error: {}".format(e))
+
+
 def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
     """
     监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
@@ -690,7 +689,7 @@ def main():
             case "check":
                 check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "detail":
-                get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)
+                get_article_detail_job(db_client=piaoquan_crawler_db_client)
             case "monitor":
                 if args.run_date:
                     run_date = args.run_date
@@ -703,7 +702,7 @@ def main():
     else:
         update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
         check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)
+        get_article_detail_job(db_client=piaoquan_crawler_db_client)
 
 
 if __name__ == '__main__':