import json
import time

import requests

from db_helper import HologresHelper, RedisHelper
from config import set_config
from log import Log

config_ = set_config()
log_ = Log()


def request_post(request_url, request_data, timeout=1.0):
    """Send a JSON POST request to an HTTP endpoint.

    :param request_url: endpoint URL
    :param request_data: request payload, sent as the JSON body
    :param timeout: timeout in seconds, type-float, default 1.0
    :return: the decoded JSON response body when the status code is 200;
        None for any other status code or when the request times out
    """
    # Only timeouts are handled here; any other exception raised by
    # requests or by JSON decoding propagates to the caller.
    try:
        response = requests.post(url=request_url, json=request_data, timeout=timeout)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            return None
    except requests.exceptions.Timeout as e:
        log_.error('url: {} timeout, exception: {}'.format(request_url, e))
        return None


def get_videos_remain_view_count(app_type, videos):
    """Get the remaining distributable view count of videos in the flow pool.

    :param app_type: product identifier, type-int
    :param videos: video info (video id, flow pool tag), type-list,
        [{'videoId': video_id, 'flowPool': flow_pool}, ...]
    :return: data, type-list, [(video_id, flow_pool, view_count), ...];
        an empty list when ``videos`` is empty, the request fails, or the
        response code is not 0
    """
    if not videos:
        return []

    request_data = {'appType': app_type, 'videos': videos}
    result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL,
                          request_data=request_data, timeout=1)

    if result is None:
        return []

    if result['code'] != 0:
        log_.info('获取视频在流量池中的剩余可分发数失败')
        return []

    data = [(item['videoId'], item['flowPool'], item['distributeCount'])
            for item in result['data']]
    return data


class FilterVideos(object):
    """Video filter: pre-exposure, video status, then already-viewed."""

    def __init__(self, app_type, video_ids, mid='', uid=''):
        """Store the filtering context.

        :param app_type: product identifier, type-int
        :param video_ids: list of video ids to filter, type-list
        :param mid: mid, type-string
        :param uid: uid, type-string
        """
        self.app_type = app_type
        self.mid = mid
        self.uid = uid
        self.video_ids = video_ids

    def filter_videos(self):
        """Run the three filters in sequence over ``self.video_ids``.

        Each stage is timed and logged. The chain stops as soon as a stage
        leaves nothing.

        :return: the list of video ids that passed every filter, or None
            when any stage produces an empty result
        """
        # Pre-exposure filter
        st_pre = time.time()
        filtered_pre_result = self.filter_video_previewed(self.video_ids)
        et_pre = time.time()
        log_.info('filter by previewed: app_type = {}, result = {}, execute time = {}ms'.format(
            self.app_type, filtered_pre_result, (et_pre - st_pre) * 1000))
        if not filtered_pre_result:
            return None

        # Video status filter
        st_status = time.time()
        filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
        et_status = time.time()
        log_.info('filter by video status: result = {}, execute time = {}ms'.format(
            filtered_status_result, (et_status - st_status) * 1000))
        if not filtered_status_result:
            return None

        # Already-viewed filter
        st_viewed = time.time()
        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
        et_viewed = time.time()
        log_.info('filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
            self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
        if not filtered_viewed_result:
            return None
        else:
            return filtered_viewed_result

    def filter_video_previewed(self, video_ids):
        """Drop videos recorded as pre-exposed for this app_type / mid.

        :param video_ids: list of video ids to filter, type-list
        :return: filtered_videos, the filtered list, type-list; the input
            is returned unchanged when Redis holds no data for the key
        """
        # Filter against the data cached in Redis
        redis_helper = RedisHelper()
        # Build the key: prefix + '<app_type>.<mid>'
        key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(self.app_type, self.mid)
        pe_videos_list = redis_helper.get_data_from_set(key_name)
        if not pe_videos_list:
            return video_ids
        # NOTE(review): eval() executes whatever expression is stored in Redis.
        # If the members are plain literals (presumably video ids - confirm the
        # stored format), ast.literal_eval or int() would be a safer parser.
        pe_videos = [eval(video) for video in pe_videos_list]
        filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
        return filtered_videos

    def filter_video_status(self, video_ids):
        """Keep only the videos whose status columns pass the fixed criteria.

        :param video_ids: list of video ids, type-list
        :return: filtered_videos, the ids returned by the status query;
            an empty list when ``video_ids`` is empty or the query yields
            no rows
        """
        # An empty list would render as "IN ()", which is invalid SQL, and
        # there is nothing to look up anyway.
        if not video_ids:
            return []

        # A one-element tuple formats as "(x,)", which is invalid SQL, so the
        # single-id case is parenthesised by hand.
        if len(video_ids) == 1:
            id_clause = '({})'.format(video_ids[0])
        else:
            id_clause = '{}'.format(tuple(video_ids))

        sql = "set hg_experimental_enable_shard_pruning=off; " \
              "SELECT video_id " \
              "FROM {} " \
              "WHERE audit_status = 5 " \
              "AND applet_rec_status IN (1, -6) " \
              "AND open_status = 1 " \
              "AND payment_status = 0 " \
              "AND encryption_status IS NULL " \
              "AND transcoding_status = 3 " \
              "AND video_id IN {};".format(config_.VIDEO_STATUS, id_clause)

        hologres_helper = HologresHelper()
        data = hologres_helper.get_data(sql=sql)
        # Treat an empty or missing result as "no video passed".
        if not data:
            return []
        filtered_videos = [temp[0] for temp in data]
        return filtered_videos

    def filter_video_viewed(self, video_ids, types=(1,)):
        """Call the backend HTTP interface to filter out videos by ``types``.

        :param video_ids: list of video ids, type-list
        :param types: filter parameters, type-tuple, default (1, )
            1-already viewed 2-video status 3-entered the seniors community
            4-topic status 5-recommendation status
        :return: filtered_videos, the ``data`` field of the response; the
            input ``video_ids`` unchanged when the request fails or the
            response code is not 0
        """
        # Call the HTTP interface
        request_data = {"appType": self.app_type,
                        "mid": self.mid,
                        "uid": self.uid,
                        "types": list(types),
                        "videoIds": video_ids}
        result = request_post(request_url=config_.VIDEO_FILTER_URL,
                              request_data=request_data, timeout=2.5)

        # On failure fall back to the unfiltered list rather than dropping everything.
        if result is None:
            log_.info('过滤失败,types: {}'.format(types))
            return video_ids

        if result['code'] != 0:
            log_.info('过滤失败,types: {}'.format(types))
            return video_ids

        filtered_videos = result['data']
        return filtered_videos


if __name__ == '__main__':
    filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
    filter_.filter_videos()
    filter_.filter_video_status(video_ids=[1, 3, 5])