|
|
@@ -0,0 +1,356 @@
|
|
|
+import json
|
|
|
+from collections import defaultdict
|
|
|
+from typing import Dict, List
|
|
|
+
|
|
|
+from app.core.database import DatabaseManager
|
|
|
+from app.core.observability import LogService
|
|
|
+
|
|
|
+from ._const import DecodeCardConst
|
|
|
+from ._mapper import CardDecodeTaskMapper
|
|
|
+from ._utils import CardDecodeUtils
|
|
|
+
|
|
|
+
|
|
|
class CreateCardsDecodeTask(DecodeCardConst):
    """Create decode tasks for cards that are waiting to be decoded.

    A run of :meth:`deal` locks the pending cards (INIT -> PROCESSING),
    makes sure each of them has a ``card_cover_id``, submits them to the
    decode service grouped by ``config_id`` and records the outcome of
    every card (task row + final card status).

    The card status doubles as a lock, so every code path must move a card
    out of PROCESSING again; :meth:`_release_locks` is the safety net for
    unexpected errors.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = CardDecodeTaskMapper(self.pool)
        self.tool = CardDecodeUtils()

    async def _release_locks(self, cards: List[Dict]) -> None:
        """Best-effort rollback PROCESSING -> INIT after an unexpected error.

        NOTE(review): relies on ``update_card_status`` being a compare-and-set
        on the expected status (as its use in ``_acquire_cards`` implies), so
        cards that already reached SUCCESS / FAILED / INIT are not touched —
        confirm against the mapper.
        """
        for card in cards:
            try:
                released = await self.mapper.update_card_status(
                    card["id"], self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                if released:
                    await self.log_service.log(
                        contents={
                            "card_id": card["id"],
                            "task": "create_card_decode_task",
                            "status": "fail",
                            "message": "unexpected error, rolled back to INIT",
                        }
                    )
            except Exception:
                # Cleanup must not mask the original error (re-raised by the
                # caller) nor stop the release of the remaining cards.
                continue

    async def _acquire_cards(self) -> List[Dict]:
        """Fetch the cards pending decode and lock them (INIT -> PROCESSING).

        Returns:
            The cards whose status update reported success. Cards that could
            not be locked are logged with status ``skip`` and left out.

        Raises:
            Exception: whatever the mapper / log service raises; cards locked
                so far are released before the error propagates.
        """
        cards = await self.mapper.fetch_cards()
        locked = []
        try:
            for card in cards:
                card_id = card["id"]
                acquired = await self.mapper.update_card_status(
                    card_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
                )
                if acquired:
                    locked.append(card)
                else:
                    await self.log_service.log(
                        contents={
                            "card_id": card_id,
                            "task": "create_card_decode_task",
                            "status": "skip",
                            "message": "acquire lock failed",
                        }
                    )
        except Exception:
            # Do not leave the cards locked so far stuck in PROCESSING.
            await self._release_locks(locked)
            raise
        return locked

    async def _ensure_cover_ids(self, cards: List[Dict]):
        """Compute and persist ``card_cover_id`` for cards that lack one.

        The id is obtained from ``compute_cover_md5(share_cover)``; on success
        it is stored through the mapper and written back into the card dict.
        On failure the card is marked FAILED and keeps no ``card_cover_id``.
        Cards without ``share_cover`` are skipped here.
        """
        for card in cards:
            if card.get("card_cover_id"):
                continue
            if not card.get("share_cover"):
                # Cards with an empty cover are marked FAILED later by
                # _submit_and_record.
                continue
            try:
                # NOTE(review): this is a synchronous call inside a coroutine;
                # if it downloads the cover it blocks the event loop — confirm
                # and consider off-loading it to a thread.
                cover_id = self.tool.compute_cover_md5(card["share_cover"])
                await self.mapper.set_card_cover_id(card["id"], cover_id)
                card["card_cover_id"] = cover_id
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "task": "create_card_decode_task",
                        "status": "info",
                        "message": f"computed cover_id={cover_id}",
                    }
                )
            except Exception as e:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.FAILED,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "share_cover": card.get("share_cover"),
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": f"compute cover md5 failed, marked as FAILED: {e}",
                    }
                )

    @staticmethod
    def _group_cards_by_channel(cards: List[Dict]) -> Dict[int, List[Dict]]:
        """Group cards by the ``config_id`` mapped from their ``channel``.

        The channel (``auto_reply_top_cards_daily.channel``) is looked up in
        ``DecodeCardConst.CHANNEL_CONFIG_MAP``; cards whose channel has no
        (truthy) mapping are omitted from the result.
        """
        grouped = defaultdict(list)
        for card in cards:
            source_channel = card.get("channel", "")
            config_id = DecodeCardConst.CHANNEL_CONFIG_MAP.get(source_channel)
            if config_id:
                grouped[config_id].append(card)
        return dict(grouped)

    async def _rollback_to_init(
        self, card_id, source_id: str, config_id: int, extra: Dict
    ) -> None:
        """Unlock one card (PROCESSING -> INIT) and log the failure.

        ``extra`` carries the case-specific log fields (``message`` and/or
        ``data``) appended after the common ones.
        """
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
        )
        await self.log_service.log(
            contents={
                "card_id": card_id,
                "source_id": source_id,
                "config_id": config_id,
                "task": "create_card_decode_task",
                "status": "fail",
                **extra,
            }
        )

    async def _record_pending_task(
        self,
        card_id,
        source_id: str,
        config_id: int,
        payload: str,
        remark: str,
        message: str,
    ) -> None:
        """Insert a PROCESSING task row to be polled later and mark the card SUCCESS."""
        await self.mapper.insert_decode_task(
            source_id=source_id,
            config_id=config_id,
            payload=payload,
            remark=remark,
            status=self.TaskStatus.PROCESSING,
        )
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
        )
        await self.log_service.log(
            contents={
                "card_id": card_id,
                "source_id": source_id,
                "config_id": config_id,
                "task": "create_card_decode_task",
                "status": "pending",
                "message": message,
            }
        )

    async def _record_submit_success(
        self, card_id, source_id: str, config_id: int, payload: str
    ) -> None:
        """Handle a card whose submit returned SUCCESS.

        Queries the decode result right away: when it is available the task
        row and its result are stored directly, otherwise a PROCESSING task
        row is inserted for the poller. The card ends up SUCCESS either way.
        """
        # NOTE(review): one query call per card; the helper takes a list, so
        # the SUCCESS ids of a batch could probably be queried in one call —
        # confirm its batch-size contract before changing.
        query_results = await self.tool.query_decode_results_batch(
            [source_id], config_id=config_id
        )
        result_data = query_results.get(source_id)
        if not (
            result_data
            and result_data.get("status") == self.QueryStatus.SUCCESS
        ):
            await self._record_pending_task(
                card_id,
                source_id,
                config_id,
                payload,
                remark="提交返回SUCCESS,查询未果,等待轮询",
                message="submit SUCCESS but query not ready, inserted for polling",
            )
            return

        data_content = result_data.get("dataContent") or "{}"
        html = result_data.get("html")
        await self.mapper.insert_decode_task(
            source_id=source_id,
            config_id=config_id,
            payload=payload,
            remark="提交时已有解构结果,直接落库",
        )
        await self.mapper.set_decode_result(
            source_id=source_id,
            config_id=config_id,
            result=json.dumps(
                {"dataContent": data_content, "html": html},
                ensure_ascii=False,
            ),
            remark="提交时已返回 SUCCESS,结果已落库",
        )
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
        )
        await self.log_service.log(
            contents={
                "card_id": card_id,
                "source_id": source_id,
                "config_id": config_id,
                "task": "create_card_decode_task",
                "status": "success",
                "message": "decode result already available on submit",
            }
        )

    async def _submit_and_record_for_config(
        self, cards: List[Dict], config_id: int
    ):
        """Submit the cards of one ``config_id`` and persist the outcome.

        Cards without ``card_cover_id`` are ignored. Cards whose cover id
        already has a task in the DB, or repeats an earlier card of the same
        batch, are marked SUCCESS without being submitted. Every remaining
        card is submitted and, depending on the submit status, gets a task
        row (SUCCESS / PENDING) or is rolled back to INIT for a later retry.
        """
        if not cards:
            return

        # Only cards that carry a cover id can be submitted / de-duplicated.
        cards_with_cid = [c for c in cards if c.get("card_cover_id")]
        if not cards_with_cid:
            return

        # Cross-batch de-duplication: source ids that already have a task.
        all_source_ids = [str(c["card_cover_id"]) for c in cards_with_cid]
        existing = await self.mapper.fetch_existing_source_ids(
            all_source_ids, config_id
        )

        # Same-batch de-duplication: keep the first card per cover id.
        seen = set()
        new_cards = []
        duplicates = []
        for c in cards_with_cid:
            cid = str(c["card_cover_id"])
            if cid in existing or cid in seen:
                duplicates.append(c)
            else:
                seen.add(cid)
                new_cards.append(c)

        if duplicates:
            await self.log_service.log(
                contents={
                    "task": "create_card_decode_task",
                    "config_id": config_id,
                    "message": f"Skipped {len(duplicates)} duplicate cards (already submitted or same-batch)",
                }
            )
            for card in duplicates:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.SUCCESS,
                )

        if not new_cards:
            return

        posts = self.tool.prepare_posts(new_cards)
        submit_results = await self.tool.submit_decode_batch(
            posts, config_id=config_id
        )
        posts_by_cid = {p["channelContentId"]: p for p in posts}

        for card in new_cards:
            source_id = str(card["card_cover_id"])
            card_id = card["id"]
            result = submit_results.get(source_id)

            if not result:
                await self._rollback_to_init(
                    card_id,
                    source_id,
                    config_id,
                    {"message": "no response for source_id, rolled back to INIT"},
                )
                continue

            status = result.get("status")
            if status == self.SubmitStatus.FAILED:
                await self._rollback_to_init(
                    card_id, source_id, config_id, {"data": result}
                )
                continue

            if status == self.SubmitStatus.SUCCESS:
                payload = json.dumps(
                    posts_by_cid.get(source_id, {}), ensure_ascii=False
                )
                await self._record_submit_success(
                    card_id, source_id, config_id, payload
                )
            elif status == self.SubmitStatus.PENDING:
                payload = json.dumps(
                    posts_by_cid.get(source_id, {}), ensure_ascii=False
                )
                await self._record_pending_task(
                    card_id,
                    source_id,
                    config_id,
                    payload,
                    remark="卡片解构任务已提交,等待轮询",
                    message="task submitted, waiting for polling",
                )
            else:
                await self._rollback_to_init(
                    card_id,
                    source_id,
                    config_id,
                    {
                        "message": f"unexpected submit status: {status}, rolled back to INIT",
                        "data": result,
                    },
                )

    async def _submit_and_record(self, cards: List[Dict]):
        """Route the locked cards to the per-config submit flow.

        * no ``share_cover``            -> marked FAILED;
        * no ``card_cover_id``          -> left alone (``_ensure_cover_ids``
          marks a card FAILED when it cannot compute the id);
        * channel without a config id   -> rolled back to INIT;
        * everything else               -> ``_submit_and_record_for_config``.
        """
        if not cards:
            return

        valid_cards = []
        for card in cards:
            if not card.get("share_cover"):
                # Nothing to decode without a cover.
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.FAILED,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": "share_cover is empty, marked as FAILED",
                    }
                )
            elif card.get("card_cover_id"):
                valid_cards.append(card)
            # else: the cover id is missing. _submit_and_record_for_config
            # ignores such cards as well, and rolling them back here would
            # log a misleading "rolled back to INIT" for a card that
            # _ensure_cover_ids has already marked FAILED.

        if not valid_cards:
            return

        grouped = self._group_cards_by_channel(valid_cards)
        for config_id, group_cards in grouped.items():
            await self._submit_and_record_for_config(group_cards, config_id)

        # Cards whose channel is not in the mapping table: release the lock.
        # Object identity is used because the dicts themselves were grouped.
        mapped_ids = {id(c) for g in grouped.values() for c in g}
        for card in valid_cards:
            if id(card) in mapped_ids:
                continue
            await self.mapper.update_card_status(
                card["id"],
                self.TaskStatus.PROCESSING,
                self.TaskStatus.INIT,
            )
            await self.log_service.log(
                contents={
                    "card_id": card["id"],
                    "channel": card.get("channel"),
                    "task": "create_card_decode_task",
                    "status": "fail",
                    "message": "unknown channel, rolled back to INIT",
                }
            )

    async def deal(self):
        """Run one pass: lock pending cards, ensure cover ids, submit, record.

        Raises:
            Exception: any error from the mapper, decode service or log
                service is re-raised after the cards still locked by this run
                have been released (best effort).
        """
        cards = await self._acquire_cards()
        if not cards:
            await self.log_service.log(
                contents={
                    "task": "create_card_decode_task",
                    "message": "No more cards to decode",
                }
            )
            return

        try:
            await self._ensure_cover_ids(cards)
            await self._submit_and_record(cards)
        except Exception:
            # Without this, one failing call would leave every card acquired
            # above stuck in PROCESSING.
            await self._release_locks(cards)
            raise

        await self.log_service.log(
            contents={
                "task": "create_card_decode_task",
                "message": f"Processed {len(cards)} cards",
            }
        )
|
|
|
+
|
|
|
+
|
|
|
+__all__ = ["CreateCardsDecodeTask"]
|