123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- """
- @author: luojunhui
- 将抓取的视频发送至pq获取视频的审核结果
- """
- import time
- import traceback
- from typing import List, Dict
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import log
- from applications import PQAPI
- from applications import longArticlesMySQL
- from applications.const import WeixinVideoCrawlerConst
# Shared constants object: the class below reads its audit status codes,
# MAX_VIDEO_NUM, REQUEST_SUCCESS and DEFAULT_ACCOUNT_UID from it.
const = WeixinVideoCrawlerConst()
# Shared PQ API client: used below to publish videos (publish_to_pq) and to
# fetch their audit details (getPQVideoListDetail).
pq_functions = PQAPI()
class PublishVideosForAudit(object):
    """
    Publish crawled videos to PQ for audit and poll PQ for the audit result.

    Rows of ``publish_single_video_source`` start in
    ``const.VIDEO_AUDIT_INIT_STATUS``, move to
    ``const.VIDEO_AUDIT_PROCESSING_STATUS`` once published to PQ, and finally
    to the success or fail status when PQ reports a final audit outcome.
    """

    def __init__(self):
        # Database handle used by every query in this class.
        self.db = longArticlesMySQL()

    @staticmethod
    def _remaining_quota(max_count: int, used_count: int) -> int:
        """
        Return how many more videos may still be published.

        :param max_count: maximum number of videos allowed
        :param used_count: number of videos already published
        :return: ``max_count - used_count``, clamped so it is never negative
        """
        return max(max_count - used_count, 0)

    def get_publish_video_list(self) -> List[Dict]:
        """
        Fetch the videos that are still waiting to be published.

        At most ``const.MAX_VIDEO_NUM`` minus the number already published
        today are returned.

        :return: rows with ``id``, ``article_title`` and ``video_oss_path``;
            an empty list when today's quota is already used up
        """
        rest_count = self._remaining_quota(
            const.MAX_VIDEO_NUM, self.get_published_articles_today()
        )
        if not rest_count:
            # A zero/negative quota must not reach the query: a negative
            # LIMIT is a MySQL syntax error.
            return []

        sql = f"""
            SELECT id, article_title, video_oss_path
            FROM publish_single_video_source
            WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS}
            LIMIT {rest_count};
        """
        return self.db.select(sql, cursor_type=DictCursor)

    def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
        """
        Move a video from one audit status to another.

        The update only matches rows that are still in ``ori_audit_status``,
        so a row whose status has already changed is left untouched.

        :param video_id: value of ``audit_video_id`` identifying the row
        :param ori_audit_status: status the row is expected to be in
        :param new_audit_status: status to write
        :return: value returned by ``self.db.update`` (the affected row count)
        """
        update_sql = """
            UPDATE publish_single_video_source
            SET audit_status = %s
            WHERE audit_video_id = %s and audit_status = %s;
        """
        return self.db.update(
            sql=update_sql,
            params=(new_audit_status, video_id, ori_audit_status)
        )

    def get_published_articles_today(self):
        """
        Count the videos already sent for audit today.

        A row counts when its status is no longer the initial one and its
        ``audit_timestamp`` falls on the database's current date.

        :return: the number of matching rows
        """
        select_sql = f"""
            SELECT COUNT(1) as total_count
            FROM publish_single_video_source
            WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
            AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
        """
        response = self.db.select(select_sql, cursor_type=DictCursor)
        return response[0]['total_count']

    def publish_each_video(self, video_obj: Dict) -> bool:
        """
        Publish one video to PQ and record the returned PQ video id.

        :param video_obj: row with ``id``, ``article_title`` and
            ``video_oss_path``
        :return: True when PQ accepted the video and the row was updated to
            the processing status, False otherwise (previously this method
            always returned None; callers ignoring the result are unaffected)
        """
        response = pq_functions.publish_to_pq(
            oss_path=video_obj.get("video_oss_path"),
            uid=const.DEFAULT_ACCOUNT_UID,
            title=video_obj.get("article_title")
        )
        response_json = response.json()
        if response_json.get("code") != const.REQUEST_SUCCESS:
            print("视频发布失败--{}".format(video_obj.get("video_oss_path")))
            return False

        video_id = response_json['data']['id']
        update_sql = """
            UPDATE publish_single_video_source
            SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
            WHERE id = %s;
        """
        affected_rows = self.db.update(
            sql=update_sql,
            params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
        )
        if affected_rows:
            print("视频发布成功--{}".format(video_id))
            return True

        # PQ accepted the video but the local row could not be updated.
        print("视频发布失败--{}".format(video_id))
        return False

    def get_check_article_list(self) -> List[Dict]:
        """
        Fetch the videos whose audit is still in progress.

        :return: rows containing ``audit_video_id`` for every record in the
            processing status
        """
        sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
        return self.db.select(sql, cursor_type=DictCursor)

    def check_video_status(self, video_id: int) -> bool:
        """
        Query PQ for a video's audit status and persist a final outcome.

        :param video_id: PQ video id (``audit_video_id``)
        :return: True when PQ reported a final status (passed, self-visible
            or failed) and the row was updated; False when the audit is not
            final yet or no row was updated
        """
        response = pq_functions.getPQVideoListDetail([video_id])
        audit_status = response.get("data")[0].get("auditStatus")

        if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
            new_audit_status = const.VIDEO_AUDIT_SUCCESS_STATUS
        elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
            # "Self visible" is treated the same as an audit failure.
            new_audit_status = const.VIDEO_AUDIT_FAIL_STATUS
        else:
            # Any other status is not final: leave the row as it is.
            return False

        affected_rows = self.update_audit_status(
            video_id=video_id,
            ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
            new_audit_status=new_audit_status
        )
        return bool(affected_rows)

    def publish_job(self):
        """
        Publish every pending video to PQ, logging the outcome of each one.

        A failure on one video (an exception or a rejected publish) is logged
        and does not stop the remaining videos from being processed.

        :return: None
        """
        video_list = self.get_publish_video_list()
        for video_obj in tqdm(video_list, desc="视频发布"):
            try:
                published = self.publish_each_video(video_obj)
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task="publish_video_for_audit",
                    message="发送至PQ失败",
                    function="publish_each_video",
                    status="fail",
                    data={
                        "error_msg": error_msg,
                        "video_obj": video_obj,
                        "error": str(e)
                    }
                )
                continue

            if published:
                log(
                    task="publish_video_for_audit",
                    message="成功发送至pq",
                    function="publish_each_video",
                    data={
                        "video_obj": video_obj
                    }
                )
            else:
                # No exception, but PQ rejected the video or the row was not
                # updated: do not report this as a success.
                log(
                    task="publish_video_for_audit",
                    message="发送至PQ失败",
                    function="publish_each_video",
                    status="fail",
                    data={
                        "video_obj": video_obj
                    }
                )

    def check_job(self):
        """
        Poll PQ for every video whose audit is in progress.

        An exception while checking one video is logged and does not stop the
        remaining videos from being checked.

        :return: None
        """
        video_list = self.get_check_article_list()
        for video_obj in tqdm(video_list, desc="视频检查"):
            video_id = video_obj.get("audit_video_id")
            try:
                self.check_video_status(video_id)
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task="publish_video_for_audit",
                    message="查询状态失败",
                    function="check_each_video",
                    status="fail",
                    data={
                        "error_msg": error_msg,
                        "video_obj": video_obj,
                        "error": str(e)
                    }
                )
|