import json
from collections import defaultdict
from typing import Dict, List

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import DecodeCardConst
from ._mapper import CardDecodeTaskMapper
from ._utils import CardDecodeUtils


class CreateCardsDecodeTask(DecodeCardConst):
    """Create decode tasks for cards that are waiting to be decoded.

    The flow driven by :meth:`deal` is:

    1. lock pending cards (status INIT -> PROCESSING),
    2. make sure every card has a ``card_cover_id``,
    3. submit the cards to the decode service, grouped by ``config_id``,
       and record the outcome through the mapper.

    ``TaskStatus``, ``SubmitStatus``, ``QueryStatus``, ``CHANNEL_CONFIG_MAP``
    and ``CHANNEL_SOURCE_MAP`` are not defined in this file; they are
    presumably inherited from ``DecodeCardConst``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Store the collaborators and build the mapper and utility helpers.

        Args:
            pool: Database manager handed to ``CardDecodeTaskMapper``.
            log_service: Service used for all structured log records.
        """
        self.pool = pool
        self.log_service = log_service
        self.mapper = CardDecodeTaskMapper(self.pool)
        self.tool = CardDecodeUtils()

    async def _release_cards(self, cards: List[Dict]) -> None:
        """Best-effort rollback of locked cards (PROCESSING -> INIT).

        Used on unexpected errors so that cards locked by this run are not
        left in PROCESSING. The transition is requested with PROCESSING as
        the expected current status, so cards that already reached SUCCESS
        or FAILED are presumably left untouched (this relies on
        ``update_card_status`` only applying when the current status matches,
        which is how ``_acquire_cards`` uses it as a lock).

        Args:
            cards: Cards that this run may still hold locked.
        """
        for card in cards:
            try:
                released = await self.mapper.update_card_status(
                    card["id"], self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                if released:
                    await self.log_service.log(
                        contents={
                            "card_id": card["id"],
                            "task": "create_card_decode_task",
                            "status": "fail",
                            "message": "unexpected error, rolled back to INIT",
                        }
                    )
            except Exception:
                # Deliberately best-effort: a failing rollback must neither
                # stop the remaining cards from being released nor mask the
                # original error, which the caller re-raises.
                continue

    async def _acquire_cards(self) -> List[Dict]:
        """Fetch pending cards and lock them (status INIT -> PROCESSING).

        Returns:
            The cards whose status transition succeeded. Cards for which the
            transition returned a falsy value are skipped and logged.

        Raises:
            Exception: Any error from the mapper or log service is re-raised
                after the cards locked so far have been released.
        """
        cards = await self.mapper.fetch_cards()
        locked = []
        try:
            for card in cards:
                card_id = card["id"]
                acquired = await self.mapper.update_card_status(
                    card_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
                )
                if acquired:
                    locked.append(card)
                else:
                    await self.log_service.log(
                        contents={
                            "card_id": card_id,
                            "task": "create_card_decode_task",
                            "status": "skip",
                            "message": "acquire lock failed",
                        }
                    )
        except Exception:
            # Do not leak the locks taken before the failure.
            await self._release_cards(locked)
            raise
        return locked

    async def _ensure_cover_ids(self, cards: List[Dict]):
        """Fill in ``card_cover_id`` for cards that do not have one yet.

        The id comes from ``CardDecodeUtils.compute_cover_md5`` applied to the
        card's ``share_cover`` (the original note says this downloads the
        cover and computes its MD5 -- not verifiable from this file). The id
        is persisted through the mapper and written back into the card dict.
        If anything in that step raises, the card is marked FAILED and the
        error is logged; the card is left without a ``card_cover_id``.

        Args:
            cards: Locked cards; mutated in place.
        """
        for card in cards:
            if card.get("card_cover_id"):
                continue
            if not card.get("share_cover"):
                # Cards with an empty cover are marked FAILED later, in
                # _submit_and_record.
                continue
            try:
                cover_id = self.tool.compute_cover_md5(card["share_cover"])
                await self.mapper.set_card_cover_id(card["id"], cover_id)
                card["card_cover_id"] = cover_id
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "task": "create_card_decode_task",
                        "status": "info",
                        "message": f"computed cover_id={cover_id}",
                    }
                )
            except Exception as e:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.FAILED,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "share_cover": card.get("share_cover"),
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": f"compute cover md5 failed, marked as FAILED: {e}",
                    }
                )

    @staticmethod
    def _group_cards_by_channel(cards: List[Dict]) -> Dict[int, List[Dict]]:
        """Group cards by the ``config_id`` mapped from their ``channel``.

        The original note says the channel value is
        ``auto_reply_top_cards_daily.channel``. Cards whose channel has no
        truthy entry in ``DecodeCardConst.CHANNEL_CONFIG_MAP`` are left out
        of the result.

        Args:
            cards: Cards to group.

        Returns:
            Mapping of ``config_id`` to the cards that resolved to it.
        """
        grouped = defaultdict(list)
        for card in cards:
            source_channel = card.get("channel", "")
            config_id = DecodeCardConst.CHANNEL_CONFIG_MAP.get(source_channel)
            if config_id:
                grouped[config_id].append(card)
        return dict(grouped)

    async def _submit_and_record_for_config(
        self, cards: List[Dict], config_id: int, source: int
    ):
        """Submit the cards of one ``config_id`` and persist the outcome.

        Cards are de-duplicated by ``card_cover_id`` (against existing tasks
        and within the batch), submitted in one batch, and then handled one
        by one according to the submit status returned for their id.

        Args:
            cards: Cards that all map to ``config_id``.
            config_id: Decode configuration passed to the decode helpers.
            source: Source value stored with each decode task.
        """
        if not cards:
            return

        # Only cards that have a card_cover_id can be de-duplicated/submitted.
        cards_with_cid = [c for c in cards if c.get("card_cover_id")]
        if not cards_with_cid:
            return

        # Cross-batch de-duplication: ask the mapper which ids already exist.
        all_source_ids = [str(c["card_cover_id"]) for c in cards_with_cid]
        existing = await self.mapper.fetch_existing_source_ids(
            all_source_ids, config_id, source
        )

        # In-batch de-duplication: keep only the first card per card_cover_id.
        seen = set()
        deduped_cards = []
        intra_dups = []
        for c in cards_with_cid:
            cid = str(c["card_cover_id"])
            if cid in existing or cid in seen:
                intra_dups.append(c)
            else:
                seen.add(cid)
                deduped_cards.append(c)

        if intra_dups:
            await self.log_service.log(
                contents={
                    "task": "create_card_decode_task",
                    "config_id": config_id,
                    "message": f"Skipped {len(intra_dups)} duplicate cards (already submitted or same-batch)",
                }
            )
            # Duplicates need no task of their own; close them as SUCCESS.
            for card in intra_dups:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.SUCCESS,
                )

        new_cards = deduped_cards
        if not new_cards:
            return

        posts = self.tool.prepare_posts(new_cards)
        submit_results = await self.tool.submit_decode_batch(posts, config_id=config_id)
        posts_by_cid = {p["channelContentId"]: p for p in posts}

        for card in new_cards:
            source_id = str(card["card_cover_id"])
            card_id = card["id"]
            result = submit_results.get(source_id)

            if not result:
                # No answer for this id: release the card so it is retried.
                await self.mapper.update_card_status(
                    card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "card_id": card_id,
                        "source_id": source_id,
                        "config_id": config_id,
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": "no response for source_id, rolled back to INIT",
                    }
                )
                continue

            status = result.get("status")

            if status == self.SubmitStatus.FAILED:
                await self.mapper.update_card_status(
                    card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "card_id": card_id,
                        "source_id": source_id,
                        "config_id": config_id,
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "data": result,
                    }
                )
                continue

            if status == self.SubmitStatus.SUCCESS:
                # Submit reported SUCCESS: try to fetch the result right away.
                query_results = await self.tool.query_decode_results_batch(
                    [source_id], config_id=config_id
                )
                result_data = query_results.get(source_id)
                if (
                    result_data
                    and result_data.get("status") == self.QueryStatus.SUCCESS
                ):
                    data_content = result_data.get("dataContent") or "{}"
                    html = result_data.get("html")
                    await self.mapper.insert_decode_task(
                        source_id=source_id,
                        config_id=config_id,
                        source=source,
                        payload=json.dumps(
                            posts_by_cid.get(source_id, {}), ensure_ascii=False
                        ),
                        remark="提交时已有解构结果,直接落库",
                    )
                    await self.mapper.set_decode_result(
                        source_id=source_id,
                        config_id=config_id,
                        result=json.dumps(
                            {"dataContent": data_content, "html": html},
                            ensure_ascii=False,
                        ),
                        remark="提交时已返回 SUCCESS,结果已落库",
                    )
                    await self.mapper.update_card_status(
                        card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                    )
                    await self.log_service.log(
                        contents={
                            "card_id": card_id,
                            "source_id": source_id,
                            "config_id": config_id,
                            "task": "create_card_decode_task",
                            "status": "success",
                            "message": "decode result already available on submit",
                        }
                    )
                else:
                    # Result not available yet: store the task as PROCESSING
                    # (the remark and log say it waits for polling).
                    await self.mapper.insert_decode_task(
                        source_id=source_id,
                        config_id=config_id,
                        source=source,
                        payload=json.dumps(
                            posts_by_cid.get(source_id, {}), ensure_ascii=False
                        ),
                        remark="提交返回SUCCESS,查询未果,等待轮询",
                        status=self.TaskStatus.PROCESSING,
                    )
                    await self.mapper.update_card_status(
                        card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                    )
                    await self.log_service.log(
                        contents={
                            "card_id": card_id,
                            "source_id": source_id,
                            "config_id": config_id,
                            "task": "create_card_decode_task",
                            "status": "pending",
                            "message": "submit SUCCESS but query not ready, inserted for polling",
                        }
                    )
            elif status == self.SubmitStatus.PENDING:
                await self.mapper.insert_decode_task(
                    source_id=source_id,
                    config_id=config_id,
                    source=source,
                    payload=json.dumps(
                        posts_by_cid.get(source_id, {}), ensure_ascii=False
                    ),
                    remark="卡片解构任务已提交,等待轮询",
                    status=self.TaskStatus.PROCESSING,
                )
                await self.mapper.update_card_status(
                    card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
                )
                await self.log_service.log(
                    contents={
                        "card_id": card_id,
                        "source_id": source_id,
                        "config_id": config_id,
                        "task": "create_card_decode_task",
                        "status": "pending",
                        "message": "task submitted, waiting for polling",
                    }
                )
            else:
                await self.mapper.update_card_status(
                    card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
                await self.log_service.log(
                    contents={
                        "card_id": card_id,
                        "source_id": source_id,
                        "config_id": config_id,
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": f"unexpected submit status: {status}, rolled back to INIT",
                        "data": result,
                    }
                )

    async def _submit_and_record(self, cards: List[Dict]):
        """Validate cards, then submit them per ``config_id`` group.

        Cards without ``share_cover`` are marked FAILED. The rest are grouped
        by channel and handed to ``_submit_and_record_for_config``. Cards
        whose channel is not in the mapping are rolled back to INIT.

        Args:
            cards: Locked cards to process.
        """
        if not cards:
            return

        # Cards without share_cover are marked FAILED right away.
        valid_cards = []
        for card in cards:
            if not card.get("share_cover"):
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.FAILED,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": "share_cover is empty, marked as FAILED",
                    }
                )
            else:
                valid_cards.append(card)

        if not valid_cards:
            return

        grouped = self._group_cards_by_channel(valid_cards)
        for config_id, group_cards in grouped.items():
            # The source is derived from the first card's channel of the group.
            source_channel = group_cards[0]["channel"]
            source = self.CHANNEL_SOURCE_MAP[source_channel]
            await self._submit_and_record_for_config(group_cards, config_id, source)

        # Cards that did not land in any group (channel not in the mapping)
        # are rolled back. Identity is used because the dicts are not hashable.
        mapped_ids = {id(c) for g in grouped.values() for c in g}
        for card in valid_cards:
            if id(card) not in mapped_ids:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.INIT,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "channel": card.get("channel"),
                        "task": "create_card_decode_task",
                        "status": "fail",
                        "message": "unknown channel, rolled back to INIT",
                    }
                )

    async def deal(self):
        """Run one pass: lock cards, ensure cover ids, submit and record.

        Raises:
            Exception: Any unexpected error from the processing steps is
                re-raised after cards still locked by this run have been
                released back to INIT, so they can be picked up again.
        """
        cards = await self._acquire_cards()
        if not cards:
            await self.log_service.log(
                contents={
                    "task": "create_card_decode_task",
                    "message": "No more cards to decode",
                }
            )
            return

        try:
            await self._ensure_cover_ids(cards)
            await self._submit_and_record(cards)
        except Exception:
            # Without this, every card locked above would stay in PROCESSING
            # whenever a mapper or decode-service call raises.
            await self._release_cards(cards)
            raise

        await self.log_service.log(
            contents={
                "task": "create_card_decode_task",
                "message": f"Processed {len(cards)} cards",
            }
        )


__all__ = ["CreateCardsDecodeTask"]