import json
import random
import time
import traceback
from datetime import date, timedelta, datetime
from typing import List

import gevent
import pandas as pd
import requests

from db_helper import RedisHelper, MysqlHelper
from config import set_config
from log import Log
from parameter_update import param_update_risk_rule
from parameter_update import param_update_risk_videos
from parameter_update import param_update_risk_filter_flag

config_ = set_config()
log_ = Log()

# Festival tag -> [start, end) window, both bounds as int(YYYYMMDDHH) in the server's local time.
# While the current hour falls inside a window, videos carrying that tag are dropped by
# FilterVideos.filter_videos_with_festival.
# NOTE(review): the windows are hard-coded for 2024 and need updating every year.
FESTIVAL = {
    "春节": [2024020510, 2024021000],
    "初一": [2024021010, 2024021100],
    "初二": [2024021110, 2024021200],
    "初三": [2024021210, 2024021300],
    "初四": [2024021310, 2024021400],
    "初五": [2024021410, 2024021500],
    "情人节": [2024021410, 2024021500]
}


def send_msg_to_feishu(msg_text, timeout=3):
    """
    Send an alert text message to a Feishu group bot.

    :param msg_text: alert text; it is sent as "<key_word>: <msg_text>"
    :param timeout: request timeout in seconds, type-float or tuple(connect_timeout, read_timeout).
                    Optional; without a timeout an unresponsive webhook would block the caller forever.
    :return: None. Delivery is best-effort: request errors are logged, not raised.
    """
    # Webhook address.
    # NOTE(review): the bot token is hard-coded here; consider moving it into config.
    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
    # Custom keyword; presumably has to match the keyword configured on the bot — confirm.
    key_word = '服务报警'
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    try:
        requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message),
                         timeout=timeout)
    except requests.RequestException as e:
        # The URL is not logged because it contains the bot token.
        log_.error('send_msg_to_feishu failed, exception: {}, traceback: {}'.format(
            e, traceback.format_exc()))


def request_post(request_url, request_data, timeout):
    """
    POST ``request_data`` as a JSON body to an HTTP endpoint.

    :param request_url: endpoint URL
    :param request_data: request payload, serialized as JSON
    :param timeout: timeout in seconds, type-float or tuple(connect_timeout, read_timeout)
    :return: the decoded JSON response on HTTP 200; None on any other status code or on any
             exception (the exception is logged)
    """
    try:
        # Ask for the connection to be closed after the response instead of being kept alive.
        headers = {"Connection": "close"}
        response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
        if response.status_code == 200:
            return json.loads(response.text)
        return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        return None


def request_post_data(request_url, request_data, timeout):
    """
    POST an already-serialized JSON body to an HTTP endpoint and return its "outputs" field.

    :param request_url: endpoint URL
    :param request_data: request body, sent as-is with Content-Type application/json
    :param timeout: timeout in seconds, type-float or tuple(connect_timeout, read_timeout)
    :return: response_json['outputs'] on HTTP 200; None on any other status code or on any
             exception, including a missing "outputs" key (the exception is logged)
    """
    try:
        headers = {'content-type': 'application/json'}
        response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data['outputs']
        return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        return None


def request_get(request_url, timeout):
    """
    GET an HTTP endpoint.

    :param request_url: endpoint URL
    :param timeout: timeout in seconds, type-float or tuple(connect_timeout, read_timeout)
    :return: the decoded JSON response on HTTP 200; None on any other status code or on any
             exception (the exception is logged)
    """
    try:
        response = requests.get(url=request_url, timeout=timeout)
        if response.status_code == 200:
            return json.loads(response.text)
        return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        return None


def _is_success(result):
    """True when a decoded backend response is a dict whose "code" is 0."""
    return isinstance(result, dict) and result.get('code') == 0


def get_user_has30day_return(mid):
    """
    Whether the user has had a return visit in the last 30 days.

    The flag is cached in Redis for 2 hours; on a cache miss it is fetched from the backend
    and written back to the cache.

    :param mid: mid
    :return: 1 if the backend reported ``True``, 0 for any other reported value; None when
             ``mid`` is empty or the backend call failed
    """
    if not mid:
        return None
    # Read the cached flag from Redis first.
    user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
    redis_helper = RedisHelper()
    data = redis_helper.get_data_from_redis(key_name=user_key)
    if data is not None:
        return int(data)
    request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
    result = request_get(request_url=request_url, timeout=0.1)
    if not _is_success(result):
        return None
    redis_data = 1 if result.get('data') is True else 0
    redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
    return redis_data


def get_videos_remain_view_count(app_type, videos):
    """
    Fetch how many more times each flow-pool video may still be distributed.

    :param app_type: product identifier type-int
    :param videos: video info (video id, flow-pool tag)
                   type-list, [{'videoId': video_id, 'flowPool': flow_pool}, ...]
    :return: (data, error_flag)
             data type-list, [(video_id, flow_pool, view_count), ...]
             error_flag True when the backend call failed
    """
    if not videos:
        return [], False
    request_data = {'appType': app_type, 'videos': videos}
    result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data,
                          timeout=(0.1, 1))
    if result is None:
        return [], True
    if not _is_success(result):
        log_.info('获取视频在流量池中的剩余可分发数失败')
        return [], True
    data = [(item['videoId'], item['flowPool'], item['distributeCount'])
            for item in result.get('data') or []]
    return data, False


def get_videos_local_distribute_count(video_id, flow_pool):
    """
    Read the locally recorded distribution count of a flow-pool video from Redis.

    :param video_id: video_id
    :param flow_pool: flow-pool tag
    :return: current_count as int, or None when no count is stored
    """
    key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
    redis_helper = RedisHelper()
    current_count = redis_helper.get_data_from_redis(key_name=key_name)
    if current_count is not None:
        return int(current_count)
    return None


def update_video_w_h_rate(video_id, key_name):
    """
    Look up a video's width/height ratio in MySQL and, for landscape videos (ratio > 1),
    add it to the Redis sorted set ``key_name`` as {video_id: ratio}.

    :param video_id: videoId type-int (coerced with int())
    :param key_name: redis key
    :return: None
    """
    # int() guarantees that only a number is interpolated into the SQL text (no SQL injection).
    video_id = int(video_id)
    sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
    mysql_helper = MysqlHelper()
    data = mysql_helper.get_data(sql=sql)
    if not data:
        return
    width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
    if width == 0 or height == 0:
        return
    # For 90/270 degree rotations the stored width and height are used swapped.
    if rotate in (90, 270):
        w_h_rate = height / width
    else:
        w_h_rate = width / height
    if w_h_rate <= 1:
        # Only landscape videos are recorded.
        return
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data={video_id: w_h_rate})


class FilterVideos(object):
    """Video filtering for one recommendation request."""

    def __init__(self, request_id, app_type, video_ids, mid='', uid='', expansion_factor=None,
                 risk_filter_flag=None, app_region_filtered=None, videos_with_risk=None,
                 force_truncation=None):
        """
        :param request_id: request_id
        :param app_type: product identifier type-int
        :param video_ids: videos to filter type-list
        :param mid: mid type-string
        :param uid: uid type-string
        :param expansion_factor: stored on the instance; not read by this class
        :param risk_filter_flag: switch for the risk-video filter; falsy disables it
        :param app_region_filtered: {app_type: [region_code, ...]}; a listed app is risk-filtered
                                    only in the listed regions, an unlisted app always
        :param videos_with_risk: collection of risky video ids removed by the risk filter
        :param force_truncation: when not None, the risk step keeps at most this many videos
        """
        self.request_id = request_id
        self.app_type = app_type
        self.mid = mid
        self.uid = uid
        self.video_ids = video_ids
        self.expansion_factor = expansion_factor
        self.risk_filter_flag = risk_filter_flag
        self.app_region_filtered = app_region_filtered
        self.videos_with_risk = videos_with_risk
        self.force_truncation = force_truncation

    def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
        """
        Video-status filter for hourly-updated recall: drop videos listed in the Redis set of
        videos that currently do not qualify for recommendation.

        The key prefix depends on the AB code and ``key_flag``; the full key is
        "{prefix}{app_type}.{data_key}.{rule_key}".
        :return: the videos of ``video_ids`` not in that set (order preserved)
        """
        redis_helper = RedisHelper()
        if ab_code in config_.AB_CODE['region_rank_by_h'].values():
            if key_flag == 'region_24h':
                key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
            elif key_flag == 'day_24h':
                key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
            else:
                key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
        elif ab_code in config_.AB_CODE['rank_by_24h'].values():
            key_prefix = config_.H_VIDEO_FILER_24H
        elif key_flag == '24h':
            key_prefix = config_.H_VIDEO_FILER_24H
        else:
            key_prefix = config_.H_VIDEO_FILER
        filter_videos_list = redis_helper.get_data_from_set(
            key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
        )
        if not filter_videos_list:
            return video_ids
        # A set gives O(1) membership instead of scanning a list for every candidate.
        filter_videos = {int(video) for video in filter_videos_list}
        return [video_id for video_id in video_ids if video_id not in filter_videos]

    def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
        """
        Filtering for hourly-updated recall: pre-exposure -> hourly status -> viewed (backend).

        :return: list of int video ids, or None when nothing survives a step
        """
        # 1. Pre-exposure filter.
        filtered_pre_result = self.filter_video_previewed(self.video_ids)
        if not filtered_pre_result:
            return None
        # 2. Video-status filter.
        filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result,
                                                            rule_key=rule_key,
                                                            data_key=data_key,
                                                            ab_code=ab_code,
                                                            province_code=province_code,
                                                            key_flag=key_flag)
        if not filtered_status_result:
            return None
        # 3. Already-viewed filter (backend service), logged with its latency.
        st_viewed = time.time()
        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
        log_.info({
            'logTimestamp': int(time.time() * 1000),
            'pool_type': pool_type,
            'request_id': self.request_id,
            'app_type': self.app_type,
            'mid': self.mid,
            'uid': self.uid,
            'operation': 'view_filter',
            'request_videos': filtered_status_result,
            'view_filter_result': filtered_viewed_result,
            'executeTime': (time.time() - st_viewed) * 1000
        })
        if not filtered_viewed_result:
            return None
        return [int(video_id) for video_id in filtered_viewed_result]

    def _filter_with_risk_and_view(self, region_code, view_filter):
        """
        Shared pipeline of filter_videos / filter_videos_status:
        risk -> festival -> pre-exposure -> ``view_filter`` (a backend filter method).

        :return: list of int video ids, or None when nothing survives a step
        """
        videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
        videos_filtered = self.filter_videos_with_festival(videos_filtered)
        filtered_pre_result = self.filter_video_previewed(videos_filtered)
        if not filtered_pre_result:
            return None
        # Video status itself is filtered offline on a schedule; the backend call handles "viewed".
        filtered_viewed_result = view_filter(video_ids=filtered_pre_result)
        if not filtered_viewed_result:
            return None
        return [int(video_id) for video_id in filtered_viewed_result]

    def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
        """
        Risk, festival, pre-exposure and viewed filtering.

        :param pool_type: accepted for backward compatibility; currently unused
        :param region_code: region used by the risk filter
        :param shield_config: accepted for backward compatibility; currently unused
        :return: list of int video ids, or None when nothing survives
        """
        return self._filter_with_risk_and_view(region_code, self.filter_video_viewed)

    def filter_video_previewed(self, video_ids):
        """
        Pre-exposure filter: drop videos recorded in Redis as pre-exposed to this mid.

        :param video_ids: videos to filter type-list
        :return: filtered_videos type-list (order preserved); the input unchanged when mid is
                 empty/'null' or nothing is recorded
        """
        if not self.mid or self.mid == 'null':
            # No mid, so no pre-exposure record to check.
            return video_ids
        redis_helper = RedisHelper()
        key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
        pe_videos_list = redis_helper.get_data_from_set(key_name)
        if not pe_videos_list:
            return video_ids
        pe_videos = {int(video) for video in pe_videos_list}
        return [video_id for video_id in video_ids if video_id not in pe_videos]

    def _get_filter_types(self):
        """Backend filter types configured for this app_type, falling back to the 'other' entry."""
        types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
        if types is None:
            types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
        return types

    def _request_video_filter(self, video_ids, types):
        """
        Call the backend video-filter service with the given filter ``types``.

        :return: the ids the service kept; [] when the call failed or was not successful, so on
                 failure nothing passes
        """
        request_data = {"appType": self.app_type,
                        "mid": self.mid,
                        "uid": self.uid,
                        "types": list(types),
                        "videoIds": video_ids}
        result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data,
                              timeout=(0.1, 1))
        if not _is_success(result):
            return []
        return result.get('data') or []

    def filter_video_viewed(self, video_ids, types=(1, 6,)):
        """
        Filter through the backend service (already viewed, etc.).

        Filter type codes: 1-already viewed 2-video status 3-entered elderly community
        4-topic status 5-recommendation status 6-whitelist filter 7-politically sensitive

        :param video_ids: video id list type-list
        :param types: kept for backward compatibility but IGNORED; the types always come from
                      config_.FILTER_VIEWED_TYPES_CONFIG for this app_type
        :return: filtered_videos, [] on failure
        """
        return self._request_video_filter(video_ids, self._get_filter_types())

    def filter_video_viewed_new(self, video_ids):
        """
        Same as filter_video_viewed but without type 6 (whitelist filter), which
        filter_videos_new performs itself via filter_movie_religion_video.

        :param video_ids: video id list type-list
        :return: filtered_videos, [] on failure
        """
        types = list(self._get_filter_types())
        if 6 in types:
            types.remove(6)
        return self._request_video_filter(video_ids, types)

    def filter_shield_video(self, video_ids, shield_key_name_list):
        """
        Drop shielded videos: any video found in one of the given Redis sets.

        :param video_ids: videos to filter type-list
        :param shield_key_name_list: redis keys of the shield sets
        :return: filtered_videos type-list (ints, order preserved)
        """
        if len(video_ids) == 0:
            return video_ids
        redis_helper = RedisHelper()
        for shield_key_name in shield_key_name_list:
            # One membership lookup per video and key.
            video_ids = [
                int(video_id)
                for video_id in video_ids
                if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
            ]
        return video_ids

    def new_filter_video(self):
        """
        Pre-exposure filter followed by the backend viewed filter.

        :return: the backend's result, or None when nothing survives
        """
        filtered_pre_result = self.filter_video_previewed(self.video_ids)
        if not filtered_pre_result:
            return None
        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
        if not filtered_viewed_result:
            return None
        return filtered_viewed_result

    def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
        """
        Split ``vid_list`` into normal and flow-pool videos; flow-pool videos are additionally
        shield-filtered when ``shield_config`` has keys for ``region_code``.

        :return: (normal_video_list, flow_video_list)
        """
        flow_video_list = [v_id for v_id in vid_list if v_id in flow_vids_set]
        normal_video_list = [v_id for v_id in vid_list if v_id not in flow_vids_set]
        # A missing/empty shield_config means "no shielding" rather than an AttributeError.
        shield_key_name_list = shield_config.get(region_code, None) if shield_config else None
        if shield_key_name_list is None:
            return normal_video_list, flow_video_list
        filtered_shield_video_ids = self.filter_shield_video(
            video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
        )
        return normal_video_list, filtered_shield_video_ids

    def filter_movie_religion_video(self, video_ids):
        """
        Movie / religion filter (called "whitelist filter" elsewhere), backed by per-video keys
        in the REDIS_INFO_FILTER Redis:
          - rov.filter.movie.{videoId}: movie + religion videos
          - rov.filter.religion.{videoId}: religion videos

        Apps other than WAN_NENG_VIDEO / LAO_HAO_KAN_VIDEO / ZUI_JING_QI / H5 are checked
        against the movie keys; WAN_NENG_VIDEO / ZUI_JING_QI / H5 only against the religion
        keys; LAO_HAO_KAN_VIDEO is not filtered.

        :return: video_ids without the flagged videos, in their original order (de-duplicated
                 when anything was removed); the input unchanged when nothing is flagged
        """
        app_types = config_.APP_TYPE
        if self.app_type not in [app_types['WAN_NENG_VIDEO'], app_types['LAO_HAO_KAN_VIDEO'],
                                 app_types['ZUI_JING_QI'], app_types['H5']]:
            # Movie + religion.
            keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
        elif self.app_type in [app_types['WAN_NENG_VIDEO'], app_types['ZUI_JING_QI'], app_types['H5']]:
            # Religion only.
            keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
        else:
            return video_ids

        redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
        batch_size = 1000  # keys per mget round trip
        filter_videos = set()
        for start in range(0, len(keys), batch_size):
            mget_res = redis_helper.mget(keys=keys[start:start + batch_size])
            filter_videos.update(int(data) for data in mget_res if data is not None)
        if not filter_videos:
            return video_ids
        # Keep the incoming (ranked) order; dict.fromkeys de-duplicates like the former set difference.
        return list(dict.fromkeys(video_id for video_id in video_ids if video_id not in filter_videos))

    def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
        """
        Pre-exposure filter, then the backend viewed filter and the movie/religion filter run
        concurrently; a video must pass both. Flow-pool videos (``flow_set``) are additionally
        shield-filtered when ``region_code`` and ``shield_config`` are given.

        :return: list of int video ids (candidate order preserved, normal videos before
                 flow-pool videos when shield filtering applies), or None when nothing survives
        """
        filtered_pre_result = self.filter_video_previewed(self.video_ids)
        if not filtered_pre_result:
            return None
        # The two filters are independent I/O calls, so run them as concurrent greenlets.
        jobs = [
            gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
            gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
        gevent.joinall(jobs)
        viewed_result, movie_religion_result = [job.get() for job in jobs]
        viewed_ok = {int(video_id) for video_id in viewed_result}
        movie_religion_ok = {int(video_id) for video_id in movie_religion_result}
        # Intersect in candidate order (a set intersection returned an arbitrary order), de-duplicated.
        filtered_viewed_videos = []
        seen = set()
        for video_id in map(int, filtered_pre_result):
            if video_id in viewed_ok and video_id in movie_religion_ok and video_id not in seen:
                seen.add(video_id)
                filtered_viewed_videos.append(video_id)
        if not filtered_viewed_videos:
            return None
        # Flow-pool videos additionally need the shield filter.
        if flow_set is None or region_code is None or shield_config is None:
            return filtered_viewed_videos
        shield_key_name_list = shield_config.get(region_code, None)
        if shield_key_name_list is None:
            return filtered_viewed_videos
        normal_recall_ids = [vid for vid in filtered_viewed_videos if vid not in flow_set]
        left_flow_ids = [vid for vid in filtered_viewed_videos if vid in flow_set]
        filtered_shield_video_ids = self.filter_shield_video(
            video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
        )
        return normal_recall_ids + filtered_shield_video_ids

    def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
        """
        Same as filter_videos, but the backend call also filters on video status (type 2).

        :param pool_type: accepted for backward compatibility; currently unused
        :param region_code: region used by the risk filter
        :param shield_config: accepted for backward compatibility; currently unused
        :return: list of int video ids, or None when nothing survives
        """
        return self._filter_with_risk_and_view(region_code, self.filter_video_viewed_status)

    def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
        """
        Same as filter_video_viewed plus filter type 2 (video status).

        :param video_ids: video id list type-list
        :param types: kept for backward compatibility but IGNORED; the types come from
                      config_.FILTER_VIEWED_TYPES_CONFIG, with 2 appended
        :return: filtered_videos, [] on failure
        """
        types = list(self._get_filter_types())
        types.append(2)
        return self._request_video_filter(video_ids, types)

    def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
        """
        Drop risky videos for the configured app/region combinations, then apply truncation.

        Disabled (only truncation) when risk_filter_flag is falsy, so the feature can be
        rolled back by config.
        :return: the (possibly filtered) list truncated by self.truncation
        """
        if not self.risk_filter_flag:
            return self.truncation(video_ids)
        # A listed app is filtered only in its listed regions; an unlisted app always.
        app_region_filtered = self.app_region_filtered or {}
        if app_type in app_region_filtered:
            if_filtered = region_code in app_region_filtered[app_type]
        else:
            if_filtered = True
        if not if_filtered:
            return self.truncation(video_ids)
        videos_with_risk = set(self.videos_with_risk or ())
        return self.truncation([video_id for video_id in video_ids if video_id not in videos_with_risk])

    def truncation(self, video_ids):
        """Keep at most ``force_truncation`` videos; no-op when it is None."""
        if self.force_truncation is None:
            return video_ids
        return video_ids[:min(self.force_truncation, len(video_ids))]

    def filter_videos_with_festival(self, video_ids: List[int], festival=None):
        """
        Drop videos tagged with a festival whose window contains the current hour.

        :param video_ids: candidate video ids
        :param festival: {tag: [start, end)} with bounds as int(YYYYMMDDHH); defaults to FESTIVAL
        :return: videos without an active festival tag (order preserved); the input unchanged when
                 no festival is active, the input is empty, or the tag lookup did not return exactly
                 one value per video
        """
        if festival is None:
            festival = FESTIVAL
        # 1. Which festivals are active right now (server local time, hour granularity)?
        now_dt_int = int(datetime.today().strftime('%Y%m%d%H'))
        filter_fes = {tag for tag, window in festival.items() if window[0] <= now_dt_int < window[1]}
        if not filter_fes or not video_ids:
            return video_ids
        # 2. Look up each video's comma-separated tags and drop videos with an active tag.
        redis_keys = ["alg_recsys_video_tags_" + str(video_id) for video_id in video_ids]
        redis_helper = RedisHelper()
        redis_values = redis_helper.get_batch_key(redis_keys)
        if not redis_values or len(redis_values) != len(redis_keys):
            # Lookup failed or is misaligned with the keys: keep every video.
            return video_ids
        video_ids_new = []
        for video_id, tags in zip(video_ids, redis_values):
            if isinstance(tags, bytes):
                # NOTE(review): depends on the Redis client's decode setting; decode defensively.
                tags = tags.decode('utf-8')
            if tags and any(tag in filter_fes for tag in tags.split(",")):
                continue
            video_ids_new.append(video_id)
        return video_ids_new


if __name__ == '__main__':
    user = [
        ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
        ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
        ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
        ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
        ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
        ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
        ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
        ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
        ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
        ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
    ]
    video_df = pd.read_csv('./data/videoids.csv')
    videoid_list = video_df['videoid'].tolist()
    for mid, uid in user:
        video_ids = random.sample(videoid_list, 1000)
        start_time = time.time()
        filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid,
                               video_ids=video_ids)
        res = filter_.filter_videos_new()
        print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")