"""
@author: luojunhui
@desc: auto get off videos in 3 days for safety
"""

import time

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log, PQAPI
from applications.db import DatabaseConnector
from applications.api import FeishuBotApi
from config import long_articles_config


class GetOffVideos:
    """Take videos listed in the ``get_off_videos`` table offline once they expire.

    A row is a take-down candidate when its ``video_status`` is 1 and its
    ``publish_time`` is older than ``EXPIRE_TIME`` seconds.  After a successful
    take-down request the row's status is switched from 1 to 0.
    """

    def __init__(self):
        """Connect to the database and create the PQ and Feishu API clients."""
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.pq_api = PQAPI()
        self.feishu_robot = FeishuBotApi()
        # Age threshold in seconds (3 days); rows published earlier than
        # ``now - EXPIRE_TIME`` are selected for take-down.
        self.EXPIRE_TIME = 3 * 24 * 60 * 60

    def _fetch_expired_videos(self, timestamp):
        """Return rows (dicts with a ``video_id`` key) that are still at status 1
        and were published before ``timestamp``.

        Single home for the query that both the take-down pass and the
        verification pass rely on, so the two can never drift apart.
        """
        # Plain string on purpose: values are bound by the driver through the
        # ``%s`` placeholders, never interpolated into the SQL text.
        fetch_query = """
            select video_id from get_off_videos
            where video_status = %s and publish_time < %s;
        """
        return self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor, params=(1, timestamp)
        )

    def get_long_articles_videos(self, timestamp):
        """Fetch the videos waiting to be taken offline and log how many were found.

        :param timestamp: unix timestamp (seconds); only rows whose
            ``publish_time`` is strictly smaller are returned.
        :return: the rows returned by the database client, each a dict with a
            ``video_id`` key.
        """
        article_list = self._fetch_expired_videos(timestamp)
        log(
            task="getOffVideosDaily",
            function="get_long_articles_videos",
            message="查找到视频 id_list,一共{}条视频".format(len(article_list)),
        )
        return article_list

    def update_video_statsu(self, video_id, ori_status, new_status):
        """Update a video's status in the ``get_off_videos`` table.

        The row is only touched while it is still at ``ori_status``;
        ``get_off_time`` is set to the current unix timestamp.

        NOTE: the method name carries a typo ("statsu"); it is kept as is so
        existing callers keep working.

        :param video_id: id of the video row to update.
        :param ori_status: status the row must currently have.
        :param new_status: status to write.
        """
        now_timestamp = int(time.time())
        update_query = """
            update get_off_videos
            set video_status = %s, get_off_time = %s
            where video_id = %s and video_status = %s;
        """
        self.db_client.save(
            query=update_query,
            params=(new_status, now_timestamp, video_id, ori_status),
        )

    def change_video_id_status(self, video_id):
        """Ask the PQ API to change the video's status, then record the result.

        On a truthy API response the local row goes from status 1 to 0;
        otherwise a failure entry is logged and the row is left untouched.

        :param video_id: id of the video to take offline.
        """
        # NOTE(review): 2 is presumably the PQ-side "offline" status code -
        # confirm against the PQAPI documentation.
        response = self.pq_api.changeVideoStatus(video_id, 2)
        if response:
            self.update_video_statsu(video_id=video_id, ori_status=1, new_status=0)
        else:
            log(
                task="getOffVideosDaily",
                function="change_video_status",
                status="fail",
                message="请求票圈修改状态异常: ---video_id = {}".format(video_id),
            )

    def check_task(self):
        """Retry videos that are still pending and report the ones that remain.

        Every expired row still at status 1 gets one more take-down attempt.
        After a short pause the table is queried again; if rows are still
        pending, their ids are sent through the Feishu bot.
        """
        three_days_ago = int(time.time()) - self.EXPIRE_TIME
        pending = self._fetch_expired_videos(three_days_ago)
        if not pending:
            return

        video_id_list = [row["video_id"] for row in pending]
        for vid in video_id_list:
            try:
                self.change_video_id_status(video_id=vid)
            except Exception as e:
                # Best effort: one failing video must not stop the retries.
                log(
                    task="getOffVideosDaily",
                    function="check_job",
                    status="fail",
                    message="task2下架单个视频失败,video_id={}, 报错信息={}".format(
                        vid, e
                    ),
                )

        # Pause before verifying, using the same cut-off as the first query.
        time.sleep(10)
        rest = self._fetch_expired_videos(three_days_ago)
        if rest:
            self.feishu_robot.bot(
                title="get_off_videos 下架视频异常",
                detail={"video_id": [row["video_id"] for row in rest]},
            )

    def get_off_job(self):
        """Take every expired video offline, logging (not raising) per-video failures."""
        three_days_before = int(time.time()) - self.EXPIRE_TIME
        fetch_response = self.get_long_articles_videos(timestamp=three_days_before)
        video_id_list = [row["video_id"] for row in fetch_response]
        for video_id in tqdm(video_id_list):
            try:
                self.change_video_id_status(video_id=video_id)
            except Exception as e:
                # Best effort: keep going so the remaining videos are processed.
                log(
                    task="getOffVideosDaily",
                    function="get_off_job",
                    status="fail",
                    message="get_off_job下架单个视频失败,video_id={}, 报错信息={}".format(
                        video_id, e
                    ),
                )

    def deal(self):
        """Entry point: run the take-down pass, wait, verify, then notify completion."""
        self.get_off_job()
        # Wait a minute between the take-down pass and the verification pass.
        time.sleep(60)
        self.check_task()
        self.feishu_robot.bot(
            title="get_off_videos 下架视频任务完成",
            detail={"message": "get_off_videos 下架视频任务完成"},
            env="long_articles_task",
            mention=False,
        )