from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst

# Main decode-task table used by every mapper in this module.
TABLE = "long_articles_decode_tasks_v2"
# Table that receives per-task extraction details (see ``record_extract_detail``).
DETAIL_TABLE = "long_articles_decode_task_detail_v2"


class ArticlesDecodeTaskMapper(DecodeArticleConst):
    """Async data-access helpers for the ``long_articles_decode_tasks_v2`` table.

    Constants such as ``CONFIG_ID``, ``TASK_BATCH``, ``TaskStatus`` and
    ``ExtractStatus`` are inherited from ``DecodeArticleConst``. Most task-table
    queries are additionally scoped to ``self.CONFIG_ID``; ``update_extract_status``
    targets a single row by ``id`` only.
    """

    def __init__(self, pool: DatabaseManager):
        # Shared database pool; all queries go through async_save / async_fetch.
        self.pool = pool

    async def insert_decode_task(
        self,
        channel_content_id: str,
        content_id: str,
        source: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Create a decode task for ``channel_content_id`` under ``CONFIG_ID``.

        Uses ``INSERT IGNORE``, so a row that collides with an existing unique
        key is skipped rather than raising an error.

        Returns:
            The value returned by ``pool.async_save`` (presumably the number of
            affected rows, i.e. 0 when the insert was ignored — confirm).
        """
        query = f"""
            INSERT IGNORE INTO {TABLE}
                (channel_content_id, config_id, content_id, source, payload, remark)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(
                channel_content_id,
                self.CONFIG_ID,
                content_id,
                source,
                payload,
                remark,
            ),
        )

    async def update_task_status_by_channel(
        self,
        channel_content_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a task from ``ori_status`` to ``new_status`` (compare-and-set).

        Only rows whose current status equals ``ori_status`` are updated, so a
        concurrent transition by another worker makes this a no-op.
        ``remark`` is always written, which means passing ``None`` clears any
        existing remark.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, remark = %s
            WHERE channel_content_id = %s AND status = %s AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, remark, channel_content_id, ori_status, self.CONFIG_ID),
        )

    async def set_decode_result(
        self,
        channel_content_id: str,
        result: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store the decode ``result`` and mark the task ``TaskStatus.SUCCESS``.

        The update only applies to tasks currently in ``INIT`` or ``PROCESSING``;
        rows in any other status are left untouched.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, result = %s, remark = %s
            WHERE channel_content_id = %s AND status IN (%s, %s) AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                result,
                remark,
                channel_content_id,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.CONFIG_ID,
            ),
        )

    async def fetch_pending_tasks(self, source: Optional[int] = None) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` tasks still in ``TaskStatus.INIT``.

        Args:
            source: When given, only tasks with this ``source`` are returned;
                when ``None``, tasks from every source are eligible.

        Returns:
            Rows with ``channel_content_id`` and ``content_id``.
        """
        conditions = ["status = %s"]
        params: list = [self.TaskStatus.INIT]
        if source is not None:
            conditions.append("source = %s")
            params.append(source)
        conditions.append("config_id = %s")
        params.extend((self.CONFIG_ID, self.TASK_BATCH))

        # Only fixed column predicates are interpolated; all values stay bound params.
        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT channel_content_id, content_id
            FROM {TABLE}
            WHERE {where_clause}
            LIMIT %s
        """
        return await self.pool.async_fetch(query=query, params=tuple(params))

    async def fetch_existing_channel_content_ids(
        self, channel_content_ids: List[str]
    ) -> Set[str]:
        """Return which of ``channel_content_ids`` already have a successful decode result.

        Used to skip content that has already been decoded. An empty input
        short-circuits without touching the database.
        """
        if not channel_content_ids:
            return set()

        # Drop duplicate ids (order-preserving) so the IN list is no larger than needed.
        unique_ids = list(dict.fromkeys(channel_content_ids))
        placeholders = ",".join(["%s"] * len(unique_ids))
        query = f"""
            SELECT channel_content_id
            FROM {TABLE}
            WHERE channel_content_id IN ({placeholders})
              AND config_id = %s
              AND status = %s
        """
        rows = await self.pool.async_fetch(
            query=query,
            params=(*unique_ids, self.CONFIG_ID, self.TaskStatus.SUCCESS),
        )
        return {r["channel_content_id"] for r in rows}

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Fetch successfully decoded tasks whose extraction is still ``ExtractStatus.INIT``.

        Returns rows with ``id`` and ``result``.
        NOTE(review): this query has no LIMIT, so the whole backlog is loaded at
        once; consider batching if the backlog can grow large.
        """
        query = f"""
            SELECT id, result
            FROM {TABLE}
            WHERE extract_status = %s AND status = %s AND config_id = %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
        )

    async def update_extract_status(
        self, task_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set the ``extract_status`` of the task with ``id == task_id``."""
        query = f"""
            UPDATE {TABLE}
            SET extract_status = %s
            WHERE extract_status = %s AND id = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, ori_status, task_id)
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one extraction-detail row linked to ``decode_task_id``.

        Missing keys in ``detail`` are stored as empty strings.
        """
        query = f"""
            INSERT INTO {DETAIL_TABLE}
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )


class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper plus access to ``ad_platform_accounts_daily_detail`` articles.

    The constructor is inherited unchanged from ``ArticlesDecodeTaskMapper``.
    """

    async def update_article_decode_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``decode_status`` on the ad-platform article with ``id == id_``."""
        query = """
            UPDATE ad_platform_accounts_daily_detail
            SET decode_status = %s
            WHERE id = %s AND decode_status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_decode_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` fetched-but-not-yet-decoded ad-platform articles.

        NOTE(review): ``fetch_status`` / ``decode_status`` are compared against
        ``TaskStatus`` values; this assumes that table shares the same status
        codes — confirm.
        """
        query = """
            SELECT id, account_name, gh_id, article_title, article_cover,
                   article_text, article_images, wx_sn
            FROM ad_platform_accounts_daily_detail
            WHERE fetch_status = %s AND decode_status = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
        )


class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper plus access to internal articles awaiting decoding.

    The constructor is inherited unchanged from ``ArticlesDecodeTaskMapper``.
    """

    TABLE_INNER = "long_articles_decode_articles"

    async def fetch_inner_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` internal articles in ``TaskStatus.INIT``.

        Rows are ordered by ``max_read_rate`` descending, so higher-ranked
        articles come first.
        """
        query = f"""
            SELECT id, title, source_id, wx_sn, coverimgurl, article_text, summary, card_title
            FROM {self.TABLE_INNER}
            WHERE status = %s
            ORDER BY max_read_rate DESC
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def update_inner_article_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``status`` on the internal article with ``id == id_``."""
        query = f"""
            UPDATE {self.TABLE_INNER}
            SET status = %s
            WHERE id = %s AND status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
        """Fetch produce-module outputs for ``source_id`` from the ``aigc`` database.

        ``source_id`` is matched against ``plan_exe_id``. Only module types 1, 2
        and 4 are returned — NOTE(review): the meaning of these codes is not
        visible here; document them where they are defined.
        """
        query = """
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s AND produce_module_type IN (1, 2, 4)
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )


__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]