123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- """
- @author: luojunhui
- """
- import json
- import time
- import traceback
- from applications.log import logging
- from applications.functions.forward_request import forward_requests
- class GetOffVideos(object):
- """
- 下架视频
- """
- def __init__(self, params, mysql_client, config):
- self.params = params
- self.mysql_client = mysql_client
- self.article_match_video_table = config.article_match_video_table
- self.get_off_videos = config.get_off_video_table
- self.trace_id = None
- self.push_type = None
- def check_params(self):
- """
- :return:
- """
- try:
- self.trace_id = self.params['traceId']
- self.push_type = self.params.get('pushType')
- logging(
- code="4121",
- info='自动下架视频接口参数校验成功',
- function="get_off_videos",
- data=self.params,
- trace_id=self.trace_id
- )
- return None
- except Exception as e:
- response = {
- "error": "params error",
- "info": str(e),
- "data": self.params
- }
- return response
- async def record_published_trace_id(self, gh_id):
- """
- 记录已发布的trace_id
- :return:
- """
- insert_sql = f"""
- INSERT IGNORE INTO long_articles_published_trace_id
- (trace_id, gh_id, push_type, create_timestamp)
- VALUES
- (%s, %s, %s, %s);
- """
- await self.mysql_client.async_insert(
- sql=insert_sql,
- params=(
- self.trace_id,
- gh_id,
- self.push_type,
- int(time.time())
- )
- )
- async def record_get_off_videos(self, video_list):
- """
- 存储视频id到getOffVideos表中
- """
- info_list = [
- (
- item['videoId'],
- int(time.time()),
- 1,
- self.trace_id
- )
- for item in video_list
- ]
- insert_sql = f"""
- INSERT INTO {self.get_off_videos}
- (video_id, publish_time, video_status, trace_id)
- values
- (%s, %s, %s, %s);
- """
- await self.mysql_client.async_insert_many(
- sql=insert_sql,
- params_list=info_list
- )
- async def push_video_into_queue(self):
- """
- 将视频id记录到待下架表中
- :return:
- """
- select_sql = f"""
- select gh_id, response from {self.article_match_video_table} where trace_id = '{self.trace_id}';
- """
- result = await self.mysql_client.async_select(sql=select_sql)
- if result:
- video_list = json.loads(result[0][1])
- gh_id = result[0][0]
- try:
- await self.record_get_off_videos(video_list)
- if self.push_type:
- await self.record_published_trace_id(gh_id)
- logging(
- code="4122",
- info='自动下架视频接口存储成功',
- function="get_off_videos",
- trace_id=self.trace_id
- )
- return {
- "status": "success",
- "traceId": self.trace_id
- }
- except Exception as e:
- logging(
- code="4123",
- info="自动下架视频处理失败: {}".format(e),
- data={
- "traceback": traceback.format_exc()
- },
- function="get_off_videos",
- trace_id=self.trace_id
- )
- return {
- "status": "fail",
- "traceId": self.trace_id,
- "msg": "insert fails, {}".format(e)
- }
- else:
- return {
- "status": "fail",
- "traceId": self.trace_id,
- "msg": "traceId error, can't find trace_id"
- }
- async def check_trace_id(self):
- """
- check trace id 是否存在与系统中
- """
- select_sql = f"""
- SELECT trace_id
- FROM {self.article_match_video_table}
- WHERE trace_id = '{self.trace_id}';
- """
- response = await self.mysql_client.async_select(select_sql)
- if response:
- return True
- else:
- return False
- async def check_trace_id_startswith(self):
- """
- trace_id是否以search开头
- """
- if self.trace_id.startswith('search'):
- return True
- else:
- return False
- async def deal(self):
- """
- :return:
- """
- params_error = self.check_params()
- if params_error:
- return params_error
- else:
- trace_id_process_flag = await self.check_trace_id_startswith()
- if not trace_id_process_flag:
- return {
- "traceId": self.trace_id,
- "msg": "do not process trace id"
- }
- trace_id_exist_flag = await self.check_trace_id()
- if trace_id_exist_flag:
- return await self.push_video_into_queue()
- else:
- # 只需要传trace_id, 老系统接口不穿strategy参数默认strategy_v1
- response = await forward_requests(
- params={
- "traceId": self.trace_id
- },
- api="get_off_videos"
- )
- return response
|