"""
@author: luojunhui
"""
import json
import time
import traceback

from applications.log import logging
from applications.functions.forward_request import forward_requests


class GetOffVideos(object):
    """
    Handle a "take videos offline" request.

    Given a request carrying a ``traceId`` (and optionally a ``pushType``),
    the handler looks the trace up in the article/video match table and
    records the matched video ids in the get-off-videos table. Trace ids that
    are unknown to this system are forwarded to the ``get_off_videos`` API via
    ``forward_requests``.
    """

    # Characters that could terminate or escape the quoted SQL string literal
    # the trace id is interpolated into. They are rejected during validation.
    _UNSAFE_TRACE_ID_CHARS = ("'", "\\")

    def __init__(self, params, mysql_client, config):
        """
        :param params: request payload; must contain ``traceId`` and may
            contain ``pushType``
        :param mysql_client: async MySQL client exposing ``async_select``,
            ``async_insert`` and ``async_insert_many``
        :param config: object providing ``article_match_video_table`` and
            ``get_off_video_table`` table names
        """
        self.params = params
        self.mysql_client = mysql_client
        self.article_match_video_table = config.article_match_video_table
        self.get_off_videos = config.get_off_video_table
        self.trace_id = None
        self.push_type = None

    def check_params(self):
        """
        Validate the request payload and cache ``trace_id`` / ``push_type``.

        ``traceId`` must be a string that contains no quote or backslash
        characters, because it is later embedded in SQL string literals and
        has ``startswith`` called on it.

        :return: ``None`` when the payload is valid, otherwise an error dict
            with the keys ``error``, ``info`` and ``data``
        """
        try:
            trace_id = self.params['traceId']
            if not isinstance(trace_id, str):
                raise ValueError("traceId must be a string")
            if any(ch in trace_id for ch in self._UNSAFE_TRACE_ID_CHARS):
                raise ValueError("traceId contains illegal characters")
            self.trace_id = trace_id
            self.push_type = self.params.get('pushType')
            logging(
                code="4121",
                info='自动下架视频接口参数校验成功',
                function="get_off_videos",
                data=self.params,
                trace_id=self.trace_id
            )
            return None
        except Exception as e:
            # Any failure while reading the payload is reported to the caller
            # as a parameter error rather than raised.
            return {
                "error": "params error",
                "info": str(e),
                "data": self.params
            }

    async def record_published_trace_id(self, gh_id):
        """
        Record the current trace id as published.

        Inserts (trace_id, gh_id, push_type, now) into
        ``long_articles_published_trace_id`` using ``INSERT IGNORE``.

        :param gh_id: account id read from the article/video match row
        :return: None
        """
        insert_sql = """
            INSERT IGNORE INTO long_articles_published_trace_id
            (trace_id, gh_id, push_type, create_timestamp)
            VALUES
            (%s, %s, %s, %s);
        """
        await self.mysql_client.async_insert(
            sql=insert_sql,
            params=(
                self.trace_id,
                gh_id,
                self.push_type,
                int(time.time())
            )
        )

    async def record_get_off_videos(self, video_list):
        """
        Store the video ids of ``video_list`` in the get-off-videos table.

        :param video_list: iterable of dicts, each with a ``videoId`` key
        :return: None
        """
        # One timestamp for the whole batch so every row of a request agrees.
        publish_time = int(time.time())
        info_list = [
            (item['videoId'], publish_time, 1, self.trace_id)
            for item in video_list
        ]
        insert_sql = f"""
            INSERT INTO {self.get_off_videos}
            (video_id, publish_time, video_status, trace_id)
            values
            (%s, %s, %s, %s);
        """
        await self.mysql_client.async_insert_many(
            sql=insert_sql,
            params_list=info_list
        )

    async def push_video_into_queue(self):
        """
        Record the videos matched to the current trace id as pending removal.

        Reads ``gh_id`` and the JSON ``response`` column for the trace id,
        stores every video id it contains and, when ``push_type`` is truthy,
        also records the trace id as published.

        :return: dict with ``status`` ("success" or "fail") and ``traceId``;
            failures also carry ``msg``
        """
        # NOTE(review): trace_id is interpolated into the statement; it is
        # restricted by check_params, but a bound parameter would be
        # preferable if async_select supports one.
        select_sql = f"""
            select gh_id, response
            from {self.article_match_video_table}
            where trace_id = '{self.trace_id}';
        """
        result = await self.mysql_client.async_select(sql=select_sql)
        if not result:
            return {
                "status": "fail",
                "traceId": self.trace_id,
                "msg": "traceId error, can't find trace_id"
            }

        try:
            # Parsing happens inside the try block so that a NULL or malformed
            # response column is logged and reported instead of raising.
            first_row = result[0]
            gh_id = first_row[0]
            video_list = json.loads(first_row[1])
            await self.record_get_off_videos(video_list)
            if self.push_type:
                await self.record_published_trace_id(gh_id)
            logging(
                code="4122",
                info='自动下架视频接口存储成功',
                function="get_off_videos",
                trace_id=self.trace_id
            )
            return {
                "status": "success",
                "traceId": self.trace_id
            }
        except Exception as e:
            logging(
                code="4123",
                info="自动下架视频处理失败: {}".format(e),
                data={
                    "traceback": traceback.format_exc()
                },
                function="get_off_videos",
                trace_id=self.trace_id
            )
            return {
                "status": "fail",
                "traceId": self.trace_id,
                "msg": "insert fails, {}".format(e)
            }

    async def check_trace_id(self):
        """
        Check whether the current trace id exists in the match table.

        :return: True when at least one row is found, otherwise False
        """
        # NOTE(review): same interpolation caveat as push_video_into_queue.
        select_sql = f"""
            SELECT trace_id
            FROM {self.article_match_video_table}
            WHERE trace_id = '{self.trace_id}';
        """
        response = await self.mysql_client.async_select(select_sql)
        return bool(response)

    async def check_trace_id_startswith(self):
        """
        Check whether the current trace id starts with ``search``.

        :return: True when it does, otherwise False
        """
        return self.trace_id.startswith('search')

    async def deal(self):
        """
        Entry point: validate, then store locally or forward the request.

        :return: the parameter-error dict, a "do not process" dict for trace
            ids that do not start with ``search``, the result of
            ``push_video_into_queue`` for known trace ids, or the response of
            the forwarded request for unknown ones
        """
        params_error = self.check_params()
        if params_error:
            return params_error

        if not await self.check_trace_id_startswith():
            return {
                "traceId": self.trace_id,
                "msg": "do not process trace id"
            }

        if await self.check_trace_id():
            return await self.push_video_into_queue()

        # Only trace_id needs to be sent; when the legacy API receives no
        # strategy parameter it defaults to strategy_v1.
        return await forward_requests(
            params={
                "traceId": self.trace_id
            },
            api="get_off_videos"
        )