Procházet zdrojové kódy

添加图片链接过滤功能

luojunhui před 2 dny
rodič
revize
e6f4bf6b3b

+ 3 - 0
app/domains/llm_tasks/decode_article/_const.py

@@ -33,6 +33,9 @@ class DecodeArticleConst:
         VIDEO = 4  # 视频
         AUDIO = 5  # 音频
 
+    class TaskChannel:
+        ARTICLE = 1  # long_articles_decode_tasks.channel: article (long-article decode)
+
     class Channel:
         XIAOHONGSHU = 1  # 小红书
         DOUYIN = 2  # 抖音

+ 8 - 4
app/domains/llm_tasks/decode_article/_mapper.py

@@ -18,17 +18,20 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
         payload: str,
         remark: str = None,
         status: int = None,
+        channel: int = None,
     ) -> int:
+        ch = channel if channel is not None else self.TaskChannel.ARTICLE
         if status is not None:
             query = f"""
                 INSERT IGNORE INTO {TABLE}
-                    (source_id, config_id, source, payload, remark, status)
-                VALUES (%s, %s, %s, %s, %s, %s)
+                    (source_id, config_id, source, channel, payload, remark, status)
+                VALUES (%s, %s, %s, %s, %s, %s, %s)
             """
             params = (
                 source_id,
                 self.CONFIG_ID,
                 source,
+                ch,
                 payload,
                 remark,
                 status,
@@ -36,13 +39,14 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
         else:
             query = f"""
                 INSERT IGNORE INTO {TABLE}
-                    (source_id, config_id, source, payload, remark)
-                VALUES (%s, %s, %s, %s, %s)
+                    (source_id, config_id, source, channel, payload, remark)
+                VALUES (%s, %s, %s, %s, %s, %s)
             """
             params = (
                 source_id,
                 self.CONFIG_ID,
                 source,
+                ch,
                 payload,
                 remark,
             )

+ 9 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_const.py

@@ -14,3 +14,12 @@ class AutoReplyCardsMonitorConst:
     # account_status
     VALID_STATUS = 1
     INVALID_STATUS = 0
+
+    # cover_status
+    class CoverStatus:
+        INIT = 0  # written when the card row is first inserted, before any cover download
+        SUCCESS = 2  # written once the cover id and OSS key have been stored on the row
+        FAILED = 99  # written on each cover failure path (missing keys, URL lookup, download, upload)
+
+    # A Feishu alert is sent when the cover failure rate reaches or exceeds this threshold (compared with >=)
+    FEISHU_ALERT_COVER_FAIL_RATE = 0.5

+ 14 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -161,6 +161,20 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
     async def store_extract_result(self, query, row_table):
         return await self.pool.async_save(query=query, params=row_table)
 
+    # Update the cover id, cover OSS key and cover status of the detail row matched by task_id + position
+    async def update_cover_info(
+        self, task_id, position, cover_id, oss_key, cover_status
+    ):
+        query = """
+            UPDATE cooperate_auto_reply_detail
+            SET card_cover_id = %s, card_cover = %s, cover_status = %s
+            WHERE task_id = %s AND position = %s
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(cover_id, oss_key, cover_status, task_id, position),  # same order as the %s placeholders
+        )
+
     # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
     async def fetch_cooperate_accounts(self, account_name):
         fetch_query = """

+ 32 - 5
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -137,7 +137,7 @@ class AutoReplyCardsMonitorUtils:
         return video_id, root_source_id
 
     @staticmethod
-    async def get_cover_url(aes_key, file_id):
+    async def get_cover_url(aes_key, file_id, log_service=None):
         url = "https://wechat-protocol.aiddit.com/xed/getCdnUrl"
         data = {"file_id": file_id, "aes_key": aes_key}
         headers = {
@@ -149,13 +149,40 @@ class AutoReplyCardsMonitorUtils:
                     url, headers=headers, data=json.dumps(data)
                 )
             if response and response.get("status") != 0:
-                print(
-                    f"[ERROR] get_cover_url API error: status={response.get('status')}, msg={response.get('msg')}"
-                )
+                api_status = response.get("status")
+                api_msg = response.get("msg", "")
+                if log_service:
+                    await log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "get_cover_url",
+                            "status": "fail",
+                            "message": f"get_cover_url API error: status={api_status}, msg={api_msg}",
+                            "api_status": api_status,
+                            "api_msg": api_msg,
+                            "file_id": file_id,
+                        }
+                    )
+                else:
+                    print(
+                        f"[ERROR] get_cover_url API error: status={api_status}, msg={api_msg}"
+                    )
                 return None
             return response
         except Exception as e:
-            print(f"[ERROR] get_cover_url failed: {e}")
+            if log_service:
+                await log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "get_cover_url",
+                        "status": "fail",
+                        "message": f"get_cover_url HTTP request failed: {e}",
+                        "exception": str(e),
+                        "file_id": file_id,
+                    }
+                )
+            else:
+                print(f"[ERROR] get_cover_url failed: {e}")
             return None
 
     @staticmethod

+ 152 - 71
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -1,10 +1,12 @@
 import asyncio
 import json
 import traceback
+import xml.etree.ElementTree as ET
 
 from tqdm import tqdm
 from urllib.parse import parse_qs, urlparse
 
+from app.infra.external import feishu_robot
 from app.infra.crawler.wechat import get_article_detail
 from app.infra.crawler.wechat import get_article_list_from_account
 
@@ -32,25 +34,52 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             config=config
         )
 
-    # 存储卡片信息
+    # 存储卡片信息(先落库,再异步下载封面)
     async def store_card(self, task_id, index, msg_type, xml_obj) -> bool:
-        page_path = xml_obj.get("page_path")
-        if not page_path:
-            await self.log_service.log(
-                contents={
-                    "task": "auto_reply_cards_monitor",
-                    "function": "store_card",
-                    "status": "fail",
-                    "trace_id": task_id,
-                    "message": f"store_card missing page_path, task_id={task_id}",
-                }
-            )
-            return False
-        video_id, root_source_id = self.tool.extract_page_path(page_path)
+        page_path = xml_obj.get("page_path", "")
+        video_id, root_source_id = (
+            self.tool.extract_page_path(page_path) if page_path else (None, None)
+        )
 
+        # Phase 1: 直接落库,封面信息暂空
+        query = """
+            INSERT INTO cooperate_auto_reply_detail
+            (
+                task_id, position, msg_type, card_title, card_cover_id, card_cover,
+                video_id, root_source_id, mini_program_name, task_result, cover_status
+            ) VALUES
+            (
+                %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s
+            );
+        """
+        insert_row = (
+            task_id,
+            index,
+            msg_type,
+            xml_obj.get("title", ""),
+            "",
+            "",
+            video_id or "",
+            root_source_id or "",
+            xml_obj.get("mini_program", ""),
+            json.dumps(xml_obj, ensure_ascii=False),
+            self.CoverStatus.INIT,
+        )
+        await self.mapper.store_extract_result(query, insert_row)
+
+        # Phase 2: 异步下载封面(best effort,不影响落库结果)
+        cover_ok = await self._try_download_cover(task_id, index, xml_obj)
+        return cover_ok
+
+    async def _try_download_cover(self, task_id, index, xml_obj) -> bool:
+        """尝试下载并更新封面,成功返回 True,失败更新 cover_status=FAILED 并返回 False"""
         aes_key = xml_obj.get("aes_key")
         file_id = xml_obj.get("file_id")
         if not all([aes_key, file_id]):
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
             await self.log_service.log(
                 contents={
                     "task": "auto_reply_cards_monitor",
@@ -62,8 +91,13 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             )
             return False
 
-        cover_obj = await self.tool.get_cover_url(aes_key, file_id)
+        cover_obj = await self.tool.get_cover_url(
+            aes_key, file_id, log_service=self.log_service
+        )
         if not cover_obj:
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
             await self.log_service.log(
                 contents={
                     "task": "auto_reply_cards_monitor",
@@ -71,6 +105,8 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     "status": "fail",
                     "trace_id": task_id,
                     "message": f"store_card get_cover_url failed, task_id={task_id}",
+                    "file_id": file_id,
+                    "aes_key": aes_key,
                 }
             )
             return False
@@ -78,7 +114,15 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         file_name = f"{task_id}_{index}.jpg"
         save_path = self.tool.download_cover(file_name, cover_obj)
         if save_path is None:
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
             return False
+
+        page_path = xml_obj.get("page_path", "")
+        _, root_source_id = (
+            self.tool.extract_page_path(page_path) if page_path else (None, None)
+        )
         exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
         if exist_covers:
             exist_cover_id, oss_key = self.tool.check_cover(save_path, exist_covers)
@@ -86,7 +130,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 cover_id = exist_cover_id
                 self.tool.remove_local_cover(save_path)
             else:
-                # upload to oss
                 oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
                 if oss_key is not None:
                     await self.mapper.save_cover(
@@ -96,6 +139,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                 else:
                     self.tool.remove_local_cover(save_path)
+                    await self.mapper.update_cover_info(
+                        task_id, index, "", "", self.CoverStatus.FAILED
+                    )
                     await self.log_service.log(
                         contents={
                             "task": "auto_reply_cards_monitor",
@@ -107,7 +153,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                     return False
         else:
-            # upload to oss
             oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
             if oss_key is not None:
                 await self.mapper.save_cover(
@@ -115,6 +160,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
             else:
                 self.tool.remove_local_cover(save_path)
+                await self.mapper.update_cover_info(
+                    task_id, index, "", "", self.CoverStatus.FAILED
+                )
                 await self.log_service.log(
                     contents={
                         "task": "auto_reply_cards_monitor",
@@ -126,30 +174,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
                 return False
 
-        query = """
-            INSERT INTO cooperate_auto_reply_detail
-            (
-                task_id, position, msg_type, card_title, card_cover_id, card_cover,
-                video_id, root_source_id, mini_program_name, task_result
-            ) VALUES
-            (
-                %s, %s, %s, %s, %s, %s,
-                %s, %s, %s, %s
-            );
-        """
-        insert_row = (
-            task_id,
-            index,
-            msg_type,
-            xml_obj.get("title", ""),
-            cover_id,
-            oss_key,
-            video_id,
-            root_source_id,
-            xml_obj.get("mini_program", ""),
-            json.dumps(xml_obj, ensure_ascii=False),
+        await self.mapper.update_cover_info(
+            task_id, index, cover_id, oss_key, self.CoverStatus.SUCCESS
         )
-        await self.mapper.store_extract_result(query, insert_row)
         return True
 
     # 存储文章信息
@@ -648,38 +675,36 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 return
 
             index = 0
-            success_count = 0
-            fail_count = 0
+            parse_fail_items = []  # XML 结构异常项
+            cover_fail_items = []  # 封面下载失败项
+            total = 0
 
             for item in xml_list:
                 xml_obj_list = self.tool.extract_callback_xml(item)
                 if xml_obj_list:
                     for xml_obj in xml_obj_list:
                         index += 1
+                        total += 1
                         msg_type = xml_obj.get("msg_type", None)
                         try:
-                            result = False
                             match msg_type:
                                 case "33":
-                                    result = await self.store_card(
+                                    cover_ok = await self.store_card(
                                         task_id, index, msg_type, xml_obj
                                     )
+                                    if not cover_ok:
+                                        cover_fail_items.append(index)
 
                                 case "5":
-                                    result = await self.store_article(
+                                    await self.store_article(
                                         task_id, index, msg_type, xml_obj
                                     )
 
                                 case _:
                                     continue
 
-                            if result:
-                                success_count += 1
-                            else:
-                                fail_count += 1
-
                         except Exception as e:
-                            fail_count += 1
+                            cover_fail_items.append(index)
                             await self.log_service.log(
                                 contents={
                                     "task": "auto_reply_cards_monitor",
@@ -689,38 +714,30 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                                     "message": f"处理 xml_obj 异常, task_id={task_id}, index={index}, msg_type={msg_type}, error={e}",
                                 }
                             )
+                else:
+                    # extract_callback_xml 返回空:可能是不支持的 msg_type,也可能是 XML 解析异常
+                    try:
+                        ET.fromstring(item)
+                        # XML 合法但 msg_type 不在处理范围内,跳过
+                    except ET.ParseError:
+                        parse_fail_items.append(item[:200])
 
+                if xml_obj_list:
                     await asyncio.sleep(5)
 
-            # 根据成功/失败数量决定最终状态
-            if fail_count == 0 and success_count > 0:
+            # 数据已落库,任务始终标记为成功
+            if total > 0:
                 final_status = self.SUCCESS_STATUS
-            elif success_count > 0 and fail_count > 0:
-                # 部分成功,记录警告但仍标记为成功
-                final_status = self.SUCCESS_STATUS
-                await self.log_service.log(
-                    contents={
-                        "task": "auto_reply_cards_monitor",
-                        "function": "extract_single_xml",
-                        "status": "warn",
-                        "trace_id": task_id,
-                        "message": f"任务部分成功, task_id={task_id}, success={success_count}, fail={fail_count}",
-                    }
-                )
-            elif success_count == 0 and fail_count > 0:
-                # 全部失败
-                final_status = self.FAIL_STATUS
                 await self.log_service.log(
                     contents={
                         "task": "auto_reply_cards_monitor",
                         "function": "extract_single_xml",
-                        "status": "fail",
+                        "status": "info",
                         "trace_id": task_id,
-                        "message": f"任务全部失败, task_id={task_id}, fail={fail_count}",
+                        "message": f"任务完成, task_id={task_id}, total={total}, cover_fail={len(cover_fail_items)}, parse_fail={len(parse_fail_items)}",
                     }
                 )
             else:
-                # success_count == 0 and fail_count == 0,没有处理任何数据
                 final_status = self.SUCCESS_STATUS
                 await self.log_service.log(
                     contents={
@@ -736,6 +753,18 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 task_id, "extract", self.PROCESSING_STATUS, final_status
             )
 
+            # 飞书报警:XML 结构异常(外部数据结构变化)
+            if parse_fail_items:
+                await self._alert_xml_parse_failure(task_id, parse_fail_items)
+
+            # 飞书报警:封面下载失败率超阈值
+            if total > 0:
+                cover_fail_rate = len(cover_fail_items) / total
+                if cover_fail_rate >= self.FEISHU_ALERT_COVER_FAIL_RATE:
+                    await self._alert_cover_failure(
+                        task_id, total, cover_fail_items, cover_fail_rate
+                    )
+
         except Exception as e:
             print(traceback.format_exc())
             await self.log_service.log(
@@ -751,6 +780,58 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
             )
 
+    async def _alert_xml_parse_failure(self, task_id, parse_fail_items):
+        """Send a Feishu alert for items that failed XML parsing; the external data structure may have changed."""
+        detail = {
+            "task_id": task_id,
+            "fail_count": len(parse_fail_items),
+            "samples": parse_fail_items[:3],  # include at most three samples in the alert
+        }
+        try:
+            await feishu_robot.bot(
+                title="⚠️ 自动回复卡片 XML 解析失败,外部数据结构可能已变更",
+                detail=detail,
+                mention=True,
+                env="prod",
+            )
+        except Exception as e:  # a failed alert is only logged, never re-raised
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "_alert_xml_parse_failure",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"飞书报警发送失败: {e}",
+                }
+            )
+
+    async def _alert_cover_failure(self, task_id, total, fail_items, fail_rate):
+        """Send a Feishu alert reporting that the cover download failure rate is too high."""
+        detail = {
+            "task_id": task_id,
+            "total": total,
+            "fail_count": len(fail_items),
+            "fail_rate": f"{fail_rate:.1%}",  # rendered as a percentage with one decimal place
+            "fail_positions": fail_items[:10],  # at most the first ten failed positions
+        }
+        try:
+            await feishu_robot.bot(
+                title="⚠️ 自动回复卡片封面下载失败率过高",
+                detail=detail,
+                mention=True,
+                env="prod",
+            )
+        except Exception as e:  # a failed alert is only logged, never re-raised
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "_alert_cover_failure",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"飞书报警发送失败: {e}",
+                }
+            )
+
     # main function
     async def deal(self, task_name):
         match task_name: