Procházet zdrojové kódy

Merge branch 'feature/luojunhui/20260121-get-fans-improve' of Server/LongArticleTaskServer into master

luojunhui před 1 měsícem
rodič
revize
788d6df5a7

+ 67 - 19
applications/tasks/monitor_tasks/auto_reply_cards_monitor.py

@@ -379,6 +379,36 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
     async def store_extract_result(self, query, row_table):
         return await self.pool.async_save(query=query, params=row_table)
 
+    # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
+    async def fetch_cooperate_accounts(self, account_name):
+        fetch_query = """
+            SELECT t2.name AS partner_name, t2.channel AS partner_id,
+                   t1.name AS account_name, t1.gh_id
+            FROM content_platform_gzh_account t1 JOIN content_platform_account t2
+            ON t1.create_account_id = t2.id
+            WHERE t1.name =  %s;
+        """
+        fetch_response = await self.pool.async_fetch(
+            query=fetch_query, db_name="growth", params=(account_name,)
+        )
+        if not fetch_response:
+            return
+
+        account_detail = fetch_response[0]
+        save_query = """
+            INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
+            VALUES (%s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=save_query,
+            params=(
+                account_detail["partner_name"],
+                account_detail["partner_id"],
+                account_detail["account_name"],
+                account_detail["gh_id"],
+            ),
+        )
+
 
 class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
     def __init__(self, pool, log_client):
@@ -387,7 +417,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
     # 存储卡片信息
     async def store_card(self, task_id, index, msg_type, xml_obj):
         video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
-        cover_obj = await self.get_cover_url(xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"])
+        cover_obj = await self.get_cover_url(
+            xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
+        )
         cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
         query = """
             INSERT INTO cooperate_auto_reply_detail
@@ -422,7 +454,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
 
         fetch_fail_status = False
 
-        fetch_response = await get_article_detail(article_link=article_link, is_cache=False, is_count=True)
+        fetch_response = await get_article_detail(
+            article_link=article_link, is_cache=False, is_count=True
+        )
         if not fetch_response:
             fetch_fail_status = True
 
@@ -438,7 +472,16 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                 (%s, %s, %s, %s, %s, %s, %s, %s);
             """
             remark = "获取文章详情失败"
-            insert_row = (task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark)
+            insert_row = (
+                task_id,
+                index,
+                msg_type,
+                article_title,
+                article_link,
+                article_cover,
+                article_desc,
+                remark,
+            )
             await self.store_extract_result(query, insert_row)
 
         else:
@@ -588,24 +631,29 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
             try:
                 fetch_response = await self.fetch_account_status(account.公众号名)
                 if not fetch_response:
-                    print("账号不存在", account)
-                    # todo 没有 gh_id, 暂时无法存储账号
-                    # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
-                    # if affected_rows:
-                    #     await self.create_follow_account_task(account.ghid)
-
-                else:
-                    account_detail = fetch_response[0]
-                    status = account_detail["status"]
-
-                    if not status:
-                        print("账号已经迁移或者封禁")
+                    affected_rows = await self.fetch_cooperate_accounts(
+                        account.公众号名
+                    )
+                    if affected_rows:
+                        fetch_response = await self.fetch_account_status(
+                            account.公众号名
+                        )
+
+                    else:
+                        print(f"系统中无账号,跳过: {account.公众号名}")
                         continue
 
-                    # 新逻辑,无需考虑账号是否关注
-                    await self.create_auto_reply_single_account_task(
-                        account_detail["gh_id"], account.公众号名
-                    )
+                account_detail = fetch_response[0]
+                status = account_detail["status"]
+
+                if not status:
+                    print("账号已经迁移或者封禁")
+                    continue
+
+                # 新逻辑,无需考虑账号是否关注
+                await self.create_auto_reply_single_account_task(
+                    account_detail["gh_id"], account.公众号名
+                )
 
             except Exception as e:
                 print(f"处理账号{account.公众号名}异常", e)