get_off_videos.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import time
  2. import traceback
  3. from typing import List, Optional
  4. from tqdm import tqdm
  5. from applications.api import change_video_audit_status
  6. from applications.api import fetch_piaoquan_video_list_detail
  7. from applications.api import feishu_robot
  8. class GetOffVideosConst:
  9. EXPIRE_TIME = 3 * 24 * 3600
  10. EARLIEST_TIME = 7 * 24 * 3600
  11. VIDEO_AVAILABLE_STATUS = 1
  12. VIDEO_DISABLE_STATUS = 0
  13. # VIDEO_AUDIT
  14. VIDEO_AUDIT_FAIL_STATUS = 2
  15. VIDEO_AUDIT_SUCCESS_STATUS = 5
  16. # Task code
  17. TASK_SUCCESS_STATUS = 2
  18. TASK_FAILED_STATUS = 99
  19. # check status
  20. CHECK_INIT_STATUS = 0
  21. CHECK_FINISHED_STATUS = 1
  22. # table
  23. table = "get_off_videos"
  24. class GetOffVideos(GetOffVideosConst):
  25. def __init__(self, db_client, log_client):
  26. self.db_client = db_client
  27. self.log_client = log_client
  28. async def get_task_list(
  29. self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
  30. ):
  31. """get videos which need get off"""
  32. query = f"""
  33. select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
  34. """
  35. video_list, error = await self.db_client.async_fetch(
  36. query,
  37. params=(
  38. self.VIDEO_AVAILABLE_STATUS,
  39. earliest_timestamp_threshold,
  40. expire_timestamp_threshold,
  41. ),
  42. )
  43. return video_list
  44. async def update_video_status(self, video_id):
  45. query = f"""
  46. update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
  47. """
  48. return await self.db_client.async_save(
  49. query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
  50. )
  51. async def update_video_audit_status(self, video_id):
  52. """use pq api to update video status"""
  53. response = await change_video_audit_status(
  54. video_id, self.VIDEO_AUDIT_FAIL_STATUS
  55. )
  56. await self.update_video_status(video_id)
  57. return response
  58. async def get_off_job(self):
  59. """get off videos out of expire time"""
  60. expire_timestamp_threshold = int(time.time()) - self.EXPIRE_TIME
  61. earliest_timestamp_threshold = int(time.time()) - self.EARLIEST_TIME
  62. task_list = await self.get_task_list(
  63. earliest_timestamp_threshold, expire_timestamp_threshold
  64. )
  65. for task in tqdm(task_list):
  66. video_id = task["video_id"]
  67. try:
  68. await self.update_video_audit_status(video_id)
  69. except Exception as e:
  70. await self.log_client.log(
  71. contents={
  72. "task": "get_off_videos",
  73. "function": "get_off_job",
  74. "status": "fail",
  75. "message": "get off video fail",
  76. "data": {
  77. "video_id": video_id,
  78. "error": str(e),
  79. "traceback": traceback.format_exc(),
  80. },
  81. }
  82. )
  83. async def check(self):
  84. earliest_timestamp = int(time.time()) - self.EARLIEST_TIME
  85. expire_timestamp = int(time.time()) - self.EXPIRE_TIME
  86. task_list = await self.get_task_list(earliest_timestamp, expire_timestamp)
  87. if task_list:
  88. await feishu_robot.bot(
  89. title="自动下架视频失败",
  90. detail={
  91. "total_video": len(task_list),
  92. "video_list": [i["video_id"] for i in task_list],
  93. },
  94. mention=False,
  95. )
  96. return self.TASK_FAILED_STATUS
  97. else:
  98. return self.TASK_SUCCESS_STATUS
  99. class CheckVideoAuditStatus(GetOffVideosConst):
  100. def __init__(self, db_client, log_client):
  101. self.db_client = db_client
  102. self.log_client = log_client
  103. async def get_video_list_status(self, video_list: List[int]):
  104. response = await fetch_piaoquan_video_list_detail(video_list)
  105. video_detail_list = response.get("data", [])
  106. if video_detail_list:
  107. bad_video_list = [
  108. i["id"]
  109. for i in video_detail_list
  110. if i["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
  111. ]
  112. else:
  113. bad_video_list = []
  114. return bad_video_list
  115. async def get_unchecked_video_list(self) -> Optional[List[int]]:
  116. """find unchecked videos"""
  117. query = f"""
  118. select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
  119. """
  120. video_id_list, error = await self.db_client.async_fetch(
  121. query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
  122. )
  123. if error:
  124. print("error", error)
  125. return None
  126. else:
  127. return [i["video_id"] for i in video_id_list]
  128. async def update_check_status(self, video_list: List[int]):
  129. query = f"""update {self.table} set check_status = %s where video_id in %s;"""
  130. return await self.db_client.async_save(
  131. query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
  132. )
  133. async def deal(self):
  134. def chuck_iterator(arr, chunk_size):
  135. for i in range(0, len(arr), chunk_size):
  136. yield arr[i : i + chunk_size]
  137. video_id_list = await self.get_unchecked_video_list()
  138. video_chunks = chuck_iterator(video_id_list, 10)
  139. bad_videos_count = 0
  140. fail_list = []
  141. for video_chunk in video_chunks:
  142. bad_video_id_list = await self.get_video_list_status(video_chunk)
  143. if bad_video_id_list:
  144. bad_videos_count += len(bad_video_id_list)
  145. for bad_video_id in tqdm(bad_video_id_list):
  146. response = await change_video_audit_status(bad_video_id)
  147. if not response:
  148. fail_list.append(bad_video_id)
  149. await self.update_check_status(video_chunk)
  150. if fail_list:
  151. await feishu_robot.bot(
  152. title="校验已发布视频状态出现错误", detail=fail_list, mention=False
  153. )
  154. return self.TASK_FAILED_STATUS
  155. else:
  156. return self.TASK_SUCCESS_STATUS