from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeMaterialConst

TABLE_SOURCE = "growth_daily_material"
TABLE_TASK = "long_articles_decode_tasks"


class MaterialDecodeTaskMapper(DecodeMaterialConst):
    """Material-decode mapper for growth_daily_material and long_articles_decode_tasks.

    Every method issues a single parameterized SQL statement through the
    injected ``DatabaseManager``. Status codes, ``CONFIG_ID``, ``TASK_BATCH``,
    ``SourceType`` and ``TaskChannel`` are read from ``DecodeMaterialConst``.
    """

    def __init__(self, pool: DatabaseManager):
        """Store the database manager used to run every query."""
        self.pool = pool

    # ——— growth_daily_material ———

    async def fetch_materials(self) -> List[Dict]:
        """Fetch materials awaiting decoding (``status = INIT``).

        Returns:
            Rows with ``id``, ``material_id``, ``material_title`` and
            ``material_cover``, capped at ``TASK_BATCH`` rows. The query has
            no ORDER BY, so which rows are picked is up to the database.
        """
        query = f"""
            SELECT id, material_id, material_title, material_cover
            FROM {TABLE_SOURCE}
            WHERE status = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.INIT, self.TASK_BATCH),
        )

    async def update_material_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Update a material's decode status (optimistic lock).

        The row is only changed while it still holds ``ori_status``.

        Args:
            id_: Primary key of the material row.
            ori_status: Status the row is expected to have right now.
            new_status: Status to write.

        Returns:
            The result of ``pool.async_save`` (presumably the affected-row
            count — confirm against ``DatabaseManager``).
        """
        query = f"""
            UPDATE {TABLE_SOURCE}
            SET status = %s
            WHERE id = %s AND status = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, id_, ori_status),
        )

    # ——— long_articles_decode_tasks ———

    async def insert_decode_task(
        self,
        source_id: str,
        payload: str,
        remark: Optional[str] = None,
        status: Optional[int] = None,
    ) -> int:
        """Insert a decode task for a material using ``INSERT IGNORE``.

        Args:
            source_id: Identifier of the source the task belongs to.
            payload: Task payload, stored as given.
            remark: Optional remark stored with the task.
            status: Optional initial status. When ``None`` the ``status``
                column is left out of the statement so the table's column
                default applies.

        Returns:
            The result of ``pool.async_save`` (presumably the affected-row
            count — confirm against ``DatabaseManager``).
        """
        columns = ["source_id", "config_id", "source", "channel", "payload", "remark"]
        values = [
            source_id,
            self.CONFIG_ID,
            self.SourceType.MATERIAL,
            self.TaskChannel.MATERIAL,
            payload,
            remark,
        ]
        # Only mention the status column when the caller supplied one, so a
        # missing status never overrides the column default with NULL.
        if status is not None:
            columns.append("status")
            values.append(status)

        column_list = ", ".join(columns)
        placeholders = ", ".join(["%s"] * len(columns))
        query = f"""
            INSERT IGNORE INTO {TABLE_TASK}
                ({column_list})
            VALUES ({placeholders})
        """
        return await self.pool.async_save(query=query, params=tuple(values))

    async def set_decode_result(
        self,
        source_id: str,
        result: str,
        remark: Optional[str] = None,
    ) -> int:
        """Mark a task as SUCCESS and store its result.

        Only rows currently in INIT or PROCESSING for this ``CONFIG_ID`` are
        updated.

        Args:
            source_id: Identifier of the task's source.
            result: Decode result to store.
            remark: Optional remark; overwrites the stored remark (also when
                ``None``).

        Returns:
            The result of ``pool.async_save`` (presumably the affected-row
            count — confirm against ``DatabaseManager``).
        """
        query = f"""
            UPDATE {TABLE_TASK}
            SET status = %s, result = %s, remark = %s
            WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                result,
                remark,
                source_id,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.CONFIG_ID,
            ),
        )

    async def fetch_pending_tasks(self) -> List[Dict]:
        """Fetch source_ids of material tasks still in INIT or PROCESSING.

        Returns:
            Rows containing ``source_id`` for this ``CONFIG_ID`` and the
            MATERIAL source type, capped at ``TASK_BATCH`` rows.
        """
        query = f"""
            SELECT source_id
            FROM {TABLE_TASK}
            WHERE status IN (%s, %s) AND source = %s AND config_id = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.SourceType.MATERIAL,
                self.CONFIG_ID,
                self.TASK_BATCH,
            ),
        )

    async def update_task_status_by_source_id(
        self,
        source_id: str,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a task that is in INIT or PROCESSING to ``new_status``.

        Args:
            source_id: Identifier of the task's source.
            new_status: Status to write.
            remark: Optional remark; overwrites the stored remark (also when
                ``None``).

        Returns:
            The result of ``pool.async_save`` (presumably the affected-row
            count — confirm against ``DatabaseManager``).
        """
        query = f"""
            UPDATE {TABLE_TASK}
            SET status = %s, remark = %s
            WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(
                new_status,
                remark,
                source_id,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.CONFIG_ID,
            ),
        )

    async def fetch_existing_source_ids(self, source_ids: List[str]) -> Set[str]:
        """Batch-query which source_ids already have a task record.

        Used by callers to skip duplicates. A record counts when it belongs
        to this ``CONFIG_ID`` and the MATERIAL source type and its status is
        INIT, PROCESSING, SUCCESS or FAILED.

        Args:
            source_ids: Candidate source ids; an empty list short-circuits
                without touching the database.

        Returns:
            The subset of ``source_ids`` that already have a matching task.
        """
        if not source_ids:
            return set()

        # One placeholder per id keeps the IN (...) list fully parameterized.
        placeholders = ",".join(["%s"] * len(source_ids))
        query = f"""
            SELECT source_id
            FROM {TABLE_TASK}
            WHERE source_id IN ({placeholders})
              AND config_id = %s
              AND source = %s
              AND status IN (%s, %s, %s, %s)
        """
        rows = await self.pool.async_fetch(
            query=query,
            params=(
                *source_ids,
                self.CONFIG_ID,
                self.SourceType.MATERIAL,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.TaskStatus.SUCCESS,
                self.TaskStatus.FAILED,
            ),
        )
        return {row["source_id"] for row in rows}


__all__ = ["MaterialDecodeTaskMapper"]