|
@@ -41,7 +41,10 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
self.tool.extract_page_path(page_path) if page_path else (None, None)
|
|
self.tool.extract_page_path(page_path) if page_path else (None, None)
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # Phase 1: 直接落库,封面信息暂空
|
|
|
|
|
|
|
+ mini_program = xml_obj.get("mini_program", "")
|
|
|
|
|
+ need_cover = mini_program == "票圈视频"
|
|
|
|
|
+
|
|
|
|
|
+ # Phase 1: 直接落库
|
|
|
query = """
|
|
query = """
|
|
|
INSERT INTO cooperate_auto_reply_detail
|
|
INSERT INTO cooperate_auto_reply_detail
|
|
|
(
|
|
(
|
|
@@ -62,15 +65,17 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
"",
|
|
"",
|
|
|
video_id or "",
|
|
video_id or "",
|
|
|
root_source_id or "",
|
|
root_source_id or "",
|
|
|
- xml_obj.get("mini_program", ""),
|
|
|
|
|
|
|
+ mini_program,
|
|
|
json.dumps(xml_obj, ensure_ascii=False),
|
|
json.dumps(xml_obj, ensure_ascii=False),
|
|
|
- self.CoverStatus.INIT,
|
|
|
|
|
|
|
+ self.CoverStatus.INIT if need_cover else self.CoverStatus.SKIP,
|
|
|
)
|
|
)
|
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
|
|
|
|
|
|
- # Phase 2: 异步下载封面(best effort,不影响落库结果)
|
|
|
|
|
- cover_ok = await self._try_download_cover(task_id, index, xml_obj)
|
|
|
|
|
- return cover_ok
|
|
|
|
|
|
|
+ # Phase 2: 仅票圈视频下载封面
|
|
|
|
|
+ if need_cover:
|
|
|
|
|
+ cover_ok = await self._try_download_cover(task_id, index, xml_obj)
|
|
|
|
|
+ return cover_ok
|
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
async def _try_download_cover(self, task_id, index, xml_obj) -> bool:
|
|
async def _try_download_cover(self, task_id, index, xml_obj) -> bool:
|
|
|
"""尝试下载并更新封面,成功返回 True,失败更新 cover_status=FAILED 并返回 False"""
|
|
"""尝试下载并更新封面,成功返回 True,失败更新 cover_status=FAILED 并返回 False"""
|
|
@@ -117,6 +122,15 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
await self.mapper.update_cover_info(
|
|
await self.mapper.update_cover_info(
|
|
|
task_id, index, "", "", self.CoverStatus.FAILED
|
|
task_id, index, "", "", self.CoverStatus.FAILED
|
|
|
)
|
|
)
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card download_cover failed, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
page_path = xml_obj.get("page_path", "")
|
|
page_path = xml_obj.get("page_path", "")
|
|
@@ -629,7 +643,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
}
|
|
}
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 解析单个xml
|
|
|
|
|
|
|
+ # 解析单个xml,返回 (total, parse_fail_items, cover_fail_items)
|
|
|
async def extract_single_xml(self, task):
|
|
async def extract_single_xml(self, task):
|
|
|
task_id = task.get("task_id")
|
|
task_id = task.get("task_id")
|
|
|
result = task.get("result")
|
|
result = task.get("result")
|
|
@@ -643,14 +657,14 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
"message": "extract_single_xml missing task_id or result",
|
|
"message": "extract_single_xml missing task_id or result",
|
|
|
}
|
|
}
|
|
|
)
|
|
)
|
|
|
- return
|
|
|
|
|
|
|
+ return 0, [], []
|
|
|
|
|
|
|
|
# acquire lock
|
|
# acquire lock
|
|
|
acquire_lock = await self.mapper.update_auto_reply_task_status(
|
|
acquire_lock = await self.mapper.update_auto_reply_task_status(
|
|
|
task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
|
|
task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
)
|
|
)
|
|
|
if not acquire_lock:
|
|
if not acquire_lock:
|
|
|
- return
|
|
|
|
|
|
|
+ return 0, [], []
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
# parse xml
|
|
# parse xml
|
|
@@ -672,7 +686,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
await self.mapper.update_auto_reply_task_status(
|
|
await self.mapper.update_auto_reply_task_status(
|
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
|
)
|
|
)
|
|
|
- return
|
|
|
|
|
|
|
+ return 0, [], []
|
|
|
|
|
|
|
|
index = 0
|
|
index = 0
|
|
|
parse_fail_items = [] # XML 结构异常项
|
|
parse_fail_items = [] # XML 结构异常项
|
|
@@ -753,17 +767,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
task_id, "extract", self.PROCESSING_STATUS, final_status
|
|
task_id, "extract", self.PROCESSING_STATUS, final_status
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 飞书报警:XML 结构异常(外部数据结构变化)
|
|
|
|
|
- if parse_fail_items:
|
|
|
|
|
- await self._alert_xml_parse_failure(task_id, parse_fail_items)
|
|
|
|
|
-
|
|
|
|
|
- # 飞书报警:封面下载失败率超阈值
|
|
|
|
|
- if total > 0:
|
|
|
|
|
- cover_fail_rate = len(cover_fail_items) / total
|
|
|
|
|
- if cover_fail_rate >= self.FEISHU_ALERT_COVER_FAIL_RATE:
|
|
|
|
|
- await self._alert_cover_failure(
|
|
|
|
|
- task_id, total, cover_fail_items, cover_fail_rate
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ return total, parse_fail_items, cover_fail_items
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
print(traceback.format_exc())
|
|
print(traceback.format_exc())
|
|
@@ -779,58 +783,63 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
await self.mapper.update_auto_reply_task_status(
|
|
await self.mapper.update_auto_reply_task_status(
|
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
|
)
|
|
)
|
|
|
|
|
+ return 0, [], []
|
|
|
|
|
+
|
|
|
|
|
+ async def _alert_batch(self, batch_total, all_parse_fails, all_cover_fails):
|
|
|
|
|
+ """批量报警:聚合整批任务结果后统一发送"""
|
|
|
|
|
+ total = batch_total
|
|
|
|
|
+ parse_count = len(all_parse_fails)
|
|
|
|
|
+ cover_count = len(all_cover_fails)
|
|
|
|
|
+
|
|
|
|
|
+ # XML 解析失败 → 报警
|
|
|
|
|
+ if parse_count > 0:
|
|
|
|
|
+ detail = {
|
|
|
|
|
+ "fail_count": parse_count,
|
|
|
|
|
+ "samples": all_parse_fails[:3],
|
|
|
|
|
+ }
|
|
|
|
|
+ try:
|
|
|
|
|
+ await feishu_robot.bot(
|
|
|
|
|
+ title="自动回复卡片 XML 解析失败,外部数据结构可能已变更",
|
|
|
|
|
+ detail=detail,
|
|
|
|
|
+ mention=True,
|
|
|
|
|
+ env="long_articles_task",
|
|
|
|
|
+ )
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "_alert_batch",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"XML解析失败飞书报警发送失败: {e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
- async def _alert_xml_parse_failure(self, task_id, parse_fail_items):
|
|
|
|
|
- """XML 解析失败报警:外部数据结构可能已变更"""
|
|
|
|
|
- detail = {
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "fail_count": len(parse_fail_items),
|
|
|
|
|
- "samples": parse_fail_items[:3],
|
|
|
|
|
- }
|
|
|
|
|
- try:
|
|
|
|
|
- await feishu_robot.bot(
|
|
|
|
|
- title="⚠️ 自动回复卡片 XML 解析失败,外部数据结构可能已变更",
|
|
|
|
|
- detail=detail,
|
|
|
|
|
- mention=True,
|
|
|
|
|
- env="prod",
|
|
|
|
|
- )
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "auto_reply_cards_monitor",
|
|
|
|
|
- "function": "_alert_xml_parse_failure",
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "trace_id": task_id,
|
|
|
|
|
- "message": f"飞书报警发送失败: {e}",
|
|
|
|
|
- }
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- async def _alert_cover_failure(self, task_id, total, fail_items, fail_rate):
|
|
|
|
|
- """封面下载失败率过高报警"""
|
|
|
|
|
- detail = {
|
|
|
|
|
- "task_id": task_id,
|
|
|
|
|
- "total": total,
|
|
|
|
|
- "fail_count": len(fail_items),
|
|
|
|
|
- "fail_rate": f"{fail_rate:.1%}",
|
|
|
|
|
- "fail_positions": fail_items[:10],
|
|
|
|
|
- }
|
|
|
|
|
- try:
|
|
|
|
|
- await feishu_robot.bot(
|
|
|
|
|
- title="⚠️ 自动回复卡片封面下载失败率过高",
|
|
|
|
|
- detail=detail,
|
|
|
|
|
- mention=True,
|
|
|
|
|
- env="prod",
|
|
|
|
|
- )
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- await self.log_service.log(
|
|
|
|
|
- contents={
|
|
|
|
|
- "task": "auto_reply_cards_monitor",
|
|
|
|
|
- "function": "_alert_cover_failure",
|
|
|
|
|
- "status": "fail",
|
|
|
|
|
- "trace_id": task_id,
|
|
|
|
|
- "message": f"飞书报警发送失败: {e}",
|
|
|
|
|
|
|
+ # 封面下载失败率超阈值 → 报警
|
|
|
|
|
+ if total > 0:
|
|
|
|
|
+ fail_rate = cover_count / total
|
|
|
|
|
+ if fail_rate >= self.FEISHU_ALERT_COVER_FAIL_RATE:
|
|
|
|
|
+ detail = {
|
|
|
|
|
+ "total": total,
|
|
|
|
|
+ "fail_count": cover_count,
|
|
|
|
|
+ "fail_rate": f"{fail_rate:.1%}",
|
|
|
|
|
+ "fail_positions": all_cover_fails[:10],
|
|
|
}
|
|
}
|
|
|
- )
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ await feishu_robot.bot(
|
|
|
|
|
+ title="自动回复卡片封面下载失败率过高",
|
|
|
|
|
+ detail=detail,
|
|
|
|
|
+ mention=True,
|
|
|
|
|
+ env="long_articles_task",
|
|
|
|
|
+ )
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "_alert_batch",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"封面失败率飞书报警发送失败: {e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
# main function
|
|
# main function
|
|
|
async def deal(self, task_name):
|
|
async def deal(self, task_name):
|
|
@@ -846,10 +855,20 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
if not task_list:
|
|
if not task_list:
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
|
|
+ batch_total = 0
|
|
|
|
|
+ all_parse_fails = []
|
|
|
|
|
+ all_cover_fails = []
|
|
|
|
|
+
|
|
|
for task in tqdm(task_list, desc="解析任务"):
|
|
for task in tqdm(task_list, desc="解析任务"):
|
|
|
- await self.extract_single_xml(task)
|
|
|
|
|
|
|
+ total, parse_fails, cover_fails = await self.extract_single_xml(task)
|
|
|
|
|
+ batch_total += total
|
|
|
|
|
+ all_parse_fails.extend(parse_fails)
|
|
|
|
|
+ all_cover_fails.extend(cover_fails)
|
|
|
await asyncio.sleep(10)
|
|
await asyncio.sleep(10)
|
|
|
|
|
|
|
|
|
|
+ # 批量完成后统一报警
|
|
|
|
|
+ await self._alert_batch(batch_total, all_parse_fails, all_cover_fails)
|
|
|
|
|
+
|
|
|
case _:
|
|
case _:
|
|
|
await self.log_service.log(
|
|
await self.log_service.log(
|
|
|
contents={
|
|
contents={
|