""" @author: luojunhui @description: GetOffVideos Daily """ import json import time from concurrent.futures.thread import ThreadPoolExecutor import requests import schedule from applications import PQMySQL, Functions from applications.decoratorApi import retryOnTimeout @retryOnTimeout() def bot(account_list): """ 机器人 """ url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410" headers = {"Content-Type": "application/json"} payload = { "msg_type": "interactive", "card": { "elements": [ { "tag": "div", "text": { "content": "存在视频下架失败\n", "tag": "lark_md", }, }, { "tag": "div", "text": { "content": json.dumps( account_list, ensure_ascii=False, indent=4 ), "tag": "lark_md", }, }, ], "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}}, }, } requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10) class AutoGetOffVideos(object): """ 自动下架视频 """ pqMysql = PQMySQL() @classmethod def getLongArticlesVideos(cls, time_stamp): """ 获取待下架的视频 :return: """ select_sql = f""" SELECT video_id FROM get_off_videos WHERE video_status = 1 and publish_time < {time_stamp}; """ result = cls.pqMysql.select(sql=select_sql) return result @classmethod def updateVideoIdStatus(cls, video_id): """ 修改数据库内视频状态 :param video_id: :return: """ time_stamp = int(time.time()) select_sql = f""" UPDATE get_off_videos SET video_status = 0, get_off_time = {time_stamp} WHERE video_id = %s; """ cls.pqMysql.update( sql=select_sql, params=video_id ) @classmethod def changeVideoIdStatus(cls, video_id): """ 修改视频规则 :return: """ url = "https://admin.piaoquantv.com/manager/video/audit/v2/updateAuditStatus" payload = "videoId={}&auditStatus=2&updateReasonJson=&rejectReasonJson=%5B%7B%22reason%22%3A%22%E9%95%BF%E6%96%87%E8%87%AA%E5%8A%A8%E4%B8%8B%E6%9E%B6%22%2C%22reasonId%22%3A-1%7D%5D&adminUid=206".format( video_id) headers = { 'accept': 'application/json', 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8', 'content-type': 'application/x-www-form-urlencoded;charset=UTF-8', 'cookie': 'SESSION=NTljNTg2YjktMTU0MC00YWQ5LWE4ZTktNDFhODY0NzM3NTcx', 'origin': 'https://admin.piaoquantv.com', 'priority': 'u=1, i', 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' } response = requests.request( "POST", url, headers=headers, data=payload ) if response.status_code == 2: result = response.json() if result.get("code", None) == 0: cls.updateVideoIdStatus(video_id=video_id) @classmethod def task1(cls): """ 已经请求超过3天的视频全部下架 :return: """ now_stamp = int(time.time()) three_days_before = now_stamp - 3 * 24 * 60 * 60 video_set = cls.getLongArticlesVideos(time_stamp=three_days_before) vid_list = [i[0] for i in video_set] with ThreadPoolExecutor(max_workers=8) as Pool1: Pool1.map(cls.changeVideoIdStatus, vid_list) @classmethod def task2(cls): """ 校验 3 天前发布的视频是否已经下架 :return: """ three_days_ago = int(time.time()) - 3 * 24 * 3600 sql = f""" SELECT video_id FROM get_off_videos WHERE publish_time < {three_days_ago} and video_status = 1; """ vid_tuple = cls.pqMysql.select(sql) if vid_tuple: vid_list = [i[0] for i in vid_tuple] with ThreadPoolExecutor(max_workers=8) as Pool1: Pool1.map(cls.changeVideoIdStatus, vid_list) vid_tuple2 = cls.pqMysql.select(sql) if vid_tuple2: vid_list2 = [i[0] for i in vid_tuple2] bot(vid_list2) else: return else: return def getOffJob(): """ 下架任务 :return: """ AGV = AutoGetOffVideos() AGV.task1() def checkJob(): """ 校验 3 天前发布的视频是否已经被下架 :return: """ AGV = AutoGetOffVideos() AGV.task2() if __name__ == '__main__': schedule.every().day.at("09:30").do(Functions().job_with_thread, getOffJob) schedule.every().day.at("10:30").do(Functions().job_with_thread, checkJob) schedule.every().day.at("14:30").do(Functions().job_with_thread, getOffJob) schedule.every().day.at("15:00").do(Functions().job_with_thread, checkJob) while True: schedule.run_pending() time.sleep(1) print("自动下架视频任务正常执行")