from app.core.database import DatabaseManager from ._const import VideoDecodeConst class VideoDecodeMapper(VideoDecodeConst): def __init__(self, pool: DatabaseManager): self.pool = pool async def fetch_video_source_content(self, root_source_id: str): query = """ SELECT content_id, gh_id, video_id, trace_id FROM long_articles_root_source_id WHERE root_source_id = %s; """ return await self.pool.async_fetch(query=query, params=(root_source_id,)) async def fetch_video_match_result_v1(self, gh_id: str, content_id: str): query = """ SELECT response FROM long_articles_match_videos WHERE gh_id = %s AND content_id = %s; """ return await self.pool.async_fetch( query=query, params=( gh_id, content_id, ), ) async def fetch_video_match_result_v2(self, trace_id: str): query = """ SELECT response FROM long_articles_match_videos WHERE trace_id = %s """ return await self.pool.async_fetch(query=query, params=(trace_id,)) async def save_video_to_decode_data(self, data: tuple): """ 存储数据到 video_decode_data """ query = """ INSERT IGNORE INTO video_decode_data (video_id, channel, hot_scene_type, video_path, title, root_source_id, dt) VALUES (%s, %s, %s, %s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=data, ) async def update_video_decode_data_status(self, video_id, ori_status, new_status): query = """ UPDATE video_decode_data SET status = %s WHERE video_id = %s AND status = %s; """ affected_rows = await self.pool.async_save( query=query, params=(new_status, video_id, ori_status), ) return bool(affected_rows) async def insert_into_decode_task_queue(self, data: tuple): query = """ INSERT IGNORE INTO video_decode_queue (video_path, sample_video_id, dt) VALUES (%s, %s, %s); """ return await self.pool.async_save( query=query, params=data, ) async def is_video_decoded(self, video_id: int): query = """ SELECT vid FROM aigc_topic_decode_task_result WHERE vid = %s AND status = %s; """ return await self.pool.async_fetch( query=query, db_name="content_decode", params=(video_id, self.TaskStatus.SUCCESS) )