from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import InnerArticlesDecodeConst


class InnerArticlesDecodeMapper(InnerArticlesDecodeConst):
    """SQL access layer for inner-article decode tasks.

    Covers the ``long_articles_decode_tasks`` and
    ``long_articles_decode_task_detail`` tables, the ``datastat_sort_strategy``
    table, and ``produce_plan_module_output`` in the ``aigc`` database.
    Status, source and batch-size constants (``TaskStatus``, ``ExtractStatus``,
    ``SourceType``, ``TASK_BATCH``) are inherited from
    ``InnerArticlesDecodeConst``.
    """

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    async def record_decode_task(
        self, task_id: str, wx_sn: str, remark: Optional[str] = None
    ) -> int:
        """Store a new decode task tagged with ``SourceType.INNER``.

        Uses ``INSERT IGNORE``: in MySQL, errors such as duplicate-key
        conflicts are downgraded to warnings and the row is skipped instead
        of raising.

        Returns:
            The value returned by ``pool.async_save`` (annotated ``int``;
            presumably the affected-row count — TODO confirm).
        """
        query = """
            INSERT IGNORE INTO long_articles_decode_tasks
                (task_id, wx_sn, remark, source)
            VALUES (%s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query, params=(task_id, wx_sn, remark, self.SourceType.INNER)
        )

    async def update_decode_task_status(
        self,
        task_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a decode task from ``ori_status`` to ``new_status``.

        The ``status = ori_status`` condition makes this a compare-and-set.
        The row changes only if it is still in the expected status, so a
        zero result from ``async_save`` (if it returns a row count) means the
        transition did not happen.

        Note: ``remark`` is always written, so omitting it resets the
        column to NULL.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET status = %s, remark = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, remark, task_id, ori_status)
        )

    async def set_decode_result(
        self, task_id: str, result: str, remark: Optional[str] = None
    ) -> int:
        """Store the decode ``result`` and mark the task SUCCESS.

        Applies only while the task is in ``TaskStatus.PROCESSING``. As with
        the status update, ``remark`` is always written, so omitting it sets
        it to NULL.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET status = %s, remark = %s, result = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remark,
                result,
                task_id,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
        """Fetch the generation (produce-module) outputs of an inner article.

        ``source_id`` is matched against ``plan_exe_id`` in the ``aigc``
        database. Only module types 1, 2, 3, 4 and 18 are returned; the
        meaning of those codes is defined outside this file (TODO document).
        """
        query = """
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s AND produce_module_type in (1,2,3,4,18);
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )

    async def fetch_decoding_tasks(self) -> List[Dict]:
        """Fetch INNER decode tasks whose result has not been pulled yet.

        Selects tasks in ``TaskStatus.INIT``, limited to ``TASK_BATCH``
        rows, and returns only their ``task_id``.
        """
        query = """
            SELECT task_id
            FROM long_articles_decode_tasks
            WHERE status = %s AND source = %s
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.INIT, self.SourceType.INNER, self.TASK_BATCH),
        )

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Fetch successfully decoded tasks that still need extraction.

        Returns ``id`` and ``result`` for rows with
        ``extract_status = ExtractStatus.INIT`` and
        ``status = TaskStatus.SUCCESS``. Unlike ``fetch_decoding_tasks``,
        there is no LIMIT and no ``source`` filter.
        """
        query = """
            SELECT id, result
            FROM long_articles_decode_tasks
            WHERE extract_status = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
        )

    async def update_extract_status(self, task_id, ori_status, new_status) -> int:
        """Compare-and-set the extraction status; used for locking and state flow.

        Note: despite its name, ``task_id`` here is matched against the
        ``id`` column (as returned by ``fetch_extract_tasks``), not against
        the ``task_id`` column.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET extract_status = %s
            WHERE extract_status = %s AND id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                new_status,
                ori_status,
                task_id,
            ),
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one parsed detail row into ``long_articles_decode_task_detail``.

        Missing keys in ``detail`` are stored as empty strings.
        """
        query = """
            INSERT INTO long_articles_decode_task_detail
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )

    async def fetch_inner_articles(self, start_date: str = "20260101") -> List[Dict]:
        """Fetch well-performing inner articles, one row per ``source_id``.

        Args:
            start_date: Inclusive lower bound on ``date_str`` (same string
                format as the column, e.g. ``"20260101"``). It is passed as a
                bound parameter rather than inlined. The default matches the
                previously hard-coded value.

        Only rows at ``position = 1`` with a non-null ``source_id`` are
        considered, and ``'服务号'`` accounts are excluded. A group is kept
        when ``sum(view_count) / sum(avg_view_count) >= 1.2`` and its minimum
        ``read_rate`` is ``>= 0.2``.

        NOTE(review): ``title`` and ``wx_sn`` are not aggregated although
        the query groups by ``source_id``. This fails under MySQL's
        ``ONLY_FULL_GROUP_BY`` mode and otherwise returns an indeterminate
        value per group.
        """
        query = """
            SELECT source_id, title, wx_sn
            FROM datastat_sort_strategy
            WHERE date_str >= %s
              AND account_type != '服务号'
              AND position = 1
              AND source_id is not null
            GROUP BY source_id
            HAVING sum(view_count) / sum(avg_view_count) >= 1.2
               AND min(read_rate) >= 0.2;
        """
        return await self.pool.async_fetch(query=query, params=(start_date,))