12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008 |
- import traceback
- import requests
- import json
- import time
- import gevent
- import pandas as pd
- import random
- from datetime import date, timedelta, datetime
- from typing import List
- # from db_helper import HologresHelper, RedisHelper, MysqlHelper
- from db_helper import RedisHelper, MysqlHelper
- from config import set_config
- from log import Log
- from parameter_update import param_update_risk_rule
- from parameter_update import param_update_risk_videos
- from parameter_update import param_update_risk_filter_flag
# Module-wide singletons: the configuration object and the logger that every
# helper in this file reads from / writes to.
config_ = set_config()
log_ = Log()
# Festival windows used by the festival filter.
# Each row is [festival tag, start, end]; start/end are integers in
# YYYYMMDDHH form and the window is start-inclusive / end-exclusive
# (the filter compares ``start <= now < end``).
# NOTE(review): five consecutive rows share the start 2024021410 - looks like
# it may be a copy/paste leftover; confirm the intended start hours.
FESTIVAL = [
    ["除夕", 2024020900, 2024030100],
    ["春节", 2024020900, 2024021800],
    ["初一", 2024021010, 2024021800],
    ["初二", 2024021110, 2024021800],
    ["初三", 2024021210, 2024021800],
    ["初四", 2024021310, 2024021800],
    ["初五", 2024021410, 2024021800],
    ["情人节", 2024021410, 2024021800],
    ["初六", 2024021410, 2024021800],
    ["初七", 2024021410, 2024021800],
    ["初八", 2024021410, 2024021800],
    ["雨水", 2024021909, 2024022000],
    ["妇女节", 2024030808, 2024031200],
    ["龙抬头", 2024031109, 2024031400],
    ["清明", 2024040408, 2024040700],
]
def send_msg_to_feishu(msg_text):
    """Send an alert text message to the Feishu bot webhook.

    The message is sent as ``"<key_word>: <msg_text>"``. Sending is
    best-effort: transport failures (including timeouts) are logged and not
    raised, so a broken alert channel cannot take the caller down or block it.

    :param msg_text: text of the alert
    :return: None
    """
    # Webhook address of the bot.
    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
    # Custom keyword prepended to every message.
    key_word = '服务报警'
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    try:
        # requests never times out unless told to; bound both the connect and
        # the read phase so an unreachable webhook cannot hang the caller.
        requests.request(
            'POST',
            url=webhook,
            headers=headers,
            data=json.dumps(payload_message),
            timeout=(3, 5),
        )
    except requests.RequestException as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(webhook, e, traceback.format_exc()))
def request_post(request_url, request_data, timeout):
    """POST ``request_data`` as a JSON body and return the decoded JSON reply.

    :param request_url: endpoint URL
    :param request_data: request parameters (sent through ``json=``)
    :param timeout: timeout in seconds, a float or a
        ``(connect_timeout, read_timeout)`` tuple
    :return: decoded JSON response, or None when the status code is not 200
        or any exception occurs (the exception is logged)
    """
    try:
        response = requests.post(
            url=request_url,
            json=request_data,
            timeout=timeout,
            headers={"Connection": "close"},
        )
        if response.status_code != 200:
            return None
        return json.loads(response.text)
    except Exception as err:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, err, traceback.format_exc()))
        return None
def request_post_data(request_url, request_data, timeout):
    """POST a pre-serialised body and return the ``outputs`` field of the reply.

    Unlike ``request_post`` the payload is passed through ``data=`` untouched
    and sent with a JSON content-type header.

    :param request_url: endpoint URL
    :param request_data: request body
    :param timeout: timeout in seconds, a float or a
        ``(connect_timeout, read_timeout)`` tuple
    :return: ``response_json['outputs']``, or None when the status code is not
        200 or any exception occurs (the exception is logged)
    """
    try:
        response = requests.post(
            url=request_url,
            data=request_data,
            timeout=timeout,
            headers={'content-type': 'application/json'},
        )
        if response.status_code != 200:
            return None
        payload = json.loads(response.text)
        return payload['outputs']
    except Exception as err:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, err, traceback.format_exc()))
        return None
def request_get(request_url, timeout):
    """GET ``request_url`` and return the decoded JSON reply.

    :param request_url: endpoint URL
    :param timeout: timeout in seconds, a float or a
        ``(connect_timeout, read_timeout)`` tuple
    :return: decoded JSON response, or None when the status code is not 200
        or any exception occurs (the exception is logged)
    """
    try:
        response = requests.get(url=request_url, timeout=timeout)
        if response.status_code != 200:
            return None
        return json.loads(response.text)
    except Exception as err:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, err, traceback.format_exc()))
        return None
def get_user_has30day_return(mid):
    """Tell whether the user had a return visit within the last 30 days.

    The Redis cache is consulted first; on a miss the backend API is queried
    (100 ms timeout) and the answer is cached for two hours.

    :param mid: mid of the user
    :return: 1 (has return) / 0 (no return) - or the cached value as int -
        and None when ``mid`` is empty or the backend call fails
    """
    if not mid:
        return None

    # Cached state first.
    cache_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
    redis_helper = RedisHelper()
    cached = redis_helper.get_data_from_redis(key_name=cache_key)
    if cached is not None:
        return int(cached)

    # Cache miss: ask the backend.
    response = request_get(request_url=f"{config_.GET_USER_30DayReturnCnt_URL}{mid}", timeout=0.1)
    if response is None or response['code'] != 0:
        return None

    # Only a literal True counts as "has return".
    flag = 1 if response['data'] is True else 0
    redis_helper.set_data_to_redis(key_name=cache_key, value=flag, expire_time=2 * 3600)
    return flag
def get_videos_remain_view_count(app_type, videos):
    """Fetch the remaining distributable count of videos in the flow pool.

    :param app_type: product identifier, int
    :param videos: list of ``{'videoId': video_id, 'flowPool': flow_pool}``
    :return: ``(data, error_flag)`` where ``data`` is a list of
        ``(video_id, flow_pool, view_count)`` tuples and ``error_flag`` is
        True when the backend call failed or answered with a non-zero code
    """
    if not videos:
        return [], False

    result = request_post(
        request_url=config_.GET_REMAIN_VIEW_COUNT_URL,
        request_data={'appType': app_type, 'videos': videos},
        timeout=(0.1, 1),
    )
    if result is None:
        return [], True
    if result['code'] != 0:
        log_.info('获取视频在流量池中的剩余可分发数失败')
        return [], True

    remain = [
        (entry['videoId'], entry['flowPool'], entry['distributeCount'])
        for entry in result['data']
    ]
    return remain, False
def get_videos_local_distribute_count(video_id, flow_pool):
    """Read the locally recorded distribute count of a flow-pool video.

    :param video_id: video id
    :param flow_pool: flow pool tag
    :return: the count stored in Redis as int, or None when the key is absent
    """
    key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
    stored = RedisHelper().get_data_from_redis(key_name=key_name)
    return None if stored is None else int(stored)
def update_video_w_h_rate(video_id, key_name):
    """Store the width/height ratio of a landscape video in a Redis zset.

    The video's width, height and rotation are read from
    ``longvideo.wx_video``; width and height are swapped when the rotation is
    90 or 270. Only ratios greater than 1 (landscape) are written.

    :param video_id: video id, int (or anything ``int()`` accepts)
    :param key_name: redis key of the zset
    :return: None
    :raises ValueError, TypeError: when ``video_id`` is not an integer value
    """
    # The id is interpolated into the SQL text, so force it to a plain int
    # before it gets anywhere near the query (blocks SQL injection).
    video_id = int(video_id)
    sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
    rows = MysqlHelper().get_data(sql=sql)
    if rows is None or len(rows) == 0:
        return

    width, height, rotate = int(rows[0][1]), int(rows[0][2]), int(rows[0][3])
    if width == 0 or height == 0:
        return

    # A quarter-turn rotation swaps the displayed width and height.
    if rotate in (90, 270):
        w_h_rate = height / width
    else:
        w_h_rate = width / height
    if w_h_rate <= 1:
        return

    RedisHelper().add_data_with_zset(key_name=key_name, data={video_id: w_h_rate})
- class FilterVideos(object):
- """视频过滤"""
def __init__(self, request_id, app_type, video_ids, mid='', uid='',
             expansion_factor=None,
             risk_filter_flag=None,
             app_region_filtered=None,
             videos_with_risk=None,
             force_truncation=None,
             env_dict=None
             ):
    """
    Store the request context and the filter settings on the instance.

    :param request_id: request_id
    :param app_type: product identifier, int
    :param video_ids: list of video ids to filter
    :param mid: mid, string
    :param uid: uid, string
    :param expansion_factor: stored as-is
    :param risk_filter_flag: switch that enables the risk-video filter
    :param app_region_filtered: mapping app_type -> regions used by the risk filter
    :param videos_with_risk: collection of risky video ids
    :param force_truncation: maximum number of ids kept by ``truncation``; None disables it
    :param env_dict: request environment; ``cityCode`` and ``hotSenceType`` are read from it
    """
    # Request context.
    self.request_id = request_id
    self.app_type = app_type
    self.video_ids = video_ids
    self.mid = mid
    self.uid = uid
    self.env_dict = env_dict
    # Filter settings.
    self.expansion_factor = expansion_factor
    self.risk_filter_flag = risk_filter_flag
    self.app_region_filtered = app_region_filtered
    self.videos_with_risk = videos_with_risk
    self.force_truncation = force_truncation
def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
    """Drop recall videos whose hourly-refreshed status marks them as not recommendable.

    The ids to drop are read from a Redis set whose key prefix depends on the
    experiment (``ab_code``) and on ``key_flag``.

    :param video_ids: list of candidate video ids
    :param rule_key: rule part of the redis key
    :param data_key: data part of the redis key
    :param ab_code: experiment code used to pick the key prefix
    :param province_code: province code, part of the region-level prefixes
    :param key_flag: '' | 'region_24h' | 'day_24h' | '24h'
    :return: ``video_ids`` without the ids found in the Redis set (the input
        list itself when the set is empty or missing)
    """
    redis_helper = RedisHelper()

    # Pick the prefix of the "not recommendable" set.
    if ab_code in config_.AB_CODE['region_rank_by_h'].values():
        if key_flag == 'region_24h':
            key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
        elif key_flag == 'day_24h':
            key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
        else:
            key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
    elif ab_code in config_.AB_CODE['rank_by_24h'].values():
        key_prefix = config_.H_VIDEO_FILER_24H
    elif key_flag == '24h':
        key_prefix = config_.H_VIDEO_FILER_24H
    else:
        key_prefix = config_.H_VIDEO_FILER

    filter_videos_list = redis_helper.get_data_from_set(
        key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
    )
    if not filter_videos_list:
        return video_ids

    # A set keeps each membership test O(1) instead of scanning a list for
    # every candidate.
    blocked = {int(video) for video in filter_videos_list}
    return [video_id for video_id in video_ids if video_id not in blocked]
def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
    """Filter hourly-updated recall videos: pre-exposure, status, then viewed.

    :param rule_key: rule part of the status redis key
    :param data_key: data part of the status redis key
    :param ab_code: experiment code used by the status filter
    :param province_code: province code used by the status filter
    :param key_flag: key flavour forwarded to the status filter
    :param pool_type: pool label, only written to the log
    :return: list of int video ids that survived every filter, or None as
        soon as one stage leaves nothing
    """
    # 1. Pre-exposure filter.
    filtered_pre_result = self.filter_video_previewed(self.video_ids)
    if not filtered_pre_result:
        return None

    # 2. Video status filter.
    filtered_status_result = self.filter_video_status_h(
        video_ids=filtered_pre_result, rule_key=rule_key,
        data_key=data_key, ab_code=ab_code,
        province_code=province_code, key_flag=key_flag)
    if not filtered_status_result:
        return None

    # 3. Viewed filter. filter_video_viewed declares region_code as a
    # parameter, so it has to be supplied here; the province code is the
    # region information this method has.
    # NOTE(review): confirm province_code is the right value once
    # filter_video_viewed starts using region_code.
    st_viewed = time.time()
    filtered_viewed_result = self.filter_video_viewed(
        video_ids=filtered_status_result, region_code=province_code)
    log_.info({
        'logTimestamp': int(time.time() * 1000),
        'pool_type': pool_type,
        'request_id': self.request_id,
        'app_type': self.app_type,
        'mid': self.mid,
        'uid': self.uid,
        'operation': 'view_filter',
        'request_videos': filtered_status_result,
        'view_filter_result': filtered_viewed_result,
        'executeTime': (time.time() - st_viewed) * 1000
    })
    if not filtered_viewed_result:
        return None
    return [int(video_id) for video_id in filtered_viewed_result]
def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
    """Run the standard filter chain over ``self.video_ids``.

    Order: risk-video filter, festival filter, pre-exposure filter, viewed
    filter (backend API). Video-status filtering is not part of this chain;
    per the original note it is handled by an offline scheduled job.

    :param pool_type: unused here
    :param region_code: region code forwarded to the risk and viewed filters
    :param shield_config: unused here (shield filtering is disabled in this method)
    :return: list of int video ids, or None when a stage leaves nothing
    """
    # Risk filter (app / region) followed by the festival filter.
    candidates = self.filter_videos_with_festival(
        self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
    )

    # Pre-exposure filter.
    not_previewed = self.filter_video_previewed(candidates)
    if not not_previewed:
        return None

    # Viewed filter.
    not_viewed = self.filter_video_viewed(video_ids=not_previewed, region_code=region_code)
    if not not_viewed:
        return None
    return [int(video_id) for video_id in not_viewed]
def filter_video_previewed(self, video_ids):
    """
    Pre-exposure filter: drop the ids found in the user's pre-exposure Redis set.

    :param video_ids: list of video ids to filter
    :return: filtered list; the input list itself when ``mid`` is empty or
        'null', or when the Redis set is empty or missing
    """
    if not self.mid or self.mid == 'null':
        # Without a mid there is no per-user set to filter against.
        return video_ids

    redis_helper = RedisHelper()
    key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
    pe_videos_list = redis_helper.get_data_from_set(key_name)
    if not pe_videos_list:
        return video_ids

    # Set membership is O(1); the previous list made every lookup a scan.
    pe_videos = {int(video) for video in pe_videos_list}
    return [video_id for video_id in video_ids if video_id not in pe_videos]
- # def filter_video_status(self, video_ids):
- # """
- # 对视频状态进行过滤
- # :param video_ids: 视频id列表 type-list
- # :return: filtered_videos
- # """
- # if len(video_ids) == 1:
- # sql = "set hg_experimental_enable_shard_pruning=off; " \
- # "SELECT video_id " \
- # "FROM {} " \
- # "WHERE audit_status = 5 " \
- # "AND applet_rec_status IN (1, -6) " \
- # "AND open_status = 1 " \
- # "AND payment_status = 0 " \
- # "AND encryption_status != 5 " \
- # "AND transcoding_status = 3 " \
- # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
- # else:
- # sql = "set hg_experimental_enable_shard_pruning=off; " \
- # "SELECT video_id " \
- # "FROM {} " \
- # "WHERE audit_status = 5 " \
- # "AND applet_rec_status IN (1, -6) " \
- # "AND open_status = 1 " \
- # "AND payment_status = 0 " \
- # "AND encryption_status != 5 " \
- # "AND transcoding_status = 3 " \
- # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
- #
- # hologres_helper = HologresHelper()
- # data = hologres_helper.get_data(sql=sql)
- # filtered_videos = [int(temp[0]) for temp in data]
- # return filtered_videos
def filter_video_viewed(self, video_ids, region_code=None, types=(1, 6,)):
    """
    Call the backend filter API to drop videos the user has already viewed.

    :param video_ids: list of video ids
    :param region_code: accepted for interface compatibility; not used by the
        request. Optional, because several callers do not pass it.
    :param types: ignored - the filter types always come from
        ``config_.FILTER_VIEWED_TYPES_CONFIG`` (entry for this app type, else
        the 'other' entry). Meaning of the codes per the original note:
        1-viewed 2-video status 3-entered the seniors community
        4-topic status 5-recommend status 6-whitelist filter
        7-politically sensitive video filter
    :return: the ``data`` field of the API reply, or [] when the call fails
        or answers with a non-zero code
    """
    # Filter types configured for this app type.
    types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
    if types is None:
        types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')

    # Resolve the environment fields once. env_dict may be None, and the
    # defaults are what the API receives when a field is absent.
    env = self.env_dict
    city_code = env["cityCode"] if env and "cityCode" in env else "-1"
    hot_scene_type = env["hotSenceType"] if env and "hotSenceType" in env else 0
    log_.info("cityCode:" + str(city_code))
    log_.info("hotSenceType:" + str(hot_scene_type))

    request_data = {"appType": self.app_type,
                    "mid": self.mid,
                    "uid": self.uid,
                    "types": list(types),
                    "videoIds": video_ids,
                    "cityCode": city_code,
                    "hotSenceType": hot_scene_type
                    }
    # Call the http API.
    result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.2, 1))
    if result is None:
        return []
    if result['code'] != 0:
        return []
    return result['data']
def filter_video_viewed_new(self, video_ids):
    """
    Call the backend filter API to drop viewed videos, without the whitelist filter.

    The filter types come from ``config_.FILTER_VIEWED_TYPES_CONFIG`` (entry
    for this app type, else the 'other' entry); type 6 is removed from the
    list before the request is sent. Meaning of the codes per the original note:
    1-viewed 2-video status 3-entered the seniors community 4-topic status
    5-recommend status 6-whitelist filter 7-politically sensitive video filter

    :param video_ids: list of video ids
    :return: the ``data`` field of the API reply, or [] when the call fails
        or answers with a non-zero code
    """
    configured = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
    if configured is None:
        configured = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
    if 6 in configured:
        # Work on a copy so the configured sequence is left untouched.
        configured = list(configured)
        configured.remove(6)

    result = request_post(
        request_url=config_.VIDEO_FILTER_URL,
        request_data={
            "appType": self.app_type,
            "mid": self.mid,
            "uid": self.uid,
            "types": list(configured),
            "videoIds": video_ids,
        },
        timeout=(0.1, 1),
    )
    if result is None or result['code'] != 0:
        return []
    return result['data']
def filter_shield_video(self, video_ids, shield_key_name_list):
    """
    Drop shielded videos.

    Every id is checked against each Redis set named in
    ``shield_key_name_list``; ids found in any of them are removed.

    :param video_ids: list of video ids to filter
    :param shield_key_name_list: redis keys of the shield sets
    :return: the remaining ids (converted to int once at least one key has
        been applied); the input itself when it is empty
    """
    if len(video_ids) == 0:
        return video_ids

    redis_helper = RedisHelper()
    remaining = video_ids
    for shield_key_name in shield_key_name_list:
        kept = []
        for video_id in remaining:
            if redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id):
                continue
            kept.append(int(video_id))
        remaining = kept
    return remaining
def new_filter_video(self):
    """Filter ``self.video_ids``: pre-exposure filter, then viewed filter.

    :return: the viewed-filter result, or None when either stage leaves nothing
    """
    # 1. Pre-exposure filter.
    filtered_pre_result = self.filter_video_previewed(self.video_ids)
    if not filtered_pre_result:
        return None

    # 2. Viewed filter. filter_video_viewed declares region_code as a
    # parameter; no region is known here, so None is passed explicitly
    # rather than omitting the argument (which raises TypeError).
    filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result, region_code=None)
    if not filtered_viewed_result:
        return None
    return filtered_viewed_result
def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
    """Split ids into normal / flow-pool videos and shield-filter the flow ones.

    :param vid_list: video ids to split
    :param flow_vids_set: ids that belong to the flow pool
    :param region_code: key looked up in ``shield_config``
    :param shield_config: mapping region_code -> list of shield redis keys
    :return: ``(normal_ids, flow_ids)``; ``flow_ids`` is shield-filtered when
        the region has shield keys configured
    """
    normal_ids, flow_ids = [], []
    for vid in vid_list:
        # Route each id to the bucket it belongs to, keeping the input order.
        (flow_ids if vid in flow_vids_set else normal_ids).append(vid)

    shield_keys = shield_config.get(region_code, None)
    if shield_keys is None:
        return normal_ids, flow_ids
    return normal_ids, self.filter_shield_video(
        video_ids=flow_ids, shield_key_name_list=shield_keys
    )
def filter_movie_religion_video(self, video_ids):
    """Whitelist filter (movie / religion) backed by per-video Redis keys.

    Key families (per the original note): ``rov.filter.movie.{videoId}``
    covers movie + religion, ``rov.filter.religion.{videoId}`` covers religion.
    Which family is consulted depends on ``self.app_type``; one app type is
    not filtered at all.

    :param video_ids: list of video ids
    :return: ids not flagged in Redis. When something was flagged the result
        is rebuilt from a set difference (order not preserved, duplicates
        removed); otherwise the input list is returned unchanged.
    """
    app_types = config_.APP_TYPE
    movie_exempt_apps = [app_types['WAN_NENG_VIDEO'],
                         app_types['LAO_HAO_KAN_VIDEO'],
                         app_types['ZUI_JING_QI'],
                         app_types['H5']]
    religion_only_apps = [app_types['WAN_NENG_VIDEO'],
                          app_types['ZUI_JING_QI'],
                          app_types['H5']]

    if self.app_type not in movie_exempt_apps:
        # Filter movie + religion.
        key_template = "rov.filter.movie.{}"
    elif self.app_type in religion_only_apps:
        # Filter religion only.
        key_template = "rov.filter.religion.{}"
    else:
        # Remaining app type: no whitelist filtering.
        return video_ids
    keys = [key_template.format(video_id) for video_id in video_ids]

    redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
    flagged = []
    # Query Redis in batches of 1000 keys.
    for start in range(0, len(keys), 1000):
        batch = redis_helper.mget(keys=keys[start:start + 1000])
        flagged.extend(int(value) for value in batch if value is not None)

    if len(flagged) == 0:
        return video_ids
    return list(set(video_ids) - set(flagged))
def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
    """Filter ``self.video_ids`` with the viewed and whitelist filters run concurrently.

    After the pre-exposure filter, ``filter_video_viewed_new`` and
    ``filter_movie_religion_video`` are spawned as gevent greenlets and only
    ids returned by both are kept. When ``flow_set``, ``region_code`` and
    ``shield_config`` are all given and the region has shield keys, flow-pool
    ids are additionally shield-filtered.

    :param region_code: key looked up in ``shield_config``
    :param shield_config: mapping region_code -> list of shield redis keys
    :param flow_set: ids that belong to the flow pool
    :return: list of int video ids, or None when nothing is left
    """
    # Pre-exposure filter.
    not_previewed = self.filter_video_previewed(self.video_ids)
    if not not_previewed:
        return None

    # Viewed filter and whitelist filter, run side by side.
    jobs = [
        gevent.spawn(self.filter_video_viewed_new, not_previewed),
        gevent.spawn(self.filter_movie_religion_video, not_previewed),
    ]
    gevent.joinall(jobs)
    viewed_result, whitelist_result = [job.get() for job in jobs]

    viewed_ok = set()
    for raw_id in viewed_result:
        viewed_ok.add(int(raw_id))
    whitelist_ok = set()
    for raw_id in whitelist_result:
        whitelist_ok.add(int(raw_id))

    survivors = list(viewed_ok & whitelist_ok)
    if not survivors:
        return None
    survivors = [int(video_id) for video_id in survivors]

    # Shield filtering only applies to flow-pool videos and needs all three inputs.
    if flow_set is None or region_code is None or shield_config is None:
        return survivors

    normal_recall_ids, left_flow_ids = [], []
    for vid in survivors:
        (left_flow_ids if vid in flow_set else normal_recall_ids).append(vid)

    shield_keys = shield_config.get(region_code, None)
    if shield_keys is None:
        return survivors
    return normal_recall_ids + self.filter_shield_video(
        video_ids=left_flow_ids, shield_key_name_list=shield_keys
    )
def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
    """Run the filter chain over ``self.video_ids`` with the status-aware viewed filter.

    Same chain as ``filter_videos`` (risk-video filter, festival filter,
    pre-exposure filter) except that the last stage calls
    ``filter_video_viewed_status``.

    :param pool_type: unused here
    :param region_code: region code forwarded to the risk and viewed filters
    :param shield_config: unused here (shield filtering is disabled in this method)
    :return: list of int video ids, or None when a stage leaves nothing
    """
    # Risk filter (app / region) followed by the festival filter.
    candidates = self.filter_videos_with_festival(
        self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
    )

    # Pre-exposure filter.
    not_previewed = self.filter_video_previewed(candidates)
    if not not_previewed:
        return None

    # Viewed filter (status-aware variant).
    not_viewed = self.filter_video_viewed_status(video_ids=not_previewed, region_code=region_code)
    if not not_viewed:
        return None
    return [int(video_id) for video_id in not_viewed]
def filter_video_viewed_status(self, video_ids, region_code, types=(1, 6,)):
    """
    Call the backend filter API for viewed videos, with filter type 2 added.

    The filter types come from ``config_.FILTER_VIEWED_TYPES_CONFIG`` (entry
    for this app type, else the 'other' entry) and 2 is always appended.
    Meaning of the codes per the original note:
    1-viewed 2-video status 3-entered the seniors community 4-topic status
    5-recommend status 6-whitelist filter 7-politically sensitive video filter

    :param video_ids: list of video ids
    :param region_code: not used by the request
    :param types: ignored - overwritten from the configuration
    :return: the ``data`` field of the API reply, or [] when the call fails
        or answers with a non-zero code
    """
    configured = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
    if configured is None:
        configured = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
    # Copy before extending so the configured sequence is left untouched.
    type_codes = list(configured) + [2]

    env = self.env_dict
    payload = {
        "appType": self.app_type,
        "mid": self.mid,
        "uid": self.uid,
        "types": type_codes,
        "videoIds": video_ids,
        "cityCode": env["cityCode"] if env and "cityCode" in env else "-1",
        "hotSenceType": env["hotSenceType"] if env and "hotSenceType" in env else 0,
    }
    # Call the http API.
    result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=payload, timeout=(0.2, 1))
    if result is None or result['code'] != 0:
        return []
    return result['data']
def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
    """
    Remove risk videos from ``video_ids`` and apply the forced truncation.

    Filtering is skipped (only truncation is applied) when either
    * ``self.risk_filter_flag`` is falsy -- a switch that allows the feature
      to be rolled back, or
    * ``app_type`` is listed in ``self.app_region_filtered`` but
      ``region_code`` is not among the regions listed for that app.
    An ``app_type`` that is not listed at all is always filtered.

    :param video_ids: candidate video ids
    :param app_type: app type used as key into ``self.app_region_filtered``
    :param region_code: region looked up in the app's region collection
    :return: the result of ``self.truncation`` on the (possibly filtered) ids
    """
    # 0. Global switch: when off, skip risk filtering entirely.
    if not self.risk_filter_flag:
        return self.truncation(video_ids)

    # 1. Listed app + unlisted region -> no filtering for this request.
    app_regions = self.app_region_filtered
    if app_type in app_regions and region_code not in app_regions[app_type]:
        return self.truncation(video_ids)

    # 2. Hash the risk ids once so each membership test is O(1) rather than
    #    a scan of the whole risk list per candidate.
    risky_ids = self.videos_with_risk
    if isinstance(risky_ids, (list, tuple)):
        risky_ids = set(risky_ids)

    # 3. Keep only the videos that are not flagged as risky.
    safe_ids = [video_id for video_id in video_ids if video_id not in risky_ids]
    return self.truncation(safe_ids)
def truncation(self, video_ids):
    """
    Cap ``video_ids`` at ``self.force_truncation`` items.

    :param video_ids: sequence of video ids
    :return: ``video_ids`` unchanged when ``self.force_truncation`` is None,
        otherwise the leading slice bounded by that limit
    """
    limit = self.force_truncation
    if limit is not None:
        # The bound never exceeds the length of the input.
        upper = min(limit, len(video_ids))
        return video_ids[:upper]
    return video_ids
def filter_videos_with_festival(self, video_ids: List[int]):
    """
    Drop videos carrying a tag of any festival whose window is active now.

    ``FESTIVAL`` is iterated as ``(tag, start, end)`` triples; a festival is
    active when ``start <= now < end`` with ``now`` being the current local
    time encoded as the integer ``YYYYMMDDHH``.

    :param video_ids: candidate video ids
    :return: ``video_ids`` unchanged when no festival is active or when the
        tag lookup does not return exactly one value per id; otherwise the
        ids whose comma-separated tags contain no active festival tag
    """
    # 1. Work out which festivals are active at the current hour.
    current_hour = int(datetime.today().strftime('%Y%m%d%H'))
    active_festivals = [
        tag for tag, start, end in FESTIVAL if start <= current_hour < end
    ]
    if not active_festivals:
        return video_ids

    # 2. Fetch the tag string of every candidate in one batch.
    tag_keys = [f"alg_recsys_video_tags_{video_id}" for video_id in video_ids]
    tag_values = RedisHelper().get_batch_key(tag_keys)

    # Without a complete one-to-one result nothing can be matched reliably,
    # so the input is returned untouched.
    if not tag_values or len(tag_values) != len(tag_keys):
        return video_ids

    # Videos without tags are kept; tagged ones are kept only when none of
    # their tags belongs to an active festival.
    return [
        video_id
        for video_id, tags in zip(video_ids, tag_values)
        if not (tags and any(tag in active_festivals for tag in tags.split(",")))
    ]
if __name__ == '__main__':
    # Manual smoke test: run the full filter pipeline for a handful of
    # (mid, uid) pairs against 1000 randomly sampled video ids and print the
    # result together with the elapsed time in milliseconds.
    sample_users = [
        ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
        ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
        ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
        ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
        ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
        ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
        ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
        ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
        ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
        ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', ''),
    ]
    # Pool of candidate ids read from the 'videoid' column of the CSV.
    candidate_ids = pd.read_csv('./data/videoids.csv')['videoid'].tolist()

    for mid, uid in sample_users:
        video_ids = random.sample(candidate_ids, 1000)
        started_at = time.time()
        filter_ = FilterVideos(
            request_id=f'{mid} - {uid}',
            app_type=0,
            mid=mid,
            uid=uid,
            video_ids=video_ids,
        )
        res = filter_.filter_videos_new()
        print(f"res: {res}\nexecute_time: {(time.time() - started_at) * 1000}")
|