# Jobs that take published videos offline after the expiry window and re-check the audit status of published videos.
- import time
- import traceback
- from typing import List, Optional
- from tqdm import tqdm
- from applications.api import change_video_audit_status
- from applications.api import fetch_piaoquan_video_list_detail
- from applications.api import feishu_robot
class GetOffVideosConst:
    """Constants shared by the get-off task and the audit-status check task."""

    # Name of the table every query in this module runs against.
    table = "get_off_videos"

    # Offsets in seconds, subtracted from the current time to build the
    # `publish_time between %s and %s` window.
    EARLIEST_TIME = 7 * 24 * 60 * 60
    EXPIRE_TIME = 3 * 24 * 60 * 60

    # Values of the `video_status` column.
    VIDEO_DISABLE_STATUS = 0
    VIDEO_AVAILABLE_STATUS = 1

    # Audit status codes: FAIL is sent to the audit API when taking a video
    # down; SUCCESS is what `auditStatus` is compared against when checking.
    VIDEO_AUDIT_FAIL_STATUS = 2
    VIDEO_AUDIT_SUCCESS_STATUS = 5

    # Codes returned by the task entry points (`check` / `deal`).
    TASK_SUCCESS_STATUS = 2
    TASK_FAILED_STATUS = 99

    # Values of the `check_status` column.
    CHECK_INIT_STATUS = 0
    CHECK_FINISHED_STATUS = 1
class GetOffVideos(GetOffVideosConst):
    """Take still-available videos offline once they fall in the expiry window.

    A video is selected when its ``video_status`` is ``VIDEO_AVAILABLE_STATUS``
    and its ``publish_time`` lies between ``now - EARLIEST_TIME`` and
    ``now - EXPIRE_TIME``.

    Args:
        db_client: object exposing awaitable ``async_fetch(query, params=...)``
            (returning a ``(rows, error)`` pair) and ``async_save(query, params=...)``.
        log_client: object exposing awaitable ``log(contents=...)``.
    """

    def __init__(self, db_client, log_client):
        self.db_client = db_client
        self.log_client = log_client

    def _time_window(self):
        """Return ``(earliest, expire)`` thresholds derived from one clock read."""
        # A single read keeps both bounds relative to the same instant.
        now = int(time.time())
        return now - self.EARLIEST_TIME, now - self.EXPIRE_TIME

    async def _log_failure(self, function, message, data):
        """Write one failure record for this task through the log client."""
        await self.log_client.log(
            contents={
                "task": "get_off_videos",
                "function": function,
                "status": "fail",
                "message": message,
                "data": data,
            }
        )

    async def _fetch_task_list(
        self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
    ):
        """Run the selection query and return the raw ``(rows, error)`` pair."""
        query = f"""
            select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
        """
        return await self.db_client.async_fetch(
            query,
            params=(
                self.VIDEO_AVAILABLE_STATUS,
                earliest_timestamp_threshold,
                expire_timestamp_threshold,
            ),
        )

    async def get_task_list(
        self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
    ):
        """Get the videos which need to be taken off.

        Args:
            earliest_timestamp_threshold: lower bound for ``publish_time``.
            expire_timestamp_threshold: upper bound for ``publish_time``.

        Returns:
            The rows returned by ``db_client.async_fetch`` (each row is read
            via ``row["video_id"]``). A fetch error is logged rather than
            silently discarded; the rows value is returned unchanged.
        """
        video_list, error = await self._fetch_task_list(
            earliest_timestamp_threshold, expire_timestamp_threshold
        )
        if error:
            await self._log_failure(
                "get_task_list",
                "fetch get off task list fail",
                {"error": str(error)},
            )
        return video_list

    async def update_video_status(self, video_id):
        """Mark ``video_id`` as disabled and stamp ``get_off_time`` with now.

        Returns:
            Whatever ``db_client.async_save`` returns.
        """
        query = f"""
            update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
        """
        return await self.db_client.async_save(
            query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
        )

    async def update_video_audit_status(self, video_id):
        """Set the video's audit status to failed via the API, then record it.

        The local row is only flipped to disabled when the API call produced a
        truthy response; otherwise the row stays available so that ``check``
        can still report it.

        Returns:
            The response of ``change_video_audit_status``.
        """
        response = await change_video_audit_status(
            video_id, self.VIDEO_AUDIT_FAIL_STATUS
        )
        if not response:
            # NOTE(review): a falsy response is treated as "call failed",
            # mirroring how CheckVideoAuditStatus.deal interprets the same
            # API wrapper -- confirm against the wrapper's contract.
            await self._log_failure(
                "update_video_audit_status",
                "change video audit status fail",
                {"video_id": video_id},
            )
            return response

        await self.update_video_status(video_id)
        return response

    async def get_off_job(self):
        """Take off every video whose publish time is inside the expiry window.

        A failure on one video is logged and does not stop the remaining ones.
        """
        earliest_timestamp_threshold, expire_timestamp_threshold = self._time_window()
        task_list = await self.get_task_list(
            earliest_timestamp_threshold, expire_timestamp_threshold
        )
        if not task_list:
            # Nothing to do, or the fetch failed (already logged above).
            return

        for task in tqdm(task_list):
            video_id = task["video_id"]
            try:
                await self.update_video_audit_status(video_id)
            except Exception as e:
                # Per-item boundary: record the failure and keep going.
                await self._log_failure(
                    "get_off_job",
                    "get off video fail",
                    {
                        "video_id": video_id,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    async def check(self):
        """Verify that no video inside the expiry window is still available.

        Returns:
            ``TASK_SUCCESS_STATUS`` when the query succeeds and finds nothing;
            ``TASK_FAILED_STATUS`` when videos remain (a Feishu alert listing
            them is sent) or when the query itself reported an error.
        """
        earliest_timestamp, expire_timestamp = self._time_window()
        task_list, error = await self._fetch_task_list(
            earliest_timestamp, expire_timestamp
        )
        if error:
            # An unverifiable state must not be reported as success.
            await self._log_failure(
                "check",
                "fetch get off task list fail",
                {"error": str(error)},
            )
            return self.TASK_FAILED_STATUS

        if not task_list:
            return self.TASK_SUCCESS_STATUS

        await feishu_robot.bot(
            title="自动下架视频失败",
            detail={
                "total_video": len(task_list),
                "video_list": [i["video_id"] for i in task_list],
            },
            mention=False,
        )
        return self.TASK_FAILED_STATUS
class CheckVideoAuditStatus(GetOffVideosConst):
    """Re-check the audit status of available, not-yet-checked videos.

    Args:
        db_client: object exposing awaitable ``async_fetch(query, params=...)``
            (returning a ``(rows, error)`` pair) and ``async_save(query, params=...)``.
        log_client: object exposing awaitable ``log(contents=...)``.
    """

    # Number of video ids sent to the detail API per request.
    CHECK_BATCH_SIZE = 10

    def __init__(self, db_client, log_client):
        self.db_client = db_client
        self.log_client = log_client

    async def get_video_list_status(self, video_list: List[int]):
        """Return the ids in ``video_list`` whose audit status is not success.

        Reads the ``data`` list of the detail API response and keeps the
        ``id`` of every entry whose ``auditStatus`` differs from
        ``VIDEO_AUDIT_SUCCESS_STATUS``. A missing or empty ``data`` yields ``[]``.
        """
        response = await fetch_piaoquan_video_list_detail(video_list)
        video_detail_list = response.get("data") or []
        return [
            detail["id"]
            for detail in video_detail_list
            if detail["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
        ]

    async def get_unchecked_video_list(self) -> Optional[List[int]]:
        """Find up to 1000 available videos that have not been checked yet.

        Returns:
            The list of video ids, or ``None`` when the query reported an
            error (the error is written through ``log_client``).
        """
        query = f"""
            select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
        """
        video_id_list, error = await self.db_client.async_fetch(
            query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
        )
        if error:
            await self.log_client.log(
                contents={
                    "task": "check_video_audit_status",
                    "function": "get_unchecked_video_list",
                    "status": "fail",
                    "message": "fetch unchecked video list fail",
                    "data": {"error": str(error)},
                }
            )
            return None

        return [row["video_id"] for row in video_id_list]

    async def update_check_status(self, video_list: List[int]):
        """Mark every id in ``video_list`` as checked.

        Returns:
            Whatever ``db_client.async_save`` returns, or ``0`` when
            ``video_list`` is empty (no statement is executed, because
            ``in ()`` is not valid SQL).
        """
        if not video_list:
            return 0

        query = f"""update {self.table} set check_status = %s where video_id in %s;"""
        return await self.db_client.async_save(
            query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
        )

    async def deal(self):
        """Check unchecked videos in batches and repair those not in success state.

        For each batch, the ids whose audit status is not success are passed
        to ``change_video_audit_status``; ids for which that call returns a
        falsy response are collected. The whole batch is then marked checked.

        Returns:
            ``TASK_FAILED_STATUS`` when the unchecked list could not be
            fetched or when any repair call failed (a Feishu alert with the
            failed ids is sent in the latter case); otherwise
            ``TASK_SUCCESS_STATUS``.
        """
        video_id_list = await self.get_unchecked_video_list()
        if video_id_list is None:
            # The fetch failed and was already logged; there is nothing to chunk.
            return self.TASK_FAILED_STATUS

        fail_list = []
        for start in range(0, len(video_id_list), self.CHECK_BATCH_SIZE):
            video_chunk = video_id_list[start : start + self.CHECK_BATCH_SIZE]
            bad_video_id_list = await self.get_video_list_status(video_chunk)
            for bad_video_id in tqdm(bad_video_id_list):
                response = await change_video_audit_status(bad_video_id)
                if not response:
                    fail_list.append(bad_video_id)

            await self.update_check_status(video_chunk)

        if not fail_list:
            return self.TASK_SUCCESS_STATUS

        await feishu_robot.bot(
            title="校验已发布视频状态出现错误", detail=fail_list, mention=False
        )
        return self.TASK_FAILED_STATUS
|