import time
import traceback
from typing import List, Optional

from tqdm import tqdm

from applications.api import change_video_audit_status
from applications.api import fetch_piaoquan_video_list_detail
from applications.api import feishu_robot


class GetOffVideosConst:
    """Constants shared by the get-off task and the audit-status check task."""

    # Time windows in seconds, subtracted from the current timestamp.
    EXPIRE_TIME = 3 * 24 * 3600  # 3 days
    EARLIEST_TIME = 7 * 24 * 3600  # 7 days

    # Values used for the `video_status` column.
    VIDEO_AVAILABLE_STATUS = 1
    VIDEO_DISABLE_STATUS = 0

    # VIDEO_AUDIT
    VIDEO_AUDIT_FAIL_STATUS = 2
    VIDEO_AUDIT_SUCCESS_STATUS = 5

    # Task code
    TASK_SUCCESS_STATUS = 2
    TASK_FAILED_STATUS = 99

    # check status
    CHECK_INIT_STATUS = 0
    CHECK_FINISHED_STATUS = 1

    # table
    table = "get_off_videos"


class GetOffVideos(GetOffVideosConst):
    """Take published videos offline once they fall into the expiry window."""

    def __init__(self, db_client, log_client):
        self.db_client = db_client
        self.log_client = log_client

    async def get_task_list(
        self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
    ):
        """Fetch the available videos whose publish_time lies in the window.

        Args:
            earliest_timestamp_threshold: lower bound of ``publish_time``.
            expire_timestamp_threshold: upper bound of ``publish_time``.

        Returns:
            The rows returned by the database client (each row is read via
            ``row["video_id"]`` by the callers). An empty list is returned
            when the fetch reports an error, so callers can always iterate
            over the result.
        """
        query = f"""
            select video_id from {self.table}
            where video_status = %s and publish_time between %s and %s;
        """
        video_list, error = await self.db_client.async_fetch(
            query,
            params=(
                self.VIDEO_AVAILABLE_STATUS,
                earliest_timestamp_threshold,
                expire_timestamp_threshold,
            ),
        )
        if error:
            # Previously the error was dropped and the (unusable) result was
            # handed to the caller, which then crashed while iterating it.
            await self.log_client.log(
                contents={
                    "task": "get_off_videos",
                    "function": "get_task_list",
                    "status": "fail",
                    "message": "fetch get off task list fail",
                    "data": {"error": str(error)},
                }
            )
            return []
        return video_list or []

    async def update_video_status(self, video_id):
        """Mark ``video_id`` as disabled and stamp ``get_off_time`` with now."""
        query = f"""
            update {self.table}
            set video_status = %s, get_off_time = %s
            where video_id = %s;
        """
        return await self.db_client.async_save(
            query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
        )

    async def update_video_audit_status(self, video_id):
        """use pq api to update video status

        Calls ``change_video_audit_status`` with the audit-fail status, then
        records the video as disabled in the local table.

        Returns:
            The response of ``change_video_audit_status``.
        """
        response = await change_video_audit_status(
            video_id, self.VIDEO_AUDIT_FAIL_STATUS
        )
        await self.update_video_status(video_id)
        return response

    async def get_off_job(self):
        """get off videos out of expire time

        Every video published between ``now - EARLIEST_TIME`` and
        ``now - EXPIRE_TIME`` that is still available is taken offline.
        A failure on one video is logged and does not stop the loop.
        """
        now = int(time.time())
        expire_timestamp_threshold = now - self.EXPIRE_TIME
        earliest_timestamp_threshold = now - self.EARLIEST_TIME
        task_list = await self.get_task_list(
            earliest_timestamp_threshold, expire_timestamp_threshold
        )
        for task in tqdm(task_list):
            video_id = task["video_id"]
            try:
                await self.update_video_audit_status(video_id)
            except Exception as e:
                # Best effort per video: record the failure and carry on.
                await self.log_client.log(
                    contents={
                        "task": "get_off_videos",
                        "function": "get_off_job",
                        "status": "fail",
                        "message": "get off video fail",
                        "data": {
                            "video_id": video_id,
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

    async def check(self):
        """Verify that no available video remains inside the get-off window.

        Returns:
            ``TASK_FAILED_STATUS`` (after sending a feishu alert listing the
            remaining video ids) when videos are still found, otherwise
            ``TASK_SUCCESS_STATUS``.
        """
        now = int(time.time())
        earliest_timestamp = now - self.EARLIEST_TIME
        expire_timestamp = now - self.EXPIRE_TIME
        task_list = await self.get_task_list(earliest_timestamp, expire_timestamp)
        if not task_list:
            return self.TASK_SUCCESS_STATUS

        await feishu_robot.bot(
            title="自动下架视频失败",
            detail={
                "total_video": len(task_list),
                "video_list": [i["video_id"] for i in task_list],
            },
            mention=False,
        )
        return self.TASK_FAILED_STATUS


class CheckVideoAuditStatus(GetOffVideosConst):
    """Check the audit status of unchecked, still-available videos."""

    def __init__(self, db_client, log_client):
        self.db_client = db_client
        self.log_client = log_client

    async def get_video_list_status(self, video_list: List[int]):
        """Return the ids in ``video_list`` whose audit status is not success.

        The details are fetched with ``fetch_piaoquan_video_list_detail``;
        an absent or empty ``data`` field yields an empty list.
        """
        response = await fetch_piaoquan_video_list_detail(video_list)
        video_detail_list = response.get("data", [])
        if not video_detail_list:
            return []
        return [
            detail["id"]
            for detail in video_detail_list
            if detail["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
        ]

    async def get_unchecked_video_list(self) -> Optional[List[int]]:
        """find unchecked videos

        Returns:
            Up to 1000 video ids with the initial check status that are still
            available, or ``None`` when the database fetch reports an error.
        """
        query = f"""
            select video_id from {self.table}
            where check_status = %s and video_status = %s
            limit 1000;
        """
        video_id_list, error = await self.db_client.async_fetch(
            query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
        )
        if error:
            print("error", error)
            return None
        return [i["video_id"] for i in video_id_list]

    async def update_check_status(self, video_list: List[int]):
        """Mark every video id in ``video_list`` as checked.

        NOTE(review): an empty ``video_list`` would bind an empty tuple to the
        ``in`` clause; ``deal`` never passes one - confirm for other callers.
        """
        query = f"""update {self.table} set check_status = %s where video_id in %s;"""
        return await self.db_client.async_save(
            query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
        )

    async def deal(self):
        """Check unchecked videos in chunks of 10 and re-submit the bad ones.

        For every chunk, the ids whose audit status is not success are passed
        to ``change_video_audit_status``; ids with a falsy response are
        collected and reported through a feishu alert. Each chunk is marked
        as checked afterwards.

        Returns:
            ``TASK_FAILED_STATUS`` when the unchecked list could not be
            fetched or when any status change failed, otherwise
            ``TASK_SUCCESS_STATUS``.
        """

        def chunk_iterator(arr, chunk_size):
            for i in range(0, len(arr), chunk_size):
                yield arr[i : i + chunk_size]

        video_id_list = await self.get_unchecked_video_list()
        if video_id_list is None:
            # The fetch failed; chunking ``None`` would raise a TypeError.
            return self.TASK_FAILED_STATUS

        fail_list = []
        for video_chunk in chunk_iterator(video_id_list, 10):
            bad_video_id_list = await self.get_video_list_status(video_chunk)
            if bad_video_id_list:
                for bad_video_id in tqdm(bad_video_id_list):
                    response = await change_video_audit_status(bad_video_id)
                    if not response:
                        fail_list.append(bad_video_id)

            await self.update_check_status(video_chunk)

        if fail_list:
            await feishu_robot.bot(
                title="校验已发布视频状态出现错误", detail=fail_list, mention=False
            )
            return self.TASK_FAILED_STATUS

        return self.TASK_SUCCESS_STATUS