Prechádzať zdrojové kódy

Merge branch 'feature/luojunhui/20260525-monitor-task-cop-cards-improve' of Server/LongArticleTaskServer into master

luojunhui 1 týždeň pred
rodič
commit
80027e54fe

+ 2 - 1
.gitignore

@@ -63,4 +63,5 @@ docs/_build/
 target/
 
 .claude
-.cursor
+.cursor
+scripts

+ 3 - 0
app/domains/llm_tasks/decode_article/_const.py

@@ -33,6 +33,9 @@ class DecodeArticleConst:
         VIDEO = 4  # 视频
         AUDIO = 5  # 音频
 
+    class TaskChannel:
+        ARTICLE = 1  # long_articles_decode_tasks.channel: 长文解构文章
+
     class Channel:
         XIAOHONGSHU = 1  # 小红书
         DOUYIN = 2  # 抖音

+ 8 - 4
app/domains/llm_tasks/decode_article/_mapper.py

@@ -18,17 +18,20 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
         payload: str,
         remark: str = None,
         status: int = None,
+        channel: int = None,
     ) -> int:
+        ch = channel if channel is not None else self.TaskChannel.ARTICLE
         if status is not None:
             query = f"""
                 INSERT IGNORE INTO {TABLE}
-                    (source_id, config_id, source, payload, remark, status)
-                VALUES (%s, %s, %s, %s, %s, %s)
+                    (source_id, config_id, source, channel, payload, remark, status)
+                VALUES (%s, %s, %s, %s, %s, %s, %s)
             """
             params = (
                 source_id,
                 self.CONFIG_ID,
                 source,
+                ch,
                 payload,
                 remark,
                 status,
@@ -36,13 +39,14 @@ class ArticlesDecodeTaskMapper(DecodeArticleConst):
         else:
             query = f"""
                 INSERT IGNORE INTO {TABLE}
-                    (source_id, config_id, source, payload, remark)
-                VALUES (%s, %s, %s, %s, %s)
+                    (source_id, config_id, source, channel, payload, remark)
+                VALUES (%s, %s, %s, %s, %s, %s)
             """
             params = (
                 source_id,
                 self.CONFIG_ID,
                 source,
+                ch,
                 payload,
                 remark,
             )

+ 3 - 0
app/domains/llm_tasks/decode_video/decode_ad_videos.py

@@ -0,0 +1,3 @@
+"""
+解构广告素材信息
+"""

+ 10 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_const.py

@@ -14,3 +14,13 @@ class AutoReplyCardsMonitorConst:
     # account_status
     VALID_STATUS = 1
     INVALID_STATUS = 0
+
+    # cover_status
+    class CoverStatus:
+        INIT = 0
+        SKIP = 4  # 非票圈视频,无需下载封面
+        SUCCESS = 2
+        FAILED = 99
+
+    # 封面下载失败率超过此阈值触发飞书报警
+    FEISHU_ALERT_COVER_FAIL_RATE = 0.5

+ 14 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -161,6 +161,20 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
     async def store_extract_result(self, query, row_table):
         return await self.pool.async_save(query=query, params=row_table)
 
+    # 更新封面下载状态和封面信息
+    async def update_cover_info(
+        self, task_id, position, cover_id, oss_key, cover_status
+    ):
+        query = """
+            UPDATE cooperate_auto_reply_detail
+            SET card_cover_id = %s, card_cover = %s, cover_status = %s
+            WHERE task_id = %s AND position = %s
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(cover_id, oss_key, cover_status, task_id, position),
+        )
+
     # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
     async def fetch_cooperate_accounts(self, account_name):
         fetch_query = """

+ 32 - 5
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -137,7 +137,7 @@ class AutoReplyCardsMonitorUtils:
         return video_id, root_source_id
 
     @staticmethod
-    async def get_cover_url(aes_key, file_id):
+    async def get_cover_url(aes_key, file_id, log_service=None):
         url = "https://wechat-protocol.aiddit.com/xed/getCdnUrl"
         data = {"file_id": file_id, "aes_key": aes_key}
         headers = {
@@ -149,13 +149,40 @@ class AutoReplyCardsMonitorUtils:
                     url, headers=headers, data=json.dumps(data)
                 )
             if response and response.get("status") != 0:
-                print(
-                    f"[ERROR] get_cover_url API error: status={response.get('status')}, msg={response.get('msg')}"
-                )
+                api_status = response.get("status")
+                api_msg = response.get("msg", "")
+                if log_service:
+                    await log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "get_cover_url",
+                            "status": "fail",
+                            "message": f"get_cover_url API error: status={api_status}, msg={api_msg}",
+                            "api_status": api_status,
+                            "api_msg": api_msg,
+                            "file_id": file_id,
+                        }
+                    )
+                else:
+                    print(
+                        f"[ERROR] get_cover_url API error: status={api_status}, msg={api_msg}"
+                    )
                 return None
             return response
         except Exception as e:
-            print(f"[ERROR] get_cover_url failed: {e}")
+            if log_service:
+                await log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "get_cover_url",
+                        "status": "fail",
+                        "message": f"get_cover_url HTTP request failed: {e}",
+                        "exception": str(e),
+                        "file_id": file_id,
+                    }
+                )
+            else:
+                print(f"[ERROR] get_cover_url failed: {e}")
             return None
 
     @staticmethod

+ 176 - 76
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -1,10 +1,12 @@
 import asyncio
 import json
 import traceback
+import xml.etree.ElementTree as ET
 
 from tqdm import tqdm
 from urllib.parse import parse_qs, urlparse
 
+from app.infra.external import feishu_robot
 from app.infra.crawler.wechat import get_article_detail
 from app.infra.crawler.wechat import get_article_list_from_account
 
@@ -32,25 +34,57 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             config=config
         )
 
-    # 存储卡片信息
+    # 存储卡片信息(先落库,再异步下载封面)
     async def store_card(self, task_id, index, msg_type, xml_obj) -> bool:
-        page_path = xml_obj.get("page_path")
-        if not page_path:
-            await self.log_service.log(
-                contents={
-                    "task": "auto_reply_cards_monitor",
-                    "function": "store_card",
-                    "status": "fail",
-                    "trace_id": task_id,
-                    "message": f"store_card missing page_path, task_id={task_id}",
-                }
-            )
-            return False
-        video_id, root_source_id = self.tool.extract_page_path(page_path)
+        page_path = xml_obj.get("page_path", "")
+        video_id, root_source_id = (
+            self.tool.extract_page_path(page_path) if page_path else (None, None)
+        )
 
+        mini_program = xml_obj.get("mini_program", "")
+        need_cover = mini_program == "票圈视频"
+
+        # Phase 1: 直接落库
+        query = """
+            INSERT INTO cooperate_auto_reply_detail
+            (
+                task_id, position, msg_type, card_title, card_cover_id, card_cover,
+                video_id, root_source_id, mini_program_name, task_result, cover_status
+            ) VALUES
+            (
+                %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s
+            );
+        """
+        insert_row = (
+            task_id,
+            index,
+            msg_type,
+            xml_obj.get("title", ""),
+            "",
+            "",
+            video_id or "",
+            root_source_id or "",
+            mini_program,
+            json.dumps(xml_obj, ensure_ascii=False),
+            self.CoverStatus.INIT if need_cover else self.CoverStatus.SKIP,
+        )
+        await self.mapper.store_extract_result(query, insert_row)
+
+        # Phase 2: 仅票圈视频下载封面
+        if need_cover:
+            cover_ok = await self._try_download_cover(task_id, index, xml_obj)
+            return cover_ok
+        return True
+
+    async def _try_download_cover(self, task_id, index, xml_obj) -> bool:
+        """尝试下载并更新封面,成功返回 True,失败更新 cover_status=FAILED 并返回 False"""
         aes_key = xml_obj.get("aes_key")
         file_id = xml_obj.get("file_id")
         if not all([aes_key, file_id]):
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
             await self.log_service.log(
                 contents={
                     "task": "auto_reply_cards_monitor",
@@ -62,8 +96,13 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             )
             return False
 
-        cover_obj = await self.tool.get_cover_url(aes_key, file_id)
+        cover_obj = await self.tool.get_cover_url(
+            aes_key, file_id, log_service=self.log_service
+        )
         if not cover_obj:
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
             await self.log_service.log(
                 contents={
                     "task": "auto_reply_cards_monitor",
@@ -71,6 +110,8 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     "status": "fail",
                     "trace_id": task_id,
                     "message": f"store_card get_cover_url failed, task_id={task_id}",
+                    "file_id": file_id,
+                    "aes_key": aes_key,
                 }
             )
             return False
@@ -78,7 +119,24 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         file_name = f"{task_id}_{index}.jpg"
         save_path = self.tool.download_cover(file_name, cover_obj)
         if save_path is None:
+            await self.mapper.update_cover_info(
+                task_id, index, "", "", self.CoverStatus.FAILED
+            )
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "store_card",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"store_card download_cover failed, task_id={task_id}",
+                }
+            )
             return False
+
+        page_path = xml_obj.get("page_path", "")
+        _, root_source_id = (
+            self.tool.extract_page_path(page_path) if page_path else (None, None)
+        )
         exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
         if exist_covers:
             exist_cover_id, oss_key = self.tool.check_cover(save_path, exist_covers)
@@ -86,7 +144,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 cover_id = exist_cover_id
                 self.tool.remove_local_cover(save_path)
             else:
-                # upload to oss
                 oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
                 if oss_key is not None:
                     await self.mapper.save_cover(
@@ -96,6 +153,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                 else:
                     self.tool.remove_local_cover(save_path)
+                    await self.mapper.update_cover_info(
+                        task_id, index, "", "", self.CoverStatus.FAILED
+                    )
                     await self.log_service.log(
                         contents={
                             "task": "auto_reply_cards_monitor",
@@ -107,7 +167,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                     return False
         else:
-            # upload to oss
             oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
             if oss_key is not None:
                 await self.mapper.save_cover(
@@ -115,6 +174,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
             else:
                 self.tool.remove_local_cover(save_path)
+                await self.mapper.update_cover_info(
+                    task_id, index, "", "", self.CoverStatus.FAILED
+                )
                 await self.log_service.log(
                     contents={
                         "task": "auto_reply_cards_monitor",
@@ -126,30 +188,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
                 return False
 
-        query = """
-            INSERT INTO cooperate_auto_reply_detail
-            (
-                task_id, position, msg_type, card_title, card_cover_id, card_cover,
-                video_id, root_source_id, mini_program_name, task_result
-            ) VALUES
-            (
-                %s, %s, %s, %s, %s, %s,
-                %s, %s, %s, %s
-            );
-        """
-        insert_row = (
-            task_id,
-            index,
-            msg_type,
-            xml_obj.get("title", ""),
-            cover_id,
-            oss_key,
-            video_id,
-            root_source_id,
-            xml_obj.get("mini_program", ""),
-            json.dumps(xml_obj, ensure_ascii=False),
+        await self.mapper.update_cover_info(
+            task_id, index, cover_id, oss_key, self.CoverStatus.SUCCESS
         )
-        await self.mapper.store_extract_result(query, insert_row)
         return True
 
     # 存储文章信息
@@ -602,7 +643,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     }
                 )
 
-    # 解析单个xml
+    # 解析单个xml,返回 (total, parse_fail_items, cover_fail_items)
     async def extract_single_xml(self, task):
         task_id = task.get("task_id")
         result = task.get("result")
@@ -616,14 +657,14 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     "message": "extract_single_xml missing task_id or result",
                 }
             )
-            return
+            return 0, [], []
 
         # acquire lock
         acquire_lock = await self.mapper.update_auto_reply_task_status(
             task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
         )
         if not acquire_lock:
-            return
+            return 0, [], []
 
         try:
             # parse xml
@@ -645,41 +686,39 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 await self.mapper.update_auto_reply_task_status(
                     task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
                 )
-                return
+                return 0, [], []
 
             index = 0
-            success_count = 0
-            fail_count = 0
+            parse_fail_items = []  # XML 结构异常项
+            cover_fail_items = []  # 封面下载失败项
+            total = 0
 
             for item in xml_list:
                 xml_obj_list = self.tool.extract_callback_xml(item)
                 if xml_obj_list:
                     for xml_obj in xml_obj_list:
                         index += 1
+                        total += 1
                         msg_type = xml_obj.get("msg_type", None)
                         try:
-                            result = False
                             match msg_type:
                                 case "33":
-                                    result = await self.store_card(
+                                    cover_ok = await self.store_card(
                                         task_id, index, msg_type, xml_obj
                                     )
+                                    if not cover_ok:
+                                        cover_fail_items.append(index)
 
                                 case "5":
-                                    result = await self.store_article(
+                                    await self.store_article(
                                         task_id, index, msg_type, xml_obj
                                     )
 
                                 case _:
                                     continue
 
-                            if result:
-                                success_count += 1
-                            else:
-                                fail_count += 1
-
                         except Exception as e:
-                            fail_count += 1
+                            cover_fail_items.append(index)
                             await self.log_service.log(
                                 contents={
                                     "task": "auto_reply_cards_monitor",
@@ -689,38 +728,30 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                                     "message": f"处理 xml_obj 异常, task_id={task_id}, index={index}, msg_type={msg_type}, error={e}",
                                 }
                             )
+                else:
+                    # extract_callback_xml 返回空:可能是不支持的 msg_type,也可能是 XML 解析异常
+                    try:
+                        ET.fromstring(item)
+                        # XML 合法但 msg_type 不在处理范围内,跳过
+                    except ET.ParseError:
+                        parse_fail_items.append(item[:200])
 
+                if xml_obj_list:
                     await asyncio.sleep(5)
 
-            # 根据成功/失败数量决定最终状态
-            if fail_count == 0 and success_count > 0:
-                final_status = self.SUCCESS_STATUS
-            elif success_count > 0 and fail_count > 0:
-                # 部分成功,记录警告但仍标记为成功
+            # 数据已落库,任务始终标记为成功
+            if total > 0:
                 final_status = self.SUCCESS_STATUS
                 await self.log_service.log(
                     contents={
                         "task": "auto_reply_cards_monitor",
                         "function": "extract_single_xml",
-                        "status": "warn",
-                        "trace_id": task_id,
-                        "message": f"任务部分成功, task_id={task_id}, success={success_count}, fail={fail_count}",
-                    }
-                )
-            elif success_count == 0 and fail_count > 0:
-                # 全部失败
-                final_status = self.FAIL_STATUS
-                await self.log_service.log(
-                    contents={
-                        "task": "auto_reply_cards_monitor",
-                        "function": "extract_single_xml",
-                        "status": "fail",
+                        "status": "info",
                         "trace_id": task_id,
-                        "message": f"任务全部失败, task_id={task_id}, fail={fail_count}",
+                        "message": f"任务完成, task_id={task_id}, total={total}, cover_fail={len(cover_fail_items)}, parse_fail={len(parse_fail_items)}",
                     }
                 )
             else:
-                # success_count == 0 and fail_count == 0,没有处理任何数据
                 final_status = self.SUCCESS_STATUS
                 await self.log_service.log(
                     contents={
@@ -736,6 +767,8 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 task_id, "extract", self.PROCESSING_STATUS, final_status
             )
 
+            return total, parse_fail_items, cover_fail_items
+
         except Exception as e:
             print(traceback.format_exc())
             await self.log_service.log(
@@ -750,6 +783,63 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             await self.mapper.update_auto_reply_task_status(
                 task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
             )
+            return 0, [], []
+
+    async def _alert_batch(self, batch_total, all_parse_fails, all_cover_fails):
+        """批量报警:聚合整批任务结果后统一发送"""
+        total = batch_total
+        parse_count = len(all_parse_fails)
+        cover_count = len(all_cover_fails)
+
+        # XML 解析失败 → 报警
+        if parse_count > 0:
+            detail = {
+                "fail_count": parse_count,
+                "samples": all_parse_fails[:3],
+            }
+            try:
+                await feishu_robot.bot(
+                    title="自动回复卡片 XML 解析失败,外部数据结构可能已变更",
+                    detail=detail,
+                    mention=True,
+                    env="long_articles_task",
+                )
+            except Exception as e:
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "_alert_batch",
+                        "status": "fail",
+                        "message": f"XML解析失败飞书报警发送失败: {e}",
+                    }
+                )
+
+        # 封面下载失败率超阈值 → 报警
+        if total > 0:
+            fail_rate = cover_count / total
+            if fail_rate >= self.FEISHU_ALERT_COVER_FAIL_RATE:
+                detail = {
+                    "total": total,
+                    "fail_count": cover_count,
+                    "fail_rate": f"{fail_rate:.1%}",
+                    "fail_positions": all_cover_fails[:10],
+                }
+                try:
+                    await feishu_robot.bot(
+                        title="自动回复卡片封面下载失败率过高",
+                        detail=detail,
+                        mention=True,
+                        env="long_articles_task",
+                    )
+                except Exception as e:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "_alert_batch",
+                            "status": "fail",
+                            "message": f"封面失败率飞书报警发送失败: {e}",
+                        }
+                    )
 
     # main function
     async def deal(self, task_name):
@@ -765,10 +855,20 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 if not task_list:
                     return
 
+                batch_total = 0
+                all_parse_fails = []
+                all_cover_fails = []
+
                 for task in tqdm(task_list, desc="解析任务"):
-                    await self.extract_single_xml(task)
+                    total, parse_fails, cover_fails = await self.extract_single_xml(task)
+                    batch_total += total
+                    all_parse_fails.extend(parse_fails)
+                    all_cover_fails.extend(cover_fails)
                     await asyncio.sleep(10)
 
+                # 批量完成后统一报警
+                await self._alert_batch(batch_total, all_parse_fails, all_cover_fails)
+
             case _:
                 await self.log_service.log(
                     contents={

+ 1 - 1
app/infra/internal/piaoquan.py

@@ -50,7 +50,7 @@ async def publish_video_to_piaoquan(oss_path: str, uid: str, title: str) -> Dict
         "referer": "http://appspeed.piaoquantv.com",
         "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
         "accept-language": "zh-CN,zh-Hans;q=0.9",
-        "Content-Type": "application/json",
+        "Content-Type": "application/x-www-form-urlencoded",
     }
     payload = {
         "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",

+ 4 - 4
app/infra/shared/tools.py

@@ -49,8 +49,8 @@ def proxy():
     tunnel = "j685.kdltps.com:15818"
 
     # 用户名密码方式
-    username = "t14070979713487"
-    password = "hqwanfvy"
+    username = "t16899444538299"
+    password = "5w5ersso"
     proxies = {
         "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
         % {"user": username, "pwd": password, "proxy": tunnel},
@@ -63,8 +63,8 @@ def proxy():
 def async_proxy():
     return {
         "url": "http://j685.kdltps.com:15818",
-        "username": "t14070979713487",
-        "password": "hqwanfvy",
+        "username": "t16899444538299",
+        "password": "5w5ersso",
     }