import time import traceback from tqdm import tqdm from applications.api import change_video_audit_status from applications.api import feishu_robot class GetOffVideosConst: EXPIRE_TIME = 3 * 24 * 3600 EARLIEST_TIME = 7 * 24 * 3600 VIDEO_AVAILABLE_STATUS = 1 VIDEO_DISABLE_STATUS = 0 # VIDEO_AUDIT VIDEO_AUDIT_FAIL_STATUS = 2 # Task code TASK_SUCCESS_STATUS = 2 TASK_FAILED_STATUS = 99 class GetOffVideos(GetOffVideosConst): def __init__(self, db_client, log_client): self.db_client = db_client self.log_client = log_client self.table = "get_off_videos" async def get_task_list(self, earliest_timestamp_threshold, expire_timestamp_threshold): """get videos which need get off""" earliest_timestamp_threshold = 0 query = f""" select video_id from {self.table} where video_status = %s and publish_time between %s and %s; """ video_list, error = await self.db_client.async_fetch(query, params=(self.VIDEO_AVAILABLE_STATUS, earliest_timestamp_threshold, expire_timestamp_threshold)) return video_list async def update_video_status(self, video_id): query = f""" update {self.table} set video_status = %s, get_off_time = %s where video_id = %s; """ return await self.db_client.async_save(query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)) async def update_video_audit_status(self, video_id): """use pq api to update video status""" response = await change_video_audit_status(video_id, self.VIDEO_AUDIT_FAIL_STATUS) await self.update_video_status(video_id) return response async def get_off_job(self): """get off videos out of expire time""" expire_timestamp_threshold = int(time.time()) - self.EXPIRE_TIME earliest_timestamp_threshold = int(time.time()) - self.EARLIEST_TIME task_list = await self.get_task_list(earliest_timestamp_threshold, expire_timestamp_threshold) for task in tqdm(task_list): video_id = task['video_id'] try: await self.update_video_audit_status(video_id) except Exception as e: self.log_client.log( contents={ "task": "get_off_videos", "function": "get_off_job", "status": "fail", "message": "get off video fail", "data": { "video_id": video_id, "error": str(e), "traceback": traceback.format_exc() } } ) async def check(self): earliest_timestamp = int(time.time()) - self.EARLIEST_TIME expire_timestamp = int(time.time()) - self.EXPIRE_TIME task_list = await self.get_task_list(earliest_timestamp, expire_timestamp) if task_list: await feishu_robot.bot( title="自动下架视频失败", detail={ "total_video": len(task_list), "video_list": [i['video_id'] for i in task_list] }, mention=False ) return self.TASK_FAILED_STATUS else: return self.TASK_SUCCESS_STATUS