""" @author: luojunhui 将抓取的视频发送至pq获取视频的审核结果 """ from typing import List, Dict from tqdm import tqdm from pymysql.cursors import DictCursor from applications import PQAPI from applications import longArticlesMySQL from applications.const import WeixinVideoCrawlerConst const = WeixinVideoCrawlerConst() pq_functions = PQAPI() class PublishVideosForAudit(object): """ 发布视频到pq,获取video_id,并且轮询获取视频id状态 """ def __init__(self): self.db = longArticlesMySQL() def get_publish_video_list(self) -> List[Dict]: """ 获取视频的信息 :return: """ sql = f"""SELECT id, article_title, video_oss_path FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS};""" response = self.db.select(sql, cursor_type=DictCursor) return response def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int: """ 更新视频的审核状态 :param new_audit_status: :param ori_audit_status: :param video_id: :param :return: """ update_sql = f""" UPDATE publish_single_video_source SET audit_status = %s WHERE audit_video_id = %s and audit_status = %s; """ affected_rows = self.db.update( sql=update_sql, params=(new_audit_status, video_id, ori_audit_status) ) return affected_rows def publish_each_video(self, video_obj: Dict) -> None: """ 发布视频到pq :param video_obj: :return: """ response = pq_functions.publish_to_pq( oss_path=video_obj.get("video_oss_path"), uid=const.DEFAULT_ACCOUNT_UID, title=video_obj.get("article_title") ) response_json = response.json() if response_json.get("code") == const.REQUEST_SUCCESS: video_id = response_json['data']['id'] update_sql = f""" UPDATE publish_single_video_source SET audit_status = %s, audit_video_id = %s WHERE id = %s; """ affected_rows = self.db.update( sql=update_sql, params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, video_obj['id']) ) if affected_rows: print("视频发布成功--{}".format(video_id)) else: print("视频发布失败--{}".format(video_id)) else: print("视频发布失败--{}".format(video_obj.get("video_oss_path"))) def get_check_article_list(self) -> List[Dict]: """ 获取需要检查的视频列表 :return: """ sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};""" response = self.db.select(sql, cursor_type=DictCursor) return response def check_video_status(self, video_id: int) -> bool: """ 检查视频的状态,若视频审核通过or不通过,修改记录状态 :param video_id: :return: """ response = pq_functions.getPQVideoListDetail([video_id]) audit_status = response.get("data")[0].get("auditStatus") if audit_status == const.PQ_AUDIT_SUCCESS_STATUS: affected_rows = self.update_audit_status( video_id=video_id, ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS, new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS ) if affected_rows: return True else: return False elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}: affected_rows = self.update_audit_status( video_id=video_id, ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS, new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS ) if affected_rows: return True else: return False else: return False def publish_job(self): """ 发布视频到pq :return: """ video_list = self.get_publish_video_list() for video_obj in tqdm(video_list, desc="视频发布"): self.publish_each_video(video_obj) print("视频发布完成") def check_job(self): """ 检查视频的状态 :return: """ video_list = self.get_check_article_list() for video_obj in tqdm(video_list, desc="视频检查"): video_id = video_obj.get("audit_video_id") self.check_video_status(video_id) print("视频检查完成")