Procházet zdrojové kódy

Merge branch 'master' into feature/luojunhui/20260508-aigc-decode-article

merge master
.
luojunhui před 2 dny
rodič
revize
682ce25871

+ 11 - 8
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -117,8 +117,8 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
                     query=extract_query, params=(new_status, task_id, ori_status)
                 )
             case _:
-                print("status_type_error")
-                return None
+                print(f"status_type_error: {status_type}")
+                return 0
 
     # 获取正在自动回复卡片的任务 id
     async def fetch_auto_replying_tasks(self):
@@ -176,18 +176,21 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
             return 0
 
         account_detail = fetch_response[0]
+        partner_name = account_detail.get("partner_name")
+        partner_id = account_detail.get("partner_id")
+        account_name_db = account_detail.get("account_name")
+        gh_id = account_detail.get("gh_id")
+        if not all([partner_name, partner_id, account_name_db, gh_id]):
+            print(f"[ERROR] fetch_cooperate_accounts missing fields: {account_detail}")
+            return 0
+
         save_query = """
             INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
             VALUES (%s, %s, %s, %s);
         """
         return await self.pool.async_save(
             query=save_query,
-            params=(
-                account_detail["partner_name"],
-                account_detail["partner_id"],
-                account_detail["account_name"],
-                account_detail["gh_id"],
-            ),
+            params=(partner_name, partner_id, account_name_db, gh_id),
         )
 
     # 通过 root_source_id 获取已经下载过的卡片封面 id

+ 76 - 15
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -137,7 +137,33 @@ class AutoReplyCardsMonitorUtils:
         return video_id, root_source_id
 
     @staticmethod
-    async def get_cover_url(aes_key, total_size, file_id):
+    async def get_cover_url(aes_key, file_id):
+        url = "https://wechat-protocol.aiddit.com/xed/getCdnUrl"
+        data = {
+            "file_id": file_id,
+            "aes_key": aes_key
+        }
+        headers = {
+            "Content-Type": "application/json",
+        }
+        try:
+            async with AsyncHttpClient() as client:
+                response = await client.post(
+                    url, headers=headers, data=json.dumps(data)
+                )
+            if response and response.get("status") != 0:
+                print(f"[ERROR] get_cover_url API error: status={response.get('status')}, msg={response.get('msg')}")
+                return None
+            return response
+        except Exception as e:
+            print(f"[ERROR] get_cover_url failed: {e}")
+            return None
+
+    @staticmethod
+    async def get_cover_url_old(aes_key, total_size, file_id):
+        """
+        gewe API 暂时失效
+        """
         url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
         data = {
             "appId": "wx_anFlUnezoUynU3SKcqTWk",
@@ -151,10 +177,15 @@ class AutoReplyCardsMonitorUtils:
             "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
             "Content-Type": "application/json",
         }
-        async with AsyncHttpClient() as client:
-            response = await client.post(url, headers=headers, data=json.dumps(data))
-
-        return response
+        try:
+            async with AsyncHttpClient() as client:
+                response = await client.post(
+                    url, headers=headers, data=json.dumps(data)
+                )
+            return response
+        except Exception as e:
+            print(f"[ERROR] get_cover_url failed: {e}")
+            return None
 
     @staticmethod
     async def get_sample_url(recent_articles):
@@ -172,7 +203,6 @@ class AutoReplyCardsMonitorUtils:
     # 获取检测的账号 list
     @staticmethod
     def get_monitor_account_list():
-        # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
         week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
         query = f"""
             SELECT  公众号名, ghid, count(DISTINCT mid) AS uv
@@ -187,18 +217,31 @@ class AutoReplyCardsMonitorUtils:
             ORDER BY uv DESC
             ;
         """
-        result = fetch_from_odps(query)
+        try:
+            result = fetch_from_odps(query)
+        except Exception as e:
+            print(f"[ERROR] fetch_from_odps failed: {e}")
+            return []
         return result
 
     @staticmethod
     def download_cover(file_name, cover_obj, timeout=10):
         try:
-            cover_url = cover_obj["data"]["fileUrl"]
+            cover_url = cover_obj["oss_object"]["cdn_url"]
         except (KeyError, TypeError):
             print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
             return None
 
-        save_dir = os.path.join(os.getcwd(), "static")
+        save_dir = os.path.join(
+            os.path.dirname(
+                os.path.dirname(
+                    os.path.dirname(
+                        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+                    )
+                )
+            ),
+            "static",
+        )
         save_path = os.path.join(save_dir, file_name)
 
         os.makedirs(save_dir, exist_ok=True)
@@ -225,7 +268,13 @@ class AutoReplyCardsMonitorUtils:
             return None, None
         oss_dir = "auto_reply_cards_cover"
         image_file = ImagePath(path=save_path, path_type="filepath")
-        image_md5 = self.image_tool.get_image_md5(image_file)
+        try:
+            image_md5 = self.image_tool.get_image_md5(image_file)
+        except Exception as e:
+            print(f"[ERROR] get_image_md5 failed: {e}")
+            self.remove_local_cover(save_path)
+            return None, None
+
         oss_key = None
         try:
             full_key = f"{oss_dir}/{file_name}"
@@ -233,6 +282,7 @@ class AutoReplyCardsMonitorUtils:
             oss_key = full_key
         except Exception as e:
             print(f"[ERROR] Upload to OSS failed: {e}")
+            self.remove_local_cover(save_path)
             return None, image_md5
 
         try:
@@ -256,10 +306,21 @@ class AutoReplyCardsMonitorUtils:
     def check_cover(self, save_path, exist_covers):
         img1 = ImagePath(path=save_path, path_type="filepath")
         for exist_cover in exist_covers:
-            exist_oss = exist_cover["oss_path"]
-            img2 = ImagePath(path=exist_oss)
-            cover_id = exist_cover["cover_id"]
-            if self.image_tool.is_same_image(img1, img2):
-                return cover_id, exist_oss
+            exist_oss = exist_cover.get("oss_path")
+            if not exist_oss:
+                continue
+            try:
+                img2 = ImagePath(path=exist_oss)
+            except Exception as e:
+                print(f"[WARN] Invalid oss_path in cover record: {e}")
+                continue
+
+            cover_id = exist_cover.get("cover_id")
+            try:
+                if self.image_tool.is_same_image(img1, img2):
+                    return cover_id, exist_oss
+            except Exception as e:
+                print(f"[WARN] is_same_image failed: {e}")
+                continue
 
         return False, None

+ 448 - 90
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -24,6 +24,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         log_service: LogService,
         config: GlobalConfigSettings,
     ):
+        self.log_service: LogService = log_service
         self.mapper: AutoReplyCardsMonitorMapper = AutoReplyCardsMonitorMapper(
             pool=pool, log_service=log_service
         )
@@ -32,15 +33,52 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         )
 
     # 存储卡片信息
-    async def store_card(self, task_id, index, msg_type, xml_obj):
-        video_id, root_source_id = self.tool.extract_page_path(xml_obj["page_path"])
-        cover_obj = await self.tool.get_cover_url(
-            xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
-        )
+    async def store_card(self, task_id, index, msg_type, xml_obj) -> bool:
+        page_path = xml_obj.get("page_path")
+        if not page_path:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "store_card",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"store_card missing page_path, task_id={task_id}",
+                }
+            )
+            return False
+        video_id, root_source_id = self.tool.extract_page_path(page_path)
+
+        aes_key = xml_obj.get("aes_key")
+        file_id = xml_obj.get("file_id")
+        if not all([aes_key, file_id]):
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "store_card",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"store_card missing cover params, task_id={task_id}",
+                }
+            )
+            return False
+
+        cover_obj = await self.tool.get_cover_url(aes_key, file_id)
+        if not cover_obj:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "store_card",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"store_card get_cover_url failed, task_id={task_id}",
+                }
+            )
+            return False
+
         file_name = f"{task_id}_{index}.jpg"
         save_path = self.tool.download_cover(file_name, cover_obj)
         if save_path is None:
-            return
+            return False
         exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
         if exist_covers:
             exist_cover_id, oss_key = self.tool.check_cover(save_path, exist_covers)
@@ -58,7 +96,16 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     )
                 else:
                     self.tool.remove_local_cover(save_path)
-                    return
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "store_card",
+                            "status": "fail",
+                            "trace_id": task_id,
+                            "message": f"store_card upload_cover failed, task_id={task_id}",
+                        }
+                    )
+                    return False
         else:
             # upload to oss
             oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
@@ -68,14 +115,23 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 )
             else:
                 self.tool.remove_local_cover(save_path)
-                return
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "store_card",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"store_card upload_cover failed, task_id={task_id}",
+                    }
+                )
+                return False
 
         query = """
             INSERT INTO cooperate_auto_reply_detail
             (
-                task_id, position, msg_type, card_title, card_cover_id, card_cover, 
+                task_id, position, msg_type, card_title, card_cover_id, card_cover,
                 video_id, root_source_id, mini_program_name, task_result
-            ) VALUES 
+            ) VALUES
             (
                 %s, %s, %s, %s, %s, %s,
                 %s, %s, %s, %s
@@ -85,34 +141,39 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             task_id,
             index,
             msg_type,
-            xml_obj["title"],
+            xml_obj.get("title", ""),
             cover_id,
             oss_key,
             video_id,
             root_source_id,
-            xml_obj["mini_program"],
+            xml_obj.get("mini_program", ""),
             json.dumps(xml_obj, ensure_ascii=False),
         )
         await self.mapper.store_extract_result(query, insert_row)
+        return True
 
     # 存储文章信息
-    async def store_article(self, task_id, index, msg_type, xml_obj):
+    async def store_article(self, task_id, index, msg_type, xml_obj) -> bool:
         article_title = xml_obj.get("title")
         article_link = xml_obj.get("url")
         article_cover = xml_obj.get("cover_url")
         article_desc = xml_obj.get("desc")
 
         fetch_fail_status = False
+        fetch_fail_reason = "获取文章详情失败"
 
         fetch_response = await get_article_detail(
             article_link=article_link, is_cache=False, is_count=True
         )
         if not fetch_response:
             fetch_fail_status = True
+            fetch_fail_reason = "获取文章详情失败: 响应为空"
 
         if not fetch_fail_status:
-            if fetch_response.get("code") != 0:
+            code = fetch_response.get("code")
+            if code is None or code != 0:
                 fetch_fail_status = True
+                fetch_fail_reason = f"获取文章详情失败: code={code}"
 
         if fetch_fail_status:
             query = """
@@ -121,7 +182,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 VALUES
                 (%s, %s, %s, %s, %s, %s, %s, %s);
             """
-            remark = "获取文章详情失败"
             insert_row = (
                 task_id,
                 index,
@@ -130,34 +190,61 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 article_link,
                 article_cover,
                 article_desc,
-                remark,
+                fetch_fail_reason,
             )
             await self.mapper.store_extract_result(query, insert_row)
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "store_article",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"store_article fetch failed, task_id={task_id}, reason={fetch_fail_reason}",
+                }
+            )
+            return False
 
         else:
-            article_detail = fetch_response["data"]["data"]
+            article_detail = fetch_response.get("data", {}).get("data", {})
+            if not article_detail:
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "store_article",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"store_article missing data.data, task_id={task_id}",
+                    }
+                )
+                return False
+
             article_text = article_detail.get("body_text", "")
             article_images = article_detail.get("image_url_list", [])
             read_cnt = article_detail.get("view_count") or 0
             like_cnt = article_detail.get("like_count") or 0
             pt = article_detail.get("publish_timestamp")
             publish_timestamp = int(pt / 1000) if pt is not None else 0
-            parsed = urlparse(article_detail["content_link"])
-            params = parse_qs(parsed.query)
-            wx_sn = params.get("sn", [None])[0]
+
+            content_link = article_detail.get("content_link", "")
+            if content_link:
+                parsed = urlparse(content_link)
+                params = parse_qs(parsed.query)
+                wx_sn = params.get("sn", [None])[0]
+            else:
+                wx_sn = None
+
             mini_info = article_detail.get("mini_program")
             if not mini_info:
-                # video_id, root_source_id = None, None
                 query = """
-                    INSERT INTO cooperate_auto_reply_detail 
+                    INSERT INTO cooperate_auto_reply_detail
                     (
-                        task_id, position, msg_type, article_title, article_link, 
-                        article_cover, article_text, article_images, article_desc, read_cnt, 
+                        task_id, position, msg_type, article_title, article_link,
+                        article_cover, article_text, article_images, article_desc, read_cnt,
                         like_cnt, publish_timestamp, task_result, wx_sn
-                    ) VALUES 
+                    ) VALUES
                     (
-                        %s, %s, %s, %s, %s, 
-                        %s, %s, %s, %s, %s, 
+                        %s, %s, %s, %s, %s,
+                        %s, %s, %s, %s, %s,
                         %s, %s, %s, %s
                     );
                 """
@@ -178,27 +265,40 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     wx_sn,
                 )
                 await self.mapper.store_extract_result(query, values)
+                return True
 
             else:
-                for card_index, i in enumerate(mini_info, 1):
+                for card_index, card_item in enumerate(mini_info, 1):
+                    card_path = card_item.get("path")
+                    if not card_path:
+                        await self.log_service.log(
+                            contents={
+                                "task": "auto_reply_cards_monitor",
+                                "function": "store_article",
+                                "status": "fail",
+                                "trace_id": task_id,
+                                "message": f"store_article mini_program missing path, task_id={task_id}",
+                            }
+                        )
+                        continue
                     try:
                         video_id, root_source_id = self.tool.extract_page_path(
-                            i["path"]
+                            card_path
                         )
-                        card_title = i["title"]
-                        card_cover = i["image_url"]
-                        mini_name = i["nike_name"]
+                        card_title = card_item.get("title", "")
+                        card_cover = card_item.get("image_url", "")
+                        mini_name = card_item.get("nike_name", "")
                         query = """
-                            INSERT INTO cooperate_auto_reply_detail 
+                            INSERT INTO cooperate_auto_reply_detail
                                 (
-                                    task_id, position, msg_type, card_title, card_cover, 
-                                    video_id, root_source_id, mini_program_name, article_title, article_link, 
-                                    article_cover, article_text, article_images, article_desc, read_cnt, 
+                                    task_id, position, msg_type, card_title, card_cover,
+                                    video_id, root_source_id, mini_program_name, article_title, article_link,
+                                    article_cover, article_text, article_images, article_desc, read_cnt,
                                     like_cnt, publish_timestamp, task_result, wx_sn, card_index
                                 ) VALUES
                                 (
-                                    %s, %s, %s, %s, %s, 
-                                    %s, %s, %s, %s, %s, 
+                                    %s, %s, %s, %s, %s,
+                                    %s, %s, %s, %s, %s,
                                     %s, %s, %s, %s, %s,
                                     %s, %s, %s, %s, %s
                                 );
@@ -229,16 +329,64 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                     except Exception as e:
                         print(traceback.format_exc())
                         print(e)
+                        await self.log_service.log(
+                            contents={
+                                "task": "auto_reply_cards_monitor",
+                                "function": "store_article",
+                                "status": "fail",
+                                "trace_id": task_id,
+                                "message": f"store_article mini_program insert failed, task_id={task_id}, card_index={card_index}, error={e}",
+                            }
+                        )
+                return True
 
     # 创建单个关注公众号任务
     async def create_follow_single_account_task(self, gh_id):
         response = await get_article_list_from_account(account_id=gh_id)
+        if not response:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "create_follow_single_account_task",
+                    "status": "fail",
+                    "message": f"create_follow_single_account_task response is None, gh_id={gh_id}",
+                }
+            )
+            return
+
         code = response.get("code")
         match code:
             case 0:
-                recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
+                try:
+                    recent_articles = (
+                        response.get("data", {})
+                        .get("data", [{}])[0]
+                        .get("AppMsg", {})
+                        .get("DetailInfo", [])
+                    )
+                except (IndexError, TypeError, AttributeError) as e:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "create_follow_single_account_task",
+                            "status": "fail",
+                            "message": f"create_follow_single_account_task parse response failed, gh_id={gh_id}, error={e}",
+                        }
+                    )
+                    return
+
+                if not recent_articles:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "create_follow_single_account_task",
+                            "status": "warn",
+                            "message": f"create_follow_single_account_task no recent articles, gh_id={gh_id}",
+                        }
+                    )
+                    return
+
                 article_url = await self.tool.get_sample_url(recent_articles)
-                print(article_url)
                 if article_url:
                     await self.mapper.set_sample_url(gh_id, article_url)
 
@@ -258,11 +406,17 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 await self.mapper.set_account_as_invalid(gh_id)
 
             case _:
-                pass
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "create_follow_single_account_task",
+                        "status": "warn",
+                        "message": f"create_follow_single_account_task unexpected code={code}, gh_id={gh_id}",
+                    }
+                )
 
     # 创建单个账号自动回复任务
     async def create_auto_reply_single_account_task(self, gh_id, account_name):
-        print(account_name)
         task_id = self.tool.generate_task_id(task_name="auto_reply", gh_id=gh_id)
         # 先插入 task, 再创建自动回复任务
         create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
@@ -271,66 +425,157 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 task_id, gh_id
             )
             if not affected_rows:
-                print("发布任务至 AIGC 失败")
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "create_auto_reply_single_account_task",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"发布任务至 AIGC 失败, task_id={task_id}, account={account_name}",
+                    }
+                )
             else:
                 await self.mapper.update_auto_reply_task_status(
                     task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
                 )
         else:
-            print("创建任务至 DB 失败")
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "create_auto_reply_single_account_task",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"创建任务至 DB 失败, task_id={task_id}, account={account_name}",
+                }
+            )
 
     async def follow_gzh_task(self):
-        account_list = self.tool.get_monitor_account_list()
+        try:
+            account_list = self.tool.get_monitor_account_list()
+        except Exception as e:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "follow_gzh_task",
+                    "status": "fail",
+                    "message": f"follow_gzh_task get_monitor_account_list failed, error={e}",
+                }
+            )
+            return
+
+        if not account_list:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "follow_gzh_task",
+                    "status": "warn",
+                    "message": "follow_gzh_task: no accounts from ODPS",
+                }
+            )
+            return
+
         for account in account_list:
-            try:
-                fetch_response = await self.mapper.fetch_account_status(
-                    account.公众号名
+            account_name = getattr(account, "公众号名", None)
+            if not account_name:
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "follow_gzh_task",
+                        "status": "warn",
+                        "message": "follow_gzh_task account missing 公众号名, skip",
+                    }
                 )
+                continue
+
+            try:
+                fetch_response = await self.mapper.fetch_account_status(account_name)
                 if not fetch_response:
                     affected_rows = await self.mapper.fetch_cooperate_accounts(
-                        account.公众号名
+                        account_name
                     )
                     if affected_rows:
                         fetch_response = await self.mapper.fetch_account_status(
-                            account.公众号名
+                            account_name
                         )
-
                     else:
-                        print(f"系统中无账号,跳过: {account.公众号名}")
+                        await self.log_service.log(
+                            contents={
+                                "task": "auto_reply_cards_monitor",
+                                "function": "follow_gzh_task",
+                                "status": "warn",
+                                "message": f"系统中无账号,跳过: {account_name}",
+                            }
+                        )
                         continue
 
-                account_detail = fetch_response[0]
-                status = account_detail["status"]
+                if not fetch_response:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "follow_gzh_task",
+                            "status": "warn",
+                            "message": f"fetch_account_status 返回空, 跳过: {account_name}",
+                        }
+                    )
+                    continue
 
+                account_detail = fetch_response[0]
+                status = account_detail.get("status")
                 if not status:
-                    print("账号已经迁移或者封禁")
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "follow_gzh_task",
+                            "status": "warn",
+                            "message": f"账号已经迁移或者封禁: {account_name}",
+                        }
+                    )
                     continue
 
-                # 新逻辑,无需考虑账号是否关注
-                await self.create_auto_reply_single_account_task(
-                    account_detail["gh_id"], account.公众号名
-                )
+                gh_id = account_detail.get("gh_id")
+                if not gh_id:
+                    await self.log_service.log(
+                        contents={
+                            "task": "auto_reply_cards_monitor",
+                            "function": "follow_gzh_task",
+                            "status": "fail",
+                            "message": f"账号缺少 gh_id: {account_name}",
+                        }
+                    )
+                    continue
+
+                await self.create_auto_reply_single_account_task(gh_id, account_name)
 
             except Exception as e:
-                print(f"处理账号{account.公众号名}异常", e)
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "follow_gzh_task",
+                        "status": "fail",
+                        "message": f"处理账号异常 account={account_name}, error={e}",
+                    }
+                )
 
-    # 异步获取关注结果
+    # 异步获取自动回复结果
     async def get_auto_reply_response(self):
         task_list = await self.mapper.fetch_auto_replying_tasks()
         if not task_list:
-            print("No processing task yet")
             return
 
         for task in tqdm(task_list):
+            task_id = task.get("task_id")
+            if not task_id:
+                continue
+
             try:
-                task_id = task["task_id"]
                 response = await self.mapper.get_auto_reply_task_result(task_id)
                 if not response:
                     continue
 
-                task_status = response[0]["task_status"]
-                task_result = response[0]["task_result"]
-                update_timestamp = response[0]["update_timestamp"]
+                task_record = response[0]
+                task_status = task_record.get("task_status")
+                task_result = task_record.get("task_result")
+                update_timestamp = task_record.get("update_timestamp")
 
                 match task_status:
                     case self.FETCH_FAIL_STATUS:
@@ -347,12 +592,31 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                         continue
 
             except Exception as e:
-                print(e)
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "get_auto_reply_response",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"get_auto_reply_response failed, task_id={task_id}, error={e}",
+                    }
+                )
 
     # 解析单个xml
     async def extract_single_xml(self, task):
-        task_id = task["task_id"]
-        result = task["result"]
+        task_id = task.get("task_id")
+        result = task.get("result")
+        if not task_id or not result:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "extract_single_xml",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": "extract_single_xml missing task_id or result",
+                }
+            )
+            return
 
         # acquire lock
         acquire_lock = await self.mapper.update_auto_reply_task_status(
@@ -363,35 +627,126 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
 
         try:
             # parse xml
-            xml_list = json.loads(result) if isinstance(result, str) else result
+            if isinstance(result, str):
+                xml_list = json.loads(result)
+            else:
+                xml_list = result
+
+            if not isinstance(xml_list, list):
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "extract_single_xml",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"extract_single_xml result is not a list, task_id={task_id}",
+                    }
+                )
+                await self.mapper.update_auto_reply_task_status(
+                    task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
+                )
+                return
+
             index = 0
+            success_count = 0
+            fail_count = 0
+
             for item in xml_list:
                 xml_obj_list = self.tool.extract_callback_xml(item)
                 if xml_obj_list:
                     for xml_obj in xml_obj_list:
                         index += 1
                         msg_type = xml_obj.get("msg_type", None)
-                        match msg_type:
-                            case "33":
-                                await self.store_card(task_id, index, msg_type, xml_obj)
-
-                            case "5":
-                                await self.store_article(
-                                    task_id, index, msg_type, xml_obj
-                                )
-
-                            case _:
-                                continue
-
-                await asyncio.sleep(5)
+                        try:
+                            result = False
+                            match msg_type:
+                                case "33":
+                                    result = await self.store_card(
+                                        task_id, index, msg_type, xml_obj
+                                    )
+
+                                case "5":
+                                    result = await self.store_article(
+                                        task_id, index, msg_type, xml_obj
+                                    )
+
+                                case _:
+                                    continue
+
+                            if result:
+                                success_count += 1
+                            else:
+                                fail_count += 1
+
+                        except Exception as e:
+                            fail_count += 1
+                            await self.log_service.log(
+                                contents={
+                                    "task": "auto_reply_cards_monitor",
+                                    "function": "extract_single_xml",
+                                    "status": "fail",
+                                    "trace_id": task_id,
+                                    "message": f"处理 xml_obj 异常, task_id={task_id}, index={index}, msg_type={msg_type}, error={e}",
+                                }
+                            )
+
+                    await asyncio.sleep(5)
+
+            # 根据成功/失败数量决定最终状态
+            if fail_count == 0 and success_count > 0:
+                final_status = self.SUCCESS_STATUS
+            elif success_count > 0 and fail_count > 0:
+                # 部分成功,记录警告但仍标记为成功
+                final_status = self.SUCCESS_STATUS
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "extract_single_xml",
+                        "status": "warn",
+                        "trace_id": task_id,
+                        "message": f"任务部分成功, task_id={task_id}, success={success_count}, fail={fail_count}",
+                    }
+                )
+            elif success_count == 0 and fail_count > 0:
+                # 全部失败
+                final_status = self.FAIL_STATUS
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "extract_single_xml",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"任务全部失败, task_id={task_id}, fail={fail_count}",
+                    }
+                )
+            else:
+                # success_count == 0 and fail_count == 0,没有处理任何数据
+                final_status = self.SUCCESS_STATUS
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "extract_single_xml",
+                        "status": "warn",
+                        "trace_id": task_id,
+                        "message": f"任务无数据处理, task_id={task_id}",
+                    }
+                )
 
             await self.mapper.update_auto_reply_task_status(
-                task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
+                task_id, "extract", self.PROCESSING_STATUS, final_status
             )
 
         except Exception as e:
-            print(e)
             print(traceback.format_exc())
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "extract_single_xml",
+                    "status": "fail",
+                    "trace_id": task_id,
+                    "message": f"extract_single_xml failed, task_id={task_id}, error={e}",
+                }
+            )
             await self.mapper.update_auto_reply_task_status(
                 task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
             )
@@ -408,15 +763,18 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             case "extract_task":
                 task_list = await self.mapper.get_extract_tasks()
                 if not task_list:
-                    print("No tasks to extract now")
                     return
 
                 for task in tqdm(task_list, desc="解析任务"):
                     await self.extract_single_xml(task)
                     await asyncio.sleep(10)
 
-            case "re_extract_task":
-                pass
-
             case _:
-                print("task_error")
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "deal",
+                        "status": "fail",
+                        "message": f"unknown task_name: {task_name}",
+                    }
+                )