_mapper.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from app.core.database import DatabaseManager
  2. from ._const import VideoDecodeConst
  3. class VideoDecodeMapper(VideoDecodeConst):
  4. def __init__(self, pool: DatabaseManager):
  5. self.pool = pool
  6. async def fetch_video_source_content(self, root_source_id: str):
  7. query = """
  8. SELECT content_id, gh_id, video_id, trace_id
  9. FROM long_articles_root_source_id
  10. WHERE root_source_id = %s;
  11. """
  12. return await self.pool.async_fetch(query=query, params=(root_source_id,))
  13. async def fetch_video_match_result_v1(self, gh_id: str, content_id: str):
  14. query = """
  15. SELECT response FROM long_articles_match_videos WHERE gh_id = %s AND content_id = %s;
  16. """
  17. return await self.pool.async_fetch(
  18. query=query,
  19. params=(
  20. gh_id,
  21. content_id,
  22. ),
  23. )
  24. async def fetch_video_match_result_v2(self, trace_id: str):
  25. query = """
  26. SELECT response FROM long_articles_match_videos WHERE trace_id = %s
  27. """
  28. return await self.pool.async_fetch(query=query, params=(trace_id,))
  29. async def save_video_to_decode_data(self, data: tuple):
  30. """
  31. 存储数据到 video_decode_data
  32. """
  33. query = """
  34. INSERT IGNORE INTO video_decode_data
  35. (video_id, channel, hot_scene_type, video_path, title, root_source_id, dt)
  36. VALUES
  37. (%s, %s, %s, %s, %s, %s, %s);
  38. """
  39. return await self.pool.async_save(
  40. query=query,
  41. params=data,
  42. )
  43. async def update_video_decode_data_status(self, video_id, ori_status, new_status):
  44. query = """
  45. UPDATE video_decode_data
  46. SET status = %s
  47. WHERE video_id = %s AND status = %s;
  48. """
  49. affected_rows = await self.pool.async_save(
  50. query=query,
  51. params=(new_status, video_id, ori_status),
  52. )
  53. return bool(affected_rows)
  54. async def insert_into_decode_task_queue(self, data: tuple):
  55. query = """
  56. INSERT IGNORE INTO video_decode_queue
  57. (video_path, sample_video_id, dt)
  58. VALUES
  59. (%s, %s, %s);
  60. """
  61. return await self.pool.async_save(
  62. query=query,
  63. params=data,
  64. )
  65. async def is_video_decoded(self, video_id: int):
  66. query = """
  67. SELECT vid FROM aigc_topic_decode_task_result WHERE vid = %s AND status = %s;
  68. """
  69. return await self.pool.async_fetch(
  70. query=query, db_name="content_decode", params=(video_id, self.TaskStatus.SUCCESS)
  71. )