Ver Fonte

优化获取账号逻辑

luojunhui há 1 mês atrás
pai
commit
4eb58ec007

+ 1 - 1
applications/tasks/monitor_tasks/auto_reply_cards_monitor.py

@@ -392,7 +392,7 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
             query=fetch_query, db_name="growth", params=(account_name,)
         )
         if not fetch_response:
-            return
+            return 0
 
         account_detail = fetch_response[0]
         save_query = """

+ 80 - 9
applications/tasks/monitor_tasks/cooperate_accounts_monitor.py

@@ -1,4 +1,7 @@
+import asyncio
 import json
+import random
+import traceback
 
 from tqdm import tqdm
 from datetime import datetime, timedelta
@@ -74,15 +77,26 @@ class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst):
         return parse_qs(query).get("sn", [None])[0]
 
 
-class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
+class CooperateAccountsMonitorMapper(CooperateAccountsMonitorTaskUtils):
     def __init__(self, pool, log_client):
         self.pool = pool
         self.log_client = log_client
 
+    # 从 growth 数据库获取账号
+    async def fetch_monitor_accounts(self):
+        query = """
+            SELECT t2.name AS partner_name, t2.channel AS partner_id,
+                   t1.name AS account_name, t1.gh_id
+            FROM content_platform_gzh_account t1 JOIN content_platform_account t2
+            ON t1.create_account_id = t2.id 
+            WHERE t1.status = 1;
+        """
+        return await self.pool.async_fetch(query=query, db_name="growth")
+
     # 获取 gh_id 的兜底逻辑
     async def fetch_gh_id(self, account_name):
         query = """
-         SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
+            SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
         """
         fetch_response = await self.pool.async_fetch(
             query=query, db_name="growth", params=(account_name, self.VALID_STATUS)
@@ -98,6 +112,44 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
             query=query, params=(new_status, wx_sn, ori_status)
         )
 
+    # 存储账号并且维护账号状态
+    async def save_account(self, account):
+        query = """
+            INSERT IGNORE INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
+            VALUES (%s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(
+                account["partner_name"],
+                account["partner_id"],
+                account["account_name"],
+                account["gh_id"],
+            ),
+        )
+
+    # 修改账号状态
+    async def update_account_status(self, gh_id, ori_status, new_status, remark):
+        query = """
+            UPDATE cooperate_accounts SET status = %s, remark = %s WHERE gh_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, remark, gh_id, ori_status)
+        )
+
+    # 获取账号状态
+    async def get_account_status(self, gh_id):
+        query = """
+            SELECT status FROM cooperate_accounts WHERE gh_id = %s;
+        """
+        fetch_response = await self.pool.async_fetch(query=query, params=(gh_id,))
+        return fetch_response
+
+
+class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
     # 更新文章详情
     async def set_article_detail(self, article):
         wx_sn = article["wx_sn"]
@@ -244,16 +296,25 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
 
     # 存储单个账号
     async def store_single_accounts(self, account):
-        account_name = account.公众号名
-        gh_id = account.ghid
-        if not gh_id:
-            gh_id = await self.fetch_gh_id(account_name)
-
-        if not gh_id:
+        account_name = account["account_name"]
+        gh_id = account["gh_id"]
+        account_status = await self.get_account_status(gh_id)
+
+        if not account_status:
+            # 账号没存在长文数据库
+            affected_row = await self.save_account(account)
+            print(f"账号{account_name}存储到数据库")
+            if not affected_row:
+                return
+
+        account_using_status = account_status[0].get("status", None)
+        if not account_using_status:
+            print("账号违规")
             return
 
         # 只抓最新的文章
         crawl_response = await get_article_list_from_account(gh_id)
+        await asyncio.sleep(random.randint(1, 3))
         if not crawl_response:
             return
 
@@ -264,6 +325,12 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
                 # 将文章存储到库中
                 await self.store_articles(gh_id, account_name, article_list)
 
+            case 25013:
+                msg = crawl_response.get("msg")
+                await self.update_account_status(
+                    gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg
+                )
+
             case _:
                 print(crawl_response["msg"])
                 pass
@@ -281,13 +348,17 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
     async def deal(self, task_name):
         match task_name:
             case "save_articles":
-                account_list = self.get_monitor_account_list()
+                account_list = await self.fetch_monitor_accounts()
                 for account in tqdm(account_list):
+                    print(f"开始处理账号:{account['account_name']}")
                     try:
                         await self.store_single_accounts(account)
 
                     except Exception as e:
                         print(f"获取账号文章失败--{account.公众号名}--{e}")
+                        print(traceback.format_exc())
+
+
 
             case "get_detail":
                 article_list = await self.get_article_list()