|
- """
- @author: luojunhui
- 将抓取的视频发送至pq获取视频的审核结果
- """
- import time
- import traceback
- from typing import List, Dict
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import log
- from applications import PQAPI
- from applications.api import AigcSystemApi
- from applications.api import fetch_moon_shot_response
- from applications.const import WeixinVideoCrawlerConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
# Module-level helpers shared by every PublishVideosForAudit method below.
# Status codes, quotas and thresholds are read from this constants object.
const = WeixinVideoCrawlerConst()
# PQ client: used to publish videos and to query their audit details.
pq_functions = PQAPI()
# AIGC system client: used to register crawler relations for approved videos.
aigc = AigcSystemApi()
class PublishVideosForAudit(object):
    """
    Publish crawled videos to PQ, store the returned PQ video id, and poll PQ
    for the audit result of every video that is still being audited.

    Two entry points are meant to be driven by a scheduler:
    ``publish_job`` (send new videos to PQ) and ``check_job`` (poll results).
    """

    def __init__(self):
        # A single connection, configured by long_articles_config, serves
        # every query issued by this task.
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_publish_video_list(self) -> List[Dict]:
        """
        Fetch the next batch of videos that should be sent to PQ.

        The batch size is the remaining daily quota (``const.MAX_VIDEO_NUM``
        minus the count from ``get_published_articles_today``), capped at
        ``const.MAX_VIDEO_NUM_PER_PUBLISH``.

        :return: rows holding ``id``, ``article_title`` and ``video_oss_path``
            ordered by score descending; an empty list when no quota is left.
        """
        already_published_count = self.get_published_articles_today()
        rest_count = const.MAX_VIDEO_NUM - already_published_count
        limit_count = min(rest_count, const.MAX_VIDEO_NUM_PER_PUBLISH)
        if limit_count <= 0:
            # Quota exhausted or exceeded. A negative LIMIT is a MySQL syntax
            # error and LIMIT 0 returns nothing, so skip the query entirely.
            return []

        sql = f"""
            SELECT id, article_title, video_oss_path
            FROM publish_single_video_source
            WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS} and bad_status = {const.TITLE_DEFAULT_STATUS} and platform = 'sohu'
            and score > 0.5
            ORDER BY score DESC
            LIMIT {limit_count};
        """
        return self.db_client.fetch(sql, cursor_type=DictCursor)

    def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
        """
        Move a video from one audit status to another.

        The expected current status is part of the WHERE clause, so the update
        only takes effect when the row is still in ``ori_audit_status``.

        :param video_id: PQ video id (column ``audit_video_id``)
        :param ori_audit_status: status the row is expected to have now
        :param new_audit_status: status to write
        :return: number of rows changed, as reported by ``db_client.save``
        """
        update_sql = """
            UPDATE publish_single_video_source
            SET audit_status = %s
            WHERE audit_video_id = %s and audit_status = %s;
        """
        return self.db_client.save(
            query=update_sql,
            params=(new_audit_status, video_id, ori_audit_status)
        )

    def get_published_articles_today(self):
        """
        Count the videos that have left the initial audit status today.

        "Today" is decided by the database: ``audit_timestamp`` is compared
        against ``CURDATE()``.

        :return: the ``total_count`` value of the single result row
        """
        select_sql = f"""
            SELECT COUNT(1) as total_count
            FROM publish_single_video_source
            WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
            AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
        """
        rows = self.db_client.fetch(select_sql, cursor_type=DictCursor)
        return rows[0]['total_count']

    def publish_each_video(self, video_obj: Dict) -> Dict:
        """
        Publish one video to PQ and record the outcome in the database.

        :param video_obj: row with ``id``, ``article_title`` and
            ``video_oss_path``
        :return: a dict whose ``status`` is ``"success"`` or ``"fail"``.
            Success and the "0 rows updated" failure carry ``video_id``;
            a rejected publish carries ``title``, ``oss_path`` and the raw
            PQ ``response``.
        """
        response = pq_functions.publish_to_pq(
            oss_path=video_obj.get("video_oss_path"),
            uid=const.DEFAULT_ACCOUNT_UID,
            title=video_obj.get("article_title")
        )
        response_json = response.json()
        response_code = response_json.get("code")

        if response_code == const.REQUEST_SUCCESS:
            video_id = response_json['data']['id']
            # Mark the row as "audit in progress" and remember when and under
            # which PQ id it was published.
            update_sql = """
                UPDATE publish_single_video_source
                SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
                WHERE id = %s;
            """
            affected_rows = self.db_client.save(
                query=update_sql,
                params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
            )
            if affected_rows:
                return {
                    "status": "success",
                    "video_id": video_id
                }
            return {
                "status": "fail",
                "video_id": video_id,
                "error_msg": "抢占锁失败,update执行操作修改0行"
            }

        if response_code == const.PUBLISHED_ILLEGAL_TITLE_CODE:
            # PQ rejected the title as illegal: move the row from the initial
            # audit status to the failed status so it is not retried.
            update_sql = """
                UPDATE publish_single_video_source
                SET audit_status = %s
                WHERE id = %s and audit_status = %s;
            """
            self.db_client.save(
                update_sql,
                params=(const.VIDEO_AUDIT_FAIL_STATUS, video_obj['id'], const.VIDEO_AUDIT_INIT_STATUS)
            )

        # Every non-success response code is reported as a failure.
        return {
            "status": "fail",
            "error_msg": "发布到pq失败",
            "title": video_obj.get("article_title"),
            "oss_path": video_obj.get("video_oss_path"),
            "response": response_json
        }

    def get_check_article_list(self) -> List[Dict]:
        """
        Fetch the videos whose audit is still in progress.

        :return: rows with ``content_trace_id``, ``audit_video_id``, ``score``
            and ``platform``
        """
        sql = f"""
            select content_trace_id, audit_video_id, score, platform
            from publish_single_video_source
            where audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};
        """
        return self.db_client.fetch(sql, cursor_type=DictCursor)

    def update_mini_program_title(self, video_id: int) -> bool:
        """
        Generate and store the mini-program title for a video.

        The article title is sent to ``fetch_moon_shot_response`` to generate
        a title, which is then scored. When the score is above
        ``const.TITLE_SAFE_SCORE_THRESHOLD`` a "safe" rewrite of the original
        title is requested and used instead, if one is returned.

        :param video_id: PQ video id (column ``audit_video_id``)
        :return: True when the title was stored, False when generation or
            storage raised. Errors from the initial title lookup propagate.
        """
        select_sql = f"""
            SELECT article_title FROM publish_single_video_source WHERE audit_video_id = {video_id};
        """
        # Deliberately outside the try block: a missing row is a data problem
        # for the caller, not a title-generation failure.
        title = self.db_client.fetch(select_sql, cursor_type=DictCursor)[0]['article_title']
        try:
            # Generate the candidate title.
            mini_program_title = fetch_moon_shot_response(task='generate_kimi_title', input_text=title)

            # Score the candidate; above the threshold, ask for a safe rewrite.
            kimi_safe_title = None
            title_safe_score = fetch_moon_shot_response(task='get_title_safe_score', input_text=mini_program_title)
            if int(title_safe_score) > const.TITLE_SAFE_SCORE_THRESHOLD:
                kimi_safe_title_obj = fetch_moon_shot_response(task='make_title_safe', input_text=title, output_type='json')
                kimi_safe_title = kimi_safe_title_obj['title_v2']

            # Prefer the safe rewrite whenever one was produced.
            mini_program_title = kimi_safe_title if kimi_safe_title else mini_program_title

            update_sql = """
                UPDATE publish_single_video_source SET mini_program_title = %s WHERE audit_video_id = %s;
            """
            self.db_client.save(update_sql, params=(mini_program_title, video_id))
            log(
                task="publish_video_for_audit",
                function="update_mini_program_title",
                message="修改小程序标题成功",
                data={
                    "video_id": video_id,
                    "title": title,
                    "mini_program_title": mini_program_title
                }
            )
            return True
        except Exception as e:
            # Best effort: report the failure and let the caller decide which
            # audit status the video should get.
            log(
                task="publish_video_for_audit",
                function="update_mini_program_title",
                status="fail",
                data={
                    "video_id": video_id,
                    "title": title,
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
            )
            return False

    def insert_into_task_queue(self, video) -> int:
        """
        Enqueue an approved video into ``single_video_transform_queue``.

        :param video: row with ``content_trace_id``, ``audit_video_id``,
            ``score`` and ``platform``
        :return: number of rows inserted, as reported by ``db_client.save``
        """
        insert_query = """
            insert into single_video_transform_queue
            (content_trace_id, pq_vid, score, platform)
            values (%s, %s, %s, %s);
        """
        return self.db_client.save(
            query=insert_query,
            params=(
                video['content_trace_id'], video['audit_video_id'], video['score'], video['platform']
            )
        )

    def check_video_status(self, video_obj: dict) -> Dict:
        """
        Query PQ for one video's audit result and update the record to match.

        :param video_obj: row with ``audit_video_id``, ``content_trace_id``,
            ``score`` and ``platform``
        :return: dict with ``affected_rows`` (0 when nothing was changed),
            ``video_id`` and the ``audit_status`` reported by PQ
        """
        video_id = video_obj['audit_video_id']
        response = pq_functions.getPQVideoListDetail([video_id])
        audit_status = response.get("data")[0].get("auditStatus")

        # Videos still being audited, and any unrecognised status, are left
        # untouched.
        affected_rows = 0

        if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
            # Audit passed: the mini-program title must exist before the
            # video is handed on.
            mini_program_title_flag = self.update_mini_program_title(video_id)
            if mini_program_title_flag:
                affected_rows = self.update_audit_status(
                    video_id=video_id,
                    ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                    new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
                )
                # Hand the video to the transform queue.
                self.insert_into_task_queue(video_obj)
                # Register the video in the AIGC system.
                aigc.insert_crawler_relation_to_aigc_system(
                    relation_list=[
                        {
                            "videoPoolTraceId": video_obj['content_trace_id'],
                            "channelContentId": str(video_id),
                            "platform": video_obj['platform'],
                        }
                    ]
                )
            else:
                # Title generation failed: record the dedicated status.
                affected_rows = self.update_audit_status(
                    video_id=video_id,
                    ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                    new_audit_status=const.VIDEO_TITLE_GENERATE_FAIL_STATUS
                )
        elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
            # Audit rejected, or the video is visible to its owner only.
            affected_rows = self.update_audit_status(
                video_id=video_id,
                ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
            )

        return {
            "affected_rows": affected_rows,
            "video_id": video_id,
            "audit_status": audit_status
        }

    def publish_job(self):
        """
        Publish every video of the current batch to PQ.

        Each video is handled independently: a failure or an exception is
        logged and the loop moves on to the next video.
        """
        video_list = self.get_publish_video_list()
        for video_obj in tqdm(video_list, desc="视频发布"):
            try:
                response = self.publish_each_video(video_obj)
                if response.get("status") == "success":
                    log(
                        task="publish_video_for_audit",
                        message="发送至PQ成功",
                        function="publish_each_video",
                        data={
                            "video_id": response.get("video_id")
                        }
                    )
                else:
                    log(
                        task="publish_video_for_audit",
                        message=response.get('error_msg'),
                        function="publish_each_video",
                        status="fail",
                        data={
                            "response": response,
                            "video_obj": video_obj
                        }
                    )
            except Exception as e:
                # Task boundary: one bad video must not abort the batch.
                error_msg = traceback.format_exc()
                log(
                    task="publish_video_for_audit",
                    message="发送至PQ代码执行失败",
                    function="publish_each_video",
                    status="fail",
                    data={
                        "error_msg": error_msg,
                        "video_obj": video_obj,
                        "error": str(e)
                    }
                )

    def check_job(self):
        """
        Poll PQ for every video whose audit is still in progress.

        A check that changes no row is logged together with the status PQ
        reported; exceptions are logged and the loop continues.
        """
        video_list = self.get_check_article_list()
        for video_obj in tqdm(video_list, desc="视频检查"):
            video_id = video_obj.get("audit_video_id")
            try:
                response = self.check_video_status(video_obj)
                if not response.get("affected_rows"):
                    log(
                        task="publish_video_for_audit",
                        function="check_each_video",
                        message="修改行数为0",
                        data={
                            "video_id": video_id,
                            "audit_status": response['audit_status']
                        }
                    )
            except Exception as e:
                # Task boundary: one bad video must not abort the batch.
                error_msg = traceback.format_exc()
                log(
                    task="publish_video_for_audit",
                    message="查询状态执行失败",
                    function="check_each_video",
                    status="fail",
                    data={
                        "error_msg": error_msg,
                        "video_obj": video_obj,
                        "error": str(e)
                    }
                )
|