from typing import Dict, List, Optional, Sequence

from app.core.database import DatabaseManager

from ._const import DecodeTaskConst


class ArticlesDecodeTaskMapper(DecodeTaskConst):
    """Data-access layer for the article decode-task queue.

    Issues SQL against the decode task queue table (``DECODE_TASK_QUEUE``)
    and the extraction detail table ``long_articles_decode_task_detail``.
    Status/type constants (``TaskStatus``, ``ExtractStatus``, ``TaskType``)
    and the batch size ``TASK_BATCH`` come from ``DecodeTaskConst``, which is
    defined elsewhere.

    Write methods return whatever ``pool.async_save`` returns (annotated as
    ``int``; presumably the affected-row count -- TODO confirm against
    ``DatabaseManager``). Read methods return ``pool.async_fetch`` results
    (presumably a list of row dicts -- TODO confirm).
    """

    DECODE_TASK_QUEUE = "long_articles_new_decode_tasks"
    INNER_DECODE_CREATE_STATE = "long_articles_inner_decode_create_state"

    def __init__(self, pool: DatabaseManager):
        # Async database handle shared by every query method below.
        self.pool = pool

    async def _insert_decode_task(
        self,
        *,
        ignore_duplicates: bool,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str],
    ) -> int:
        """Insert one row into the decode task queue (shared by record_* methods).

        With ``ignore_duplicates=True`` the statement is ``INSERT IGNORE``,
        which in MySQL skips rows that would violate a unique key instead of
        raising. Which unique key exists on the table is not visible here.
        """
        verb = "INSERT IGNORE INTO" if ignore_duplicates else "INSERT INTO"
        query = f"""
            {verb} {self.DECODE_TASK_QUEUE}
                (task_id, content_id, task_type, payload, remark)
            VALUES (%s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, content_id, task_type, payload, remark),
        )

    async def record_decode_task(
        self,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store a new decode task.

        Args:
            task_id: Identifier of the decode task.
            content_id: Identifier of the content being decoded.
            task_type: Task type code (see ``DecodeTaskConst.TaskType``).
            payload: Serialized request payload stored as-is.
            remark: Optional free-text note.
        """
        return await self._insert_decode_task(
            ignore_duplicates=False,
            task_id=task_id,
            content_id=content_id,
            task_type=task_type,
            payload=payload,
            remark=remark,
        )

    async def record_decode_task_if_absent(
        self,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store a decode task with ``INSERT IGNORE``.

        Same arguments as ``record_decode_task``. Duplicate-key rows are
        skipped rather than raising.
        """
        return await self._insert_decode_task(
            ignore_duplicates=True,
            task_id=task_id,
            content_id=content_id,
            task_type=task_type,
            payload=payload,
            remark=remark,
        )

    async def update_decode_task_status(
        self,
        task_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a decode task from ``ori_status`` to ``new_status``.

        The update only matches rows still in ``ori_status``, so it acts as a
        conditional state transition. A zero result presumably means the task
        was not in that state (e.g. another worker moved it first).

        Note: ``remark`` is always written, so passing ``None`` clears any
        existing remark.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET status = %s, remark = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, remark, task_id, ori_status)
        )

    async def set_decode_result(
        self, task_id: str, result: str, remark: Optional[str] = None
    ) -> int:
        """Store the decode ``result`` and mark the task SUCCESS.

        Only tasks currently in PROCESSING are updated.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET status = %s, remark = %s, result = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remark,
                result,
                task_id,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def fetch_decoding_tasks(self) -> List[Dict]:
        """Return up to ``TASK_BATCH`` task ids whose status is still INIT.

        These are tasks whose decode result has not been collected yet.
        """
        query = f"""
            SELECT task_id
            FROM {self.DECODE_TASK_QUEUE}
            WHERE status = %s
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Return ``id`` and ``result`` of successful tasks awaiting extraction.

        Selects rows with status SUCCESS and extract_status INIT. The query
        has no LIMIT, so every matching row is returned.
        """
        query = f"""
            SELECT id, result
            FROM {self.DECODE_TASK_QUEUE}
            WHERE extract_status = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
        )

    async def update_extract_status(
        self, task_id: int, ori_status: int, new_status: int
    ) -> int:
        """Conditionally move a row's extract_status (used for locking/flow).

        Note: ``task_id`` is matched against the queue row's ``id`` column
        (as returned by ``fetch_extract_tasks``), not the ``task_id`` column.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET extract_status = %s
            WHERE extract_status = %s AND id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, ori_status, task_id)
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one extraction-detail row into long_articles_decode_task_detail.

        Missing keys in ``detail`` are stored as empty strings.
        """
        query = """
            INSERT INTO long_articles_decode_task_detail
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )

    async def fetch_exist_source_id(self, source_id: str, task_type: int) -> List[Dict]:
        """Return queue row ids matching ``content_id=source_id`` and ``task_type``.

        Callers can use a non-empty result to detect an existing task.
        """
        query = f"""
            SELECT id
            FROM {self.DECODE_TASK_QUEUE}
            WHERE content_id = %s AND task_type = %s;
        """
        return await self.pool.async_fetch(query=query, params=(source_id, task_type))


class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper for articles in ``ad_platform_accounts_daily_detail``."""

    async def record_decode_task(
        self, task_id: str, wx_sn: str, remark: Optional[str] = None
    ) -> int:
        """Queue a SOURCE_IMAGES_TEXT decode task keyed by the article's ``wx_sn``.

        Note: this override narrows the base-class signature; the payload is
        always ``"{}"``.
        """
        return await super().record_decode_task(
            task_id=task_id,
            content_id=wx_sn,
            task_type=self.TaskType.SOURCE_IMAGES_TEXT,
            payload="{}",
            remark=remark,
        )

    async def update_article_decode_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Conditionally move an article's decode_status from ori to new."""
        query = """
            UPDATE ad_platform_accounts_daily_detail
            SET decode_status = %s
            WHERE id = %s AND decode_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_decode_articles(self) -> List[Dict]:
        """Return up to ``TASK_BATCH`` articles ready to be decoded.

        Selects rows whose fetch_status is SUCCESS and decode_status is INIT.
        """
        query = """
            SELECT id, account_name, gh_id, article_title, article_cover,
                   article_text, article_images, wx_sn
            FROM ad_platform_accounts_daily_detail
            WHERE fetch_status = %s AND decode_status = %s
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
        )


class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper for internally produced articles.

    Also manages the per-(source_id, task_type) creation state table
    ``INNER_DECODE_CREATE_STATE`` used as a lock/retry record while remote
    decode tasks are being created.
    """

    # Source ids previously hard-coded into fetch_inner_articles' SQL.
    DEFAULT_INNER_SOURCE_IDS = (
        "20260222161421405412536",
        "20241011051312526697231",
        "20241125113847098601958",
        "20241011042712648349812",
    )

    async def fetch_inner_articles(
        self,
        date_string: str = "20260401",
        limit: int = 20,
        source_ids: Optional[Sequence[str]] = None,
    ) -> List[Dict]:
        """Return the top-read internal articles for ``date_string``.

        Args:
            date_string: Value matched against the ``dt`` column.
            limit: Maximum rows to return (ordered by max_read_rate desc).
            source_ids: Source ids to restrict to; defaults to
                ``DEFAULT_INNER_SOURCE_IDS``. An empty sequence returns ``[]``.
        """
        if source_ids is None:
            source_ids = self.DEFAULT_INNER_SOURCE_IDS
        source_ids = tuple(source_ids)
        if not source_ids:
            # ``IN ()`` is invalid SQL; an empty filter matches nothing anyway.
            return []
        placeholders = ", ".join(["%s"] * len(source_ids))
        query = f"""
            SELECT title, source_id, wx_sn, cover_img_url
            FROM long_articles_good_read_article
            WHERE dt = %s AND source_id IN ({placeholders})
            ORDER BY max_read_rate DESC
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(date_string, *source_ids, limit)
        )

    async def fetch_inner_articles_produce_detail(self, source_id: str) -> List[Dict]:
        """Return produce-module outputs for a plan execution in the ``aigc`` DB.

        Only module types 1, 2, 3, 4 and 18 are selected.
        """
        query = """
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s AND produce_module_type IN (1, 2, 3, 4, 18);
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )

    async def fetch_article_crawler_source_info(self, source_id: str) -> List[Dict]:
        """Return referenced crawler content ids and body text for a plan execution.

        Left-joins the execution record to its referenced contents and their
        blobs in the ``aigc`` DB, so columns may be NULL when no match exists.
        """
        query = """
            SELECT t2.channel_content_id, t3.body_text
            FROM produce_plan_exe_record t1
            LEFT JOIN produce_plan_exe_refer_content t2
                ON t2.plan_exe_id = t1.plan_exe_id
            LEFT JOIN crawler_content_blob t3
                ON t3.channel_content_id = t2.channel_content_id
            WHERE t1.plan_exe_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )

    async def init_create_state(self, source_id: str, task_type: int, now_ts: int) -> int:
        """Create an INIT creation-state row if one does not exist yet.

        Uses ``INSERT IGNORE``, so an existing row is left untouched.
        """
        query = f"""
            INSERT IGNORE INTO {self.INNER_DECODE_CREATE_STATE}
                (source_id, task_type, status, retry_count, locked_at,
                 created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(source_id, task_type, self.TaskStatus.INIT, 0, 0, now_ts, now_ts),
        )

    async def fetch_create_state(self, source_id: str, task_type: int) -> Optional[Dict]:
        """Return the creation-state row for (source_id, task_type), or None."""
        query = f"""
            SELECT source_id, task_type, status, retry_count, locked_at,
                   remote_task_id, last_error
            FROM {self.INNER_DECODE_CREATE_STATE}
            WHERE source_id = %s AND task_type = %s
            LIMIT 1;
        """
        rows = await self.pool.async_fetch(query=query, params=(source_id, task_type))
        return rows[0] if rows else None

    async def acquire_create_lock(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        max_retry_times: int,
        lock_expire_before: int,
    ) -> int:
        """Try to claim the creation lock by moving the row to PROCESSING.

        The row is claimed when it is:
          * INIT, or
          * FAILED with ``retry_count < max_retry_times``, or
          * PROCESSING with a stale lock (``0 < locked_at < lock_expire_before``).

        On success ``locked_at``/``updated_at`` are set to ``now_ts`` and
        ``last_error`` is cleared. A zero result presumably means the lock
        was not acquired.
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s, locked_at = %s, updated_at = %s, last_error = NULL
            WHERE source_id = %s AND task_type = %s
              AND (
                    status = %s
                 OR (status = %s AND retry_count < %s)
                 OR (status = %s AND locked_at > 0 AND locked_at < %s)
              );
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.PROCESSING,
                now_ts,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.INIT,
                self.TaskStatus.FAILED,
                max_retry_times,
                self.TaskStatus.PROCESSING,
                lock_expire_before,
            ),
        )

    async def mark_create_success(
        self,
        source_id: str,
        task_type: int,
        remote_task_id: str,
        now_ts: int,
        remark: Optional[str] = None,
    ) -> int:
        """Release the lock as SUCCESS and record ``remote_task_id``.

        ``remark`` is written into ``last_error``. Only rows currently in
        PROCESSING are updated.
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s, remote_task_id = %s, last_error = %s,
                locked_at = 0, updated_at = %s
            WHERE source_id = %s AND task_type = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remote_task_id,
                remark,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def _release_create_lock(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
        target_status: int,
    ) -> int:
        """Move a PROCESSING row to ``target_status``, bumping retry_count.

        Shared by ``mark_create_retry`` and ``mark_create_failed``; records
        ``error_message`` in ``last_error`` and clears ``locked_at``.
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s, retry_count = retry_count + 1, last_error = %s,
                locked_at = 0, updated_at = %s
            WHERE source_id = %s AND task_type = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                target_status,
                error_message,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def mark_create_retry(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
    ) -> int:
        """Release the lock back to INIT so the creation can be retried.

        NOTE(review): ``acquire_create_lock`` reclaims INIT rows without
        checking ``retry_count``, so the retry cap only applies to FAILED
        rows; callers presumably choose retry vs. failed themselves -- confirm.
        """
        return await self._release_create_lock(
            source_id, task_type, now_ts, error_message, self.TaskStatus.INIT
        )

    async def mark_create_failed(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
    ) -> int:
        """Release the lock as FAILED (re-claimable while under the retry cap)."""
        return await self._release_create_lock(
            source_id, task_type, now_ts, error_message, self.TaskStatus.FAILED
        )


__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]