| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- from app.core.database import DatabaseManager
- from ._const import VideoDecodeConst
- class VideoDecodeMapper(VideoDecodeConst):
- def __init__(self, pool: DatabaseManager):
- self.pool = pool
- async def fetch_video_source_content(self, root_source_id: str):
- query = """
- SELECT content_id, gh_id, video_id, trace_id
- FROM long_articles_root_source_id
- WHERE root_source_id = %s;
- """
- return await self.pool.async_fetch(query=query, params=(root_source_id,))
- async def fetch_video_match_result_v1(self, gh_id: str, content_id: str):
- query = """
- SELECT response FROM long_articles_match_videos WHERE gh_id = %s AND content_id = %s;
- """
- return await self.pool.async_fetch(
- query=query,
- params=(
- gh_id,
- content_id,
- ),
- )
- async def fetch_video_match_result_v2(self, trace_id: str):
- query = """
- SELECT response FROM long_articles_match_videos WHERE trace_id = %s
- """
- return await self.pool.async_fetch(query=query, params=(trace_id,))
- async def save_video_to_decode_data(self, data: tuple):
- """
- 存储数据到 video_decode_data
- """
- query = """
- INSERT IGNORE INTO video_decode_data
- (video_id, channel, hot_scene_type, video_path, title, root_source_id, dt)
- VALUES
- (%s, %s, %s, %s, %s, %s, %s);
- """
- return await self.pool.async_save(
- query=query,
- params=data,
- )
- async def update_video_decode_data_status(self, video_id, ori_status, new_status):
- query = """
- UPDATE video_decode_data
- SET status = %s
- WHERE video_id = %s AND status = %s;
- """
- affected_rows = await self.pool.async_save(
- query=query,
- params=(new_status, video_id, ori_status),
- )
- return bool(affected_rows)
- async def insert_into_decode_task_queue(self, data: tuple):
- query = """
- INSERT IGNORE INTO video_decode_queue
- (video_path, sample_video_id, dt)
- VALUES
- (%s, %s, %s);
- """
- return await self.pool.async_save(
- query=query,
- params=data,
- )
- async def is_video_decoded(self, video_id: int):
- query = """
- SELECT vid FROM aigc_topic_decode_task_result WHERE vid = %s AND status = %s;
- """
- return await self.pool.async_fetch(
- query=query, db_name="content_decode", params=(video_id, self.TaskStatus.SUCCESS)
- )
|