Przeglądaj źródła

1. 修改快代理
2. 票圈发布接口 bugfix

luojunhui 1 tydzień temu
rodzic
commit
a025bd23b1

+ 11 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -215,6 +215,17 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
         """
         return await self.pool.async_fetch(query=query, params=(root_source_id,))
 
+    # 获取封面下载失败的记录
+    async def get_failed_cover_records(self):
+        query = """
+            SELECT task_id, position, task_result
+            FROM cooperate_auto_reply_detail
+            WHERE cover_status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.FAIL_STATUS,)
+        )
+
     # 保存封面至封面表
     async def save_cover(self, cover_id, root_source_id, oss_path):
         query = """

+ 57 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -841,6 +841,60 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                         }
                     )
 
+    async def re_extract(self):
+        """重新处理封面下载失败的记录:读取 task_result 中的 xml_obj,重新下载封面"""
+        records = await self.mapper.get_failed_cover_records()
+        if not records:
+            await self.log_service.log(
+                contents={
+                    "task": "auto_reply_cards_monitor",
+                    "function": "re_extract",
+                    "status": "info",
+                    "message": "re_extract: 无封面下载失败记录",
+                }
+            )
+            return
+
+        success_cnt = 0
+        fail_cnt = 0
+
+        for record in tqdm(records, desc="重新提取封面"):
+            task_id = record.get("task_id")
+            position = record.get("position")
+            task_result = record.get("task_result")
+
+            if not all([task_id, position, task_result]):
+                fail_cnt += 1
+                continue
+
+            try:
+                xml_obj = json.loads(task_result) if isinstance(task_result, str) else task_result
+                cover_ok = await self._try_download_cover(task_id, position, xml_obj)
+                if cover_ok:
+                    success_cnt += 1
+                else:
+                    fail_cnt += 1
+            except (json.JSONDecodeError, TypeError) as e:
+                fail_cnt += 1
+                await self.log_service.log(
+                    contents={
+                        "task": "auto_reply_cards_monitor",
+                        "function": "re_extract",
+                        "status": "fail",
+                        "trace_id": task_id,
+                        "message": f"re_extract parse task_result failed, task_id={task_id}, error={e}",
+                    }
+                )
+
+        await self.log_service.log(
+            contents={
+                "task": "auto_reply_cards_monitor",
+                "function": "re_extract",
+                "status": "info",
+                "message": f"re_extract 完成, total={len(records)}, success={success_cnt}, fail={fail_cnt}",
+            }
+        )
+
     # main function
     async def deal(self, task_name):
         match task_name:
@@ -850,6 +904,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             case "get_auto_reply_task":
                 await self.get_auto_reply_response()
 
+            case "re_extract_task":
+                await self.re_extract()
+
             case "extract_task":
                 task_list = await self.mapper.get_extract_tasks()
                 if not task_list: