from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst

# Table holding one row per article decode task.
TABLE = "long_articles_decode_tasks"


class ArticlesDecodeTaskMapper(DecodeArticleConst):
    """Data-access helpers for decode tasks stored in ``TABLE``.

    Task-table queries are scoped to ``self.CONFIG_ID`` (except
    ``update_extract_status``, which targets a single row by ``id``).
    Status values come from ``DecodeArticleConst`` (``TaskStatus``,
    ``ExtractStatus``); ``TASK_BATCH`` is used as the LIMIT of batch fetches.
    """

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    async def insert_decode_task(
        self,
        source_id: str,
        source: int,
        payload: str,
        remark: Optional[str] = None,
        status: Optional[int] = None,
    ) -> int:
        """Insert a decode task, skipping duplicates via ``INSERT IGNORE``.

        When ``status`` is None the ``status`` column is left out of the
        INSERT, so the database's column default is used.

        Returns:
            Whatever ``pool.async_save`` returns (presumably the affected row
            count -- TODO confirm against DatabaseManager).
        """
        columns = "source_id, config_id, source, payload, remark"
        placeholders = "%s, %s, %s, %s, %s"
        params = (source_id, self.CONFIG_ID, source, payload, remark)
        if status is not None:
            columns += ", status"
            placeholders += ", %s"
            params += (status,)

        query = f"""
            INSERT IGNORE INTO {TABLE}
                ({columns})
            VALUES
                ({placeholders})
        """
        return await self.pool.async_save(query=query, params=params)

    async def update_task_status_by_source_id(
        self,
        source_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a task from ``ori_status`` to ``new_status`` (compare-and-set).

        Only rows whose current status equals ``ori_status`` are updated.
        ``remark`` is always written, so passing None stores NULL.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, remark = %s
            WHERE source_id = %s AND status = %s AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, remark, source_id, ori_status, self.CONFIG_ID),
        )

    async def set_decode_result(
        self,
        source_id: str,
        result: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store ``result`` and mark the task SUCCESS.

        Only applies to tasks currently in INIT or PROCESSING, so an already
        finished task is not overwritten.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, result = %s, remark = %s
            WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                result,
                remark,
                source_id,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.CONFIG_ID,
            ),
        )

    async def fetch_pending_tasks(self, source: Optional[int] = None) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` INIT/PROCESSING tasks (``source_id`` only).

        Args:
            source: when given, restrict the result to tasks of this source.
        """
        if source is None:
            source_filter, source_params = "", ()
        else:
            source_filter, source_params = "AND source = %s", (source,)

        query = f"""
            SELECT source_id
            FROM {TABLE}
            WHERE status IN (%s, %s)
                {source_filter}
                AND config_id = %s
            LIMIT %s
        """
        # Parameter order must mirror the placeholder order in the query above.
        params = (
            self.TaskStatus.INIT,
            self.TaskStatus.PROCESSING,
            *source_params,
            self.CONFIG_ID,
            self.TASK_BATCH,
        )
        return await self.pool.async_fetch(query=query, params=params)

    async def fetch_existing_source_ids(self, source_ids: List[str]) -> Set[str]:
        """Return which of ``source_ids`` already have a decode task.

        A task counts when its status is INIT, PROCESSING or SUCCESS; callers
        use this to deduplicate and skip creating a second decode task.
        Returns an empty set for None/empty input without querying.
        """
        # Order-preserving dedupe so repeated ids don't bloat the IN (...) list;
        # ``or ()`` keeps the original None -> empty-set behaviour.
        unique_ids = list(dict.fromkeys(source_ids or ()))
        if not unique_ids:
            return set()

        placeholders = ",".join(["%s"] * len(unique_ids))
        query = f"""
            SELECT source_id
            FROM {TABLE}
            WHERE source_id IN ({placeholders})
                AND config_id = %s
                AND status IN (%s, %s, %s)
        """
        rows = await self.pool.async_fetch(
            query=query,
            params=(
                *unique_ids,
                self.CONFIG_ID,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.TaskStatus.SUCCESS,
            ),
        )
        return {row["source_id"] for row in rows}

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Fetch ``id`` and ``result`` of SUCCESS tasks not yet extracted.

        NOTE(review): unlike the other batch fetches this has no LIMIT, so it
        returns every matching row.
        """
        query = f"""
            SELECT id, result
            FROM {TABLE}
            WHERE extract_status = %s AND status = %s AND config_id = %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
        )

    async def update_extract_status(
        self, task_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``extract_status`` of the task with primary key ``task_id``."""
        query = f"""
            UPDATE {TABLE}
            SET extract_status = %s
            WHERE extract_status = %s AND id = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, ori_status, task_id)
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one extracted-detail row linked to ``decode_task_id``.

        Missing ``inspiration``/``purpose``/``key_point``/``topic`` keys are
        stored as empty strings.
        """
        query = """
            INSERT INTO long_articles_decode_task_detail
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES
                (%s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )


class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper plus helpers for ad-platform source articles."""

    # Source table of ad-platform articles awaiting decode.
    TABLE_AD = "ad_platform_accounts_daily_detail"

    async def update_article_decode_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``decode_status`` of the article row ``id_``."""
        query = f"""
            UPDATE {self.TABLE_AD}
            SET decode_status = %s
            WHERE id = %s AND decode_status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_decode_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` articles ready for decoding.

        Selects rows whose ``fetch_status`` is ``TaskStatus.SUCCESS`` and whose
        ``decode_status`` is ``TaskStatus.INIT``.
        """
        query = f"""
            SELECT id, account_name, gh_id, article_title, article_cover,
                   article_text, article_images, wx_sn
            FROM {self.TABLE_AD}
            WHERE fetch_status = %s AND decode_status = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
        )


class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper plus helpers for internally produced articles."""

    # Source table of internal articles awaiting decode.
    TABLE_INNER = "long_articles_decode_articles"

    async def fetch_inner_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` internal articles whose status is INIT."""
        query = f"""
            SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
            FROM {self.TABLE_INNER}
            WHERE status = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def update_inner_article_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``status`` of the internal article row ``id_``."""
        query = f"""
            UPDATE {self.TABLE_INNER}
            SET status = %s
            WHERE id = %s AND status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
        """Fetch module outputs of the production plan run ``source_id``.

        Queries ``produce_plan_module_output`` in the ``aigc`` database, matching
        ``plan_exe_id`` against ``source_id``.
        """
        # NOTE(review): module types 1, 2, 4 are hard-coded; their meaning is
        # defined by the aigc schema -- TODO document/confirm.
        query = """
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s AND produce_module_type IN (1, 2, 4)
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )


__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]