Sfoglia il codice sorgente

Merge branch 'feature/luojunhui/20260603-cooperate-auto-reply-improve' of Server/LongArticleTaskServer into master

luojunhui 1 settimana fa
parent
commit
7b6ff155e9

+ 3 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_const.py

@@ -25,6 +25,9 @@ class AutoReplyCardsMonitorConst:
     # 封面下载失败率超过此阈值触发飞书报警
     FEISHU_ALERT_COVER_FAIL_RATE = 0.1
 
+    # blogger 接口返回码:账号违规/封禁
+    BLOGGER_ACCOUNT_BANNED_CODE = 25013
+
     # 任务优先级分档(AIGC 侧 ORDER BY priority DESC 消费)
     class TaskPriority:
         HIGH = 3

+ 13 - 7
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -29,7 +29,7 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
         query = """
             SELECT partner_name, partner_id, gh_id, status, follow_status
             FROM cooperate_accounts
-            WHERE account_name = %s;
+            WHERE account_name = %s AND status = 1;
         """
         return await self.pool.async_fetch(query=query, params=(account_name,))
 
@@ -161,18 +161,23 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
     async def store_extract_result(self, query, row_table):
         return await self.pool.async_save(query=query, params=row_table)
 
-    # 更新封面下载状态和封面信息
+    # 更新封面下载状态和封面信息(失败时自增重试计数,成功时重置)
     async def update_cover_info(
         self, task_id, position, cover_id, oss_key, cover_status
     ):
         query = """
             UPDATE cooperate_auto_reply_detail
-            SET card_cover_id = %s, card_cover = %s, cover_status = %s
+            SET card_cover_id = %s, card_cover = %s, cover_status = %s,
+                cover_retry_cnt = CASE WHEN %s = %s THEN cover_retry_cnt + 1 ELSE 0 END
             WHERE task_id = %s AND position = %s
         """
         return await self.pool.async_save(
             query=query,
-            params=(cover_id, oss_key, cover_status, task_id, position),
+            params=(
+                cover_id, oss_key, cover_status,
+                cover_status, self.CoverStatus.FAILED,
+                task_id, position,
+            ),
         )
 
     # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
@@ -182,7 +187,7 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
                    t1.name AS account_name, t1.gh_id
             FROM content_platform_gzh_account t1 JOIN content_platform_account t2
             ON t1.create_account_id = t2.id
-            WHERE t1.name =  %s;
+            WHERE t1.name = %s AND t1.status = 1;
         """
         fetch_response = await self.pool.async_fetch(
             query=fetch_query, db_name="growth", params=(account_name,)
@@ -215,12 +220,13 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
         """
         return await self.pool.async_fetch(query=query, params=(root_source_id,))
 
-    # 获取封面下载失败的记录
+    # 获取封面下载失败的记录(最多重试3次)
     async def get_failed_cover_records(self):
         query = """
             SELECT task_id, position, task_result
             FROM cooperate_auto_reply_detail
-            WHERE cover_status IN (%s, %s);
+            WHERE cover_status IN (%s, %s)
+              AND cover_retry_cnt < 3;
         """
         return await self.pool.async_fetch(
             query=query, params=(self.CoverStatus.INIT, self.CoverStatus.FAILED)

+ 53 - 9
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -443,7 +443,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                             gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
                         )
 
-            case 25013:
+            case self.BLOGGER_ACCOUNT_BANNED_CODE:
                 await self.mapper.set_account_as_invalid(gh_id)
 
             case _:
@@ -583,31 +583,75 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     continue
 
                 account_detail = fetch_response[0]
-                status = account_detail.get("status")
-                if not status:
+                gh_id = account_detail.get("gh_id")
+                if not gh_id:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "follow_gzh_task",
+                            "status": "fail",
+                            "message": f"账号缺少 gh_id: {account_name}",
+                        }
+                    )
+                    continue
+
+                # 调用 blogger 接口检查账号实时状态(违规/迁移)
+                blogger_response = await get_article_list_from_account(
+                    gh_id, is_cache=False
+                )
+                if blogger_response is None:
                     await self.log_service.log(
                         contents={
                             "task": "auto_reply_cards_monitor",
                             "function": "follow_gzh_task",
                             "status": "warn",
-                            "message": f"账号已经迁移或者封禁: {account_name}",
+                            "message": f"blogger 接口无响应, 跳过: {account_name}, gh_id={gh_id}",
                         }
                     )
                     continue
 
-                gh_id = account_detail.get("gh_id")
-                if not gh_id:
+                blogger_code = blogger_response.get("code")
+                if blogger_code != 0:
+                    blogger_msg = blogger_response.get("msg", "")
+                    if blogger_code == self.BLOGGER_ACCOUNT_BANNED_CODE:
+                        alert_title = f"自动回复卡片-账号违规: {account_name}"
+                    else:
+                        alert_title = f"自动回复卡片-账号异常(可能已迁移): {account_name}"
+
+                    await self.mapper.set_account_as_invalid(gh_id)
+                    try:
+                        await feishu_robot.bot(
+                            title=alert_title,
+                            detail={
+                                "account_name": account_name,
+                                "gh_id": gh_id,
+                                "code": blogger_code,
+                                "msg": blogger_msg,
+                            },
+                            mention=True,
+                            env="long_articles_task",
+                        )
+                    except Exception as e:
+                        await self.log_service.log(
+                            contents={
+                                "task": "auto_reply_cards_monitor",
+                                "function": "follow_gzh_task",
+                                "status": "fail",
+                                "message": f"飞书告警发送失败: {account_name}, gh_id={gh_id}, error={e}",
+                            }
+                        )
+
                     await self.log_service.log(
                         contents={
                             "task": "auto_reply_cards_monitor",
                             "function": "follow_gzh_task",
-                            "status": "fail",
-                            "message": f"账号缺少 gh_id: {account_name}",
+                            "status": "warn",
+                            "message": f"账号状态异常, 已标记无效并跳过: {account_name}, gh_id={gh_id}, code={blogger_code}, msg={blogger_msg}",
                         }
                     )
                     continue
 
-                # print(gh_id, account_name, uv, priority)
+                # print(gh_id, account_name, priority)
                 await self.create_auto_reply_single_account_task(
                     gh_id, account_name, priority
                 )

+ 3 - 3
app/infra/internal/aigc_decode_server.py

@@ -31,7 +31,7 @@ class AigcDecodeServer:
                 "posts": self._sanitize_images(posts),
             }
         }
-        async with AsyncHttpClient() as client:
+        async with AsyncHttpClient(timeout=180) as client:
             return await client.post(url, json=payload, headers=headers)
 
     async def query_decode_results(
@@ -45,7 +45,7 @@ class AigcDecodeServer:
         payload = {
             "params": {"configId": config_id, "channelContentIds": channel_content_ids}
         }
-        async with AsyncHttpClient() as client:
+        async with AsyncHttpClient(timeout=60) as client:
             return await client.post(url, json=payload, headers=headers)
 
     async def cancel_decode_tasks(
@@ -59,7 +59,7 @@ class AigcDecodeServer:
         payload = {
             "params": {"configId": config_id, "channelContentIds": channel_content_ids}
         }
-        async with AsyncHttpClient() as client:
+        async with AsyncHttpClient(timeout=30) as client:
             return await client.post(url, json=payload, headers=headers)