from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeCardConst

TABLE_SOURCE = "auto_reply_top_cards_daily"
TABLE_TASK = "long_articles_decode_tasks"


class CardDecodeTaskMapper(DecodeCardConst):
    """Card-decode mapper for auto_reply_top_cards_daily and long_articles_decode_tasks.

    All statements are parameterized; only the module-level table-name
    constants are interpolated into the SQL text.
    """

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    # ——— auto_reply_top_cards_daily ———

    async def fetch_cards(self) -> List[Dict]:
        """Fetch cards awaiting decoding: status = INIT, at most TASK_BATCH rows."""
        query = (
            " SELECT id, channel, share_cover, share_title, card_cover_id"
            f" FROM {TABLE_SOURCE}"
            " WHERE status = %s"
            " LIMIT %s "
        )
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def update_card_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Update a card's decode status (optimistic lock).

        The row is only updated while its current status still equals
        ``ori_status``.

        Returns:
            Whatever ``pool.async_save`` returns (presumably the affected
            row count — confirm against DatabaseManager).
        """
        query = (
            f" UPDATE {TABLE_SOURCE}"
            " SET status = %s"
            " WHERE id = %s AND status = %s "
        )
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def set_card_cover_id(self, id_: int, cover_id: str) -> int:
        """Back-fill ``card_cover_id`` on the card row with the given id."""
        query = (
            f" UPDATE {TABLE_SOURCE}"
            " SET card_cover_id = %s"
            " WHERE id = %s "
        )
        return await self.pool.async_save(query=query, params=(cover_id, id_))

    # ——— long_articles_decode_tasks ———

    async def insert_decode_task(
        self,
        source_id: str,
        config_id: int,
        source: int,
        payload: str,
        remark: Optional[str] = None,
        status: Optional[int] = None,
    ) -> int:
        """Insert a decode task with ``INSERT IGNORE``.

        The channel column is always ``TaskChannel.PARTNER_CARD``. The
        ``status`` column is written only when ``status`` is not None;
        otherwise it is left out of the statement entirely.
        """
        columns = ["source_id", "config_id", "source", "channel", "payload", "remark"]
        values = [
            source_id,
            config_id,
            source,
            self.TaskChannel.PARTNER_CARD,
            payload,
            remark,
        ]
        if status is not None:
            columns.append("status")
            values.append(status)

        column_sql = ", ".join(columns)
        placeholder_sql = ", ".join(["%s"] * len(columns))
        query = (
            f" INSERT IGNORE INTO {TABLE_TASK}"
            f" ({column_sql})"
            f" VALUES ({placeholder_sql}) "
        )
        return await self.pool.async_save(query=query, params=tuple(values))

    async def set_decode_result(
        self,
        source_id: str,
        config_id: int,
        result: str,
        remark: Optional[str] = None,
    ) -> int:
        """Mark a task SUCCESS and store its result and remark.

        Only rows matching ``source_id`` and ``config_id`` whose status is
        INIT or PROCESSING are updated.
        """
        query = (
            f" UPDATE {TABLE_TASK}"
            " SET status = %s, result = %s, remark = %s"
            " WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s "
        )
        params = (
            self.TaskStatus.SUCCESS,
            result,
            remark,
            source_id,
            self.TaskStatus.INIT,
            self.TaskStatus.PROCESSING,
            config_id,
        )
        return await self.pool.async_save(query=query, params=params)

    async def fetch_pending_tasks(self) -> List[Dict]:
        """Fetch (source_id, config_id) of unfinished partner-card tasks.

        Selects rows whose status is INIT or PROCESSING and whose source is
        PARTNER_CARD_TOULIU or PARTNER_CARD_COOPERATE, ordered by
        ``config_id`` and limited to TASK_BATCH rows.
        """
        query = (
            " SELECT source_id, config_id"
            f" FROM {TABLE_TASK}"
            " WHERE status IN (%s, %s) AND source IN (%s, %s)"
            " ORDER BY config_id"
            " LIMIT %s "
        )
        params = (
            self.TaskStatus.INIT,
            self.TaskStatus.PROCESSING,
            self.SourceType.PARTNER_CARD_TOULIU,
            self.SourceType.PARTNER_CARD_COOPERATE,
            self.TASK_BATCH,
        )
        return await self.pool.async_fetch(query=query, params=params)

    async def update_task_status_by_source_id(
        self,
        source_id: str,
        config_id: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Set status and remark on a task that is still INIT or PROCESSING.

        Rows are matched by ``source_id`` and ``config_id``; rows in any
        other status are left untouched.
        """
        query = (
            f" UPDATE {TABLE_TASK}"
            " SET status = %s, remark = %s"
            " WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s "
        )
        params = (
            new_status,
            remark,
            source_id,
            self.TaskStatus.INIT,
            self.TaskStatus.PROCESSING,
            config_id,
        )
        return await self.pool.async_save(query=query, params=params)

    async def fetch_existing_source_ids(
        self, source_ids: List[str], config_id: int, source: int
    ) -> Set[str]:
        """Batch-query source_ids that already have a task row, for de-duplication.

        Returns an empty set without touching the database when
        ``source_ids`` is empty (an empty ``IN ()`` list is invalid SQL).
        A row counts as existing when its status is INIT, PROCESSING,
        SUCCESS or FAILED.
        """
        if not source_ids:
            return set()

        placeholders = ",".join(["%s"] * len(source_ids))
        query = (
            " SELECT source_id"
            f" FROM {TABLE_TASK}"
            f" WHERE source_id IN ({placeholders})"
            " AND config_id = %s AND source = %s"
            " AND status IN (%s, %s, %s, %s) "
        )
        params = (
            *source_ids,
            config_id,
            source,
            self.TaskStatus.INIT,
            self.TaskStatus.PROCESSING,
            self.TaskStatus.SUCCESS,
            self.TaskStatus.FAILED,
        )
        rows = await self.pool.async_fetch(query=query, params=params)
        return {row["source_id"] for row in rows}


__all__ = ["CardDecodeTaskMapper"]