get_off_videos.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import time
  2. import traceback
  3. from typing import List, Optional
  4. from tqdm import tqdm
  5. from applications.api import change_video_audit_status
  6. from applications.api import fetch_piaoquan_video_list_detail
  7. from applications.api import feishu_robot
  8. class GetOffVideosConst:
  9. EXPIRE_TIME = 3 * 24 * 3600
  10. EARLIEST_TIME = 7 * 24 * 3600
  11. VIDEO_AVAILABLE_STATUS = 1
  12. VIDEO_DISABLE_STATUS = 0
  13. # VIDEO_AUDIT
  14. VIDEO_AUDIT_FAIL_STATUS = 2
  15. VIDEO_AUDIT_SUCCESS_STATUS = 5
  16. # Task code
  17. TASK_SUCCESS_STATUS = 2
  18. TASK_FAILED_STATUS = 99
  19. # check status
  20. CHECK_INIT_STATUS = 0
  21. CHECK_FINISHED_STATUS = 1
  22. # table
  23. table = "get_off_videos"
  24. class GetOffVideos(GetOffVideosConst):
  25. def __init__(self, db_client, log_client, trace_id):
  26. self.db_client = db_client
  27. self.log_client = log_client
  28. self.trace_id = trace_id
  29. async def get_task_list(
  30. self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
  31. ):
  32. """get videos which need get off"""
  33. query = f"""
  34. select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
  35. """
  36. video_list = await self.db_client.async_fetch(
  37. query,
  38. params=(
  39. self.VIDEO_AVAILABLE_STATUS,
  40. earliest_timestamp_threshold,
  41. expire_timestamp_threshold,
  42. ),
  43. )
  44. return video_list
  45. async def update_video_status(self, video_id):
  46. query = f"""
  47. update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
  48. """
  49. return await self.db_client.async_save(
  50. query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
  51. )
  52. async def update_video_audit_status(self, video_id):
  53. """use pq api to update video status"""
  54. response = await change_video_audit_status(
  55. video_id, self.VIDEO_AUDIT_FAIL_STATUS
  56. )
  57. await self.update_video_status(video_id)
  58. return response
  59. async def get_off_job(self):
  60. """get off videos out of expire time"""
  61. expire_timestamp_threshold = int(time.time()) - self.EXPIRE_TIME
  62. earliest_timestamp_threshold = int(time.time()) - self.EARLIEST_TIME
  63. task_list = await self.get_task_list(
  64. earliest_timestamp_threshold, expire_timestamp_threshold
  65. )
  66. await self.log_client.log(
  67. contents={
  68. "task": "get_off_videos",
  69. "trace_id": self.trace_id,
  70. "message": f"获取{len(task_list)}条待下架视频",
  71. "data": task_list,
  72. }
  73. )
  74. success_count = 0
  75. failed_count = 0
  76. for task in tqdm(task_list):
  77. video_id = task["video_id"]
  78. try:
  79. await self.update_video_audit_status(video_id)
  80. success_count += 1
  81. except Exception as e:
  82. await self.log_client.log(
  83. contents={
  84. "task": "get_off_videos",
  85. "function": "get_off_job",
  86. "status": "fail",
  87. "message": "get off video fail",
  88. "trace_id": self.trace_id,
  89. "data": {
  90. "video_id": video_id,
  91. "error": str(e),
  92. "traceback": traceback.format_exc(),
  93. },
  94. }
  95. )
  96. failed_count += 1
  97. if success_count or failed_count:
  98. await feishu_robot.bot(
  99. title="自动下架任务完成",
  100. detail={
  101. "成功下架视频数量": success_count,
  102. "失败数量": failed_count,
  103. },
  104. mention=False,
  105. )
  106. async def check(self):
  107. earliest_timestamp = int(time.time()) - self.EARLIEST_TIME
  108. expire_timestamp = int(time.time()) - self.EXPIRE_TIME
  109. task_list = await self.get_task_list(earliest_timestamp, expire_timestamp)
  110. if task_list:
  111. await feishu_robot.bot(
  112. title="自动下架视频失败",
  113. detail={
  114. "total_video": len(task_list),
  115. "video_list": [i["video_id"] for i in task_list],
  116. },
  117. mention=False,
  118. )
  119. return self.TASK_FAILED_STATUS
  120. else:
  121. return self.TASK_SUCCESS_STATUS
  122. async def deal(self):
  123. await self.get_off_job()
  124. task_status = await self.check()
  125. await self.log_client.log(
  126. contents={
  127. "task": "get_off_videos",
  128. "function": "deal",
  129. "trace_id": self.trace_id,
  130. "message": "任务执行完成"
  131. }
  132. )
  133. return task_status
  134. class CheckVideoAuditStatus(GetOffVideosConst):
  135. def __init__(self, db_client, log_client, trace_id):
  136. self.db_client = db_client
  137. self.log_client = log_client
  138. self.trace_id = trace_id
  139. async def get_video_list_status(self, video_list: List[int]):
  140. response = await fetch_piaoquan_video_list_detail(video_list)
  141. video_detail_list = response.get("data", [])
  142. if video_detail_list:
  143. bad_video_list = [
  144. i["id"]
  145. for i in video_detail_list
  146. if i["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
  147. ]
  148. else:
  149. bad_video_list = []
  150. return bad_video_list
  151. async def get_unchecked_video_list(self) -> Optional[List[int]]:
  152. """find unchecked videos"""
  153. query = f"""
  154. select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
  155. """
  156. video_id_list = await self.db_client.async_fetch(
  157. query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
  158. )
  159. if video_id_list:
  160. return [i["video_id"] for i in video_id_list]
  161. else:
  162. return None
  163. async def update_check_status(self, video_list: List[int]):
  164. query = f"""update {self.table} set check_status = %s where video_id in %s;"""
  165. return await self.db_client.async_save(
  166. query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
  167. )
  168. async def deal(self):
  169. def chuck_iterator(arr, chunk_size):
  170. for i in range(0, len(arr), chunk_size):
  171. yield arr[i : i + chunk_size]
  172. video_id_list = await self.get_unchecked_video_list()
  173. if video_id_list:
  174. await self.log_client.log(
  175. contents={
  176. "task": "check_video_audit_status",
  177. "function": "deal",
  178. "trace_id": self.trace_id,
  179. "message": f"一共获取{len(video_id_list)}条视频",
  180. }
  181. )
  182. else:
  183. return self.TASK_SUCCESS_STATUS
  184. video_chunks = chuck_iterator(video_id_list, 10)
  185. bad_videos_count = 0
  186. fail_list = []
  187. batch = 0
  188. for video_chunk in video_chunks:
  189. batch += 1
  190. bad_video_id_list = await self.get_video_list_status(video_chunk)
  191. if bad_video_id_list:
  192. bad_videos_count += len(bad_video_id_list)
  193. await self.log_client.log(
  194. contents={
  195. "task": "check_video_audit_status",
  196. "function": "deal",
  197. "trace_id": self.trace_id,
  198. "message": f"batch: {batch} has {len(bad_video_id_list)} bad videos",
  199. "data": bad_video_id_list,
  200. }
  201. )
  202. for bad_video_id in tqdm(bad_video_id_list):
  203. response = await change_video_audit_status(bad_video_id)
  204. if not response:
  205. fail_list.append(bad_video_id)
  206. await self.update_check_status(video_chunk)
  207. await self.log_client.log(
  208. contents={
  209. "task": "check_video_audit_status",
  210. "function": "deal",
  211. "trace_id": self.trace_id,
  212. "message": f"finish process batch: {batch}",
  213. }
  214. )
  215. if fail_list:
  216. await feishu_robot.bot(
  217. title="校验已发布视频状态出现错误", detail=fail_list, mention=False
  218. )
  219. return self.TASK_FAILED_STATUS
  220. else:
  221. return self.TASK_SUCCESS_STATUS