get_off_videos.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import time
  2. import traceback
  3. from typing import List, Optional
  4. from tqdm import tqdm
  5. from applications.api import AsyncApolloApi
  6. from applications.api import change_video_audit_status
  7. from applications.api import fetch_piaoquan_video_list_detail
  8. from applications.api import feishu_robot
  class GetOffVideosConst:
      """Constants shared by the video take-down task and the audit-check task."""

      # Name of the table both tasks read from and write to.
      table = "get_off_videos"

      # Offsets in seconds, subtracted from the current time to build the
      # ``publish_time`` window of videos that should be taken down:
      # [now - EARLIEST_TIME, now - EXPIRE_TIME].
      EARLIEST_TIME = 7 * 24 * 60 * 60
      EXPIRE_TIME = 3 * 24 * 60 * 60

      # Values of the ``video_status`` column.
      VIDEO_DISABLE_STATUS = 0
      VIDEO_AVAILABLE_STATUS = 1

      # Audit status codes exchanged with the video API.
      VIDEO_AUDIT_FAIL_STATUS = 2
      VIDEO_AUDIT_SUCCESS_STATUS = 5

      # Result codes returned by each task's ``deal`` entry point.
      TASK_SUCCESS_STATUS = 2
      TASK_FAILED_STATUS = 99

      # Values of the ``check_status`` column.
      CHECK_INIT_STATUS = 0
      CHECK_FINISHED_STATUS = 1
  class GetOffVideos(GetOffVideosConst, AsyncApolloApi):
      """Take down videos whose ``publish_time`` falls inside the expiry window.

      ``deal`` is the entry point: it runs the take-down job, re-queries the
      same window to verify nothing is left, and returns a task status code.
      """

      def __init__(self, db_client, log_client, trace_id):
          """Store the collaborators used by the task.

          Args:
              db_client: object exposing awaitable ``async_fetch`` / ``async_save``.
              log_client: object exposing an awaitable ``log(contents=...)``.
              trace_id: identifier attached to every log record of this run.
          """
          super().__init__()
          self.db_client = db_client
          self.log_client = log_client
          self.trace_id = trace_id

      async def get_no_need_get_off_videos(self) -> set:
          """Return the set of video ids that must never be taken down."""
          # The method is invoked as ``self.get_no_need_get_off_videos()``, so it
          # has to accept ``self``; without it every call raised TypeError.
          # TODO: read the list from config instead of hard-coding it, e.g.
          # ``await self.get_config_value(key="white_video_ids")``.
          video_ids = [61719952]
          return set(video_ids)

      async def get_task_list(
          self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
      ):
          """Fetch the videos that should be taken down.

          Args:
              earliest_timestamp_threshold: lower bound for ``publish_time``.
              expire_timestamp_threshold: upper bound for ``publish_time``.

          Returns:
              The rows returned by the database (each exposing ``"video_id"``)
              whose video is still available, was published inside the window
              and is not on the exemption list.
          """
          query = f"""
              select video_id from {self.table} where video_status = %s and publish_time between %s and %s
              and video_id > 1;
          """
          video_list = await self.db_client.async_fetch(
              query,
              params=(
                  self.VIDEO_AVAILABLE_STATUS,
                  earliest_timestamp_threshold,
                  expire_timestamp_threshold,
              ),
          )
          no_need_get_off_videos = await self.get_no_need_get_off_videos()
          return [
              video
              for video in video_list
              if video["video_id"] not in no_need_get_off_videos
          ]

      async def update_video_status(self, video_id):
          """Mark ``video_id`` as disabled in the table and stamp the take-down time."""
          query = f"""
              update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
          """
          return await self.db_client.async_save(
              query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
          )

      async def update_video_audit_status(self, video_id):
          """Set the video's audit status to "fail" via the API, then update the table.

          Returns:
              Whatever ``change_video_audit_status`` returned.
          """
          response = await change_video_audit_status(
              video_id, self.VIDEO_AUDIT_FAIL_STATUS
          )
          # NOTE(review): the local row is marked disabled regardless of what the
          # API returned; only an exception prevents it. Confirm this is intended.
          await self.update_video_status(video_id)
          return response

      async def get_off_job(self):
          """Take down every video in the expiry window and report the outcome."""
          # Read the clock once so both bounds are derived from the same instant.
          now = int(time.time())
          expire_timestamp_threshold = now - self.EXPIRE_TIME
          earliest_timestamp_threshold = now - self.EARLIEST_TIME
          task_list = await self.get_task_list(
              earliest_timestamp_threshold, expire_timestamp_threshold
          )
          await self.log_client.log(
              contents={
                  "task": "get_off_videos",
                  "trace_id": self.trace_id,
                  "message": f"获取{len(task_list)}条待下架视频",
                  "data": task_list,
              }
          )

          success_count = 0
          failed_count = 0
          for task in tqdm(task_list):
              video_id = task["video_id"]
              # Defensive: ids 0 and 1 are never processed (the query already
              # excludes them with ``video_id > 1``).
              if video_id in (0, 1):
                  continue
              try:
                  await self.update_video_audit_status(video_id)
                  success_count += 1
              except Exception as e:
                  # One failing video must not abort the batch: log and go on.
                  await self.log_client.log(
                      contents={
                          "task": "get_off_videos",
                          "function": "get_off_job",
                          "status": "fail",
                          "message": "get off video fail",
                          "trace_id": self.trace_id,
                          "data": {
                              "video_id": video_id,
                              "error": str(e),
                              "traceback": traceback.format_exc(),
                          },
                      }
                  )
                  failed_count += 1

          # Only notify when at least one video was actually processed.
          if success_count or failed_count:
              await feishu_robot.bot(
                  title="自动下架任务完成",
                  detail={
                      "成功下架视频数量": success_count,
                      "失败数量": failed_count,
                  },
                  mention=False,
              )

      async def check(self):
          """Verify that no video in the expiry window is still available.

          Returns:
              ``TASK_FAILED_STATUS`` (after sending an alert) when videos remain,
              otherwise ``TASK_SUCCESS_STATUS``.
          """
          now = int(time.time())
          earliest_timestamp = now - self.EARLIEST_TIME
          expire_timestamp = now - self.EXPIRE_TIME
          task_list = await self.get_task_list(earliest_timestamp, expire_timestamp)
          if not task_list:
              return self.TASK_SUCCESS_STATUS

          await feishu_robot.bot(
              title="自动下架视频失败",
              detail={
                  "total_video": len(task_list),
                  "video_list": [i["video_id"] for i in task_list],
              },
              mention=False,
          )
          return self.TASK_FAILED_STATUS

      async def deal(self):
          """Run the take-down job, verify it, log completion and return the status."""
          await self.get_off_job()
          task_status = await self.check()
          await self.log_client.log(
              contents={
                  "task": "get_off_videos",
                  "function": "deal",
                  "trace_id": self.trace_id,
                  "message": "任务执行完成",
              }
          )
          return task_status
  class CheckVideoAuditStatus(GetOffVideosConst):
      """Re-check the audit status of available videos that are not yet checked.

      Videos whose audit status differs from ``VIDEO_AUDIT_SUCCESS_STATUS`` are
      passed to ``change_video_audit_status``; every processed batch is then
      flagged as checked in the table.
      """

      def __init__(self, db_client, log_client, trace_id):
          """Keep the database client, log client and trace id for later use."""
          self.trace_id = trace_id
          self.log_client = log_client
          self.db_client = db_client

      async def get_video_list_status(self, video_list: List[int]):
          """Return the ids in ``video_list`` whose audit status is not "success".

          The details are read from the ``"data"`` entry of the API response;
          a missing or empty entry yields an empty list.
          """
          response = await fetch_piaoquan_video_list_detail(video_list)
          details = response.get("data", [])
          if not details:
              return []

          expected = self.VIDEO_AUDIT_SUCCESS_STATUS
          return [
              detail["id"] for detail in details if detail["auditStatus"] != expected
          ]

      async def get_unchecked_video_list(self) -> Optional[List[int]]:
          """Return up to 1000 ids of available, unchecked videos, or ``None``."""
          query = f"""
              select video_id from {self.table} where check_status = %s and video_status = %s and video_id > 1 limit 1000;
          """
          rows = await self.db_client.async_fetch(
              query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
          )
          if not rows:
              return None
          return [row["video_id"] for row in rows]

      async def update_check_status(self, video_list: List[int]):
          """Flag every id in ``video_list`` as checked."""
          query = f"""update {self.table} set check_status = %s where video_id in %s;"""
          params = (self.CHECK_FINISHED_STATUS, tuple(video_list))
          return await self.db_client.async_save(query, params=params)

      async def deal(self):
          """Process the unchecked videos in batches of 10.

          Returns:
              ``TASK_SUCCESS_STATUS`` when there is nothing to check or every
              status change returned a truthy response; otherwise an alert is
              sent and ``TASK_FAILED_STATUS`` is returned.
          """
          video_ids = await self.get_unchecked_video_list()
          if not video_ids:
              return self.TASK_SUCCESS_STATUS

          await self.log_client.log(
              contents={
                  "task": "check_video_audit_status",
                  "function": "deal",
                  "trace_id": self.trace_id,
                  "message": f"一共获取{len(video_ids)}条视频",
              }
          )

          batch_size = 10
          batches = (
              video_ids[start : start + batch_size]
              for start in range(0, len(video_ids), batch_size)
          )

          bad_total = 0  # running count of bad videos; not reported anywhere
          failed_ids = []
          for batch_no, batch_ids in enumerate(batches, start=1):
              bad_ids = await self.get_video_list_status(batch_ids)
              if bad_ids:
                  bad_total += len(bad_ids)
                  await self.log_client.log(
                      contents={
                          "task": "check_video_audit_status",
                          "function": "deal",
                          "trace_id": self.trace_id,
                          "message": f"batch: {batch_no} has {len(bad_ids)} bad videos",
                          "data": bad_ids,
                      }
                  )
                  for bad_id in tqdm(bad_ids):
                      # A falsy response is recorded as a failed status change.
                      if not await change_video_audit_status(bad_id):
                          failed_ids.append(bad_id)

              # The whole batch is flagged as checked, bad videos included.
              await self.update_check_status(batch_ids)
              await self.log_client.log(
                  contents={
                      "task": "check_video_audit_status",
                      "function": "deal",
                      "trace_id": self.trace_id,
                      "message": f"finish process batch: {batch_no}",
                  }
              )

          if not failed_ids:
              return self.TASK_SUCCESS_STATUS

          await feishu_robot.bot(
              title="校验已发布视频状态出现错误", detail=failed_ids, mention=False
          )
          return self.TASK_FAILED_STATUS
  234. else:
  235. return self.TASK_SUCCESS_STATUS