utils.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928
  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. import gevent
  6. import pandas as pd
  7. import random
  8. from datetime import datetime
  9. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  10. from db_helper import RedisHelper, MysqlHelper
  11. from config import set_config
  12. from log import Log
  13. from parameter_update import param_update_risk_rule
  14. from parameter_update import param_update_risk_videos
  15. from parameter_update import param_update_risk_filter_flag
# Module-level configuration object, created once when this module is imported.
config_ = set_config()
# Module-level logger used by the helpers in this file.
log_ = Log()
  18. def send_msg_to_feishu(msg_text):
  19. """发送消息到飞书"""
  20. # webhook地址
  21. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  22. # 自定义关键词key_word
  23. key_word = '服务报警'
  24. headers = {'Content-Type': 'application/json'}
  25. payload_message = {
  26. "msg_type": "text",
  27. "content": {
  28. "text": '{}: {}'.format(key_word, msg_text)
  29. }
  30. }
  31. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  32. # print(response.text)
  33. def request_post(request_url, request_data, timeout):
  34. """
  35. post 请求 HTTP接口
  36. :param request_url: 接口URL
  37. :param request_data: 请求参数
  38. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  39. :return: res_data json格式
  40. """
  41. try:
  42. headers = {"Connection": "close"}
  43. #print(request_url)
  44. #print(headers)
  45. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  46. #print("response:", response)
  47. if response.status_code == 200:
  48. res_data = json.loads(response.text)
  49. return res_data
  50. else:
  51. return None
  52. except Exception as e:
  53. #print(e)
  54. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  55. return None
  56. def request_post_data(request_url, request_data, timeout):
  57. """
  58. post 请求 HTTP接口
  59. :param request_url: 接口URL
  60. :param request_data: 请求参数
  61. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  62. :return: res_data json格式
  63. """
  64. try:
  65. headers = {'content-type': 'application/json'}
  66. response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
  67. #print("response:", response)
  68. if response.status_code == 200:
  69. res_data = json.loads(response.text)
  70. return res_data['outputs']
  71. else:
  72. return None
  73. except Exception as e:
  74. #print(e)
  75. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  76. return None
  77. def request_get(request_url, timeout):
  78. """
  79. get 请求 HTTP接口
  80. :param request_url: 接口URL
  81. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  82. :return: res_data json格式
  83. """
  84. try:
  85. response = requests.get(url=request_url, timeout=timeout)
  86. if response.status_code == 200:
  87. res_data = json.loads(response.text)
  88. return res_data
  89. else:
  90. return None
  91. except Exception as e:
  92. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  93. return None
  94. def get_user_has30day_return(mid):
  95. """
  96. 获取用户近30天是否有回流
  97. :param mid: mid
  98. :return: data, type
  99. """
  100. if not mid:
  101. return None
  102. # 获取redis中存储的状态值
  103. user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
  104. redis_helper = RedisHelper()
  105. data = redis_helper.get_data_from_redis(key_name=user_key)
  106. if data is not None:
  107. return int(data)
  108. else:
  109. request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  110. result = request_get(request_url=request_url, timeout=0.1)
  111. if result is None:
  112. return None
  113. if result['code'] != 0:
  114. return None
  115. data = result['data']
  116. if data is True:
  117. redis_data = 1
  118. else:
  119. redis_data = 0
  120. redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
  121. return redis_data
  122. def get_videos_remain_view_count(app_type, videos):
  123. """
  124. 获取视频在流量池中的剩余可分发数
  125. :param app_type: 产品标识 type-int
  126. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  127. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  128. error_flag 错误标记,True为错误
  129. """
  130. error_flag = False
  131. if not videos:
  132. return [], error_flag
  133. request_data = {'appType': app_type, 'videos': videos}
  134. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  135. if result is None:
  136. error_flag = True
  137. return [], error_flag
  138. if result['code'] != 0:
  139. log_.info('获取视频在流量池中的剩余可分发数失败')
  140. error_flag = True
  141. return [], error_flag
  142. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  143. return data, error_flag
  144. def get_videos_local_distribute_count(video_id, flow_pool):
  145. """
  146. 获取流量池视频本地分发数
  147. :param video_id: video_id
  148. :param flow_pool: 流量池标记
  149. :return: current_count 本地记录的分发数
  150. """
  151. # redis_h = datetime.now().hour
  152. # if datetime.now().minute >= 30:
  153. # redis_h += 0.5
  154. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  155. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  156. redis_helper = RedisHelper()
  157. # video = '{}-{}'.format(video_id, flow_pool)
  158. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  159. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  160. if current_count is not None:
  161. return int(current_count)
  162. else:
  163. return None
  164. def update_video_w_h_rate(video_id, key_name):
  165. """
  166. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  167. :param video_id: videoId type-int
  168. :param key_name: redis key
  169. :return: None
  170. """
  171. # 获取数据
  172. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  173. mysql_helper = MysqlHelper()
  174. data = mysql_helper.get_data(sql=sql)
  175. if len(data) == 0:
  176. return
  177. # 更新到redis
  178. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  179. if width == 0 or height == 0:
  180. return
  181. if rotate in (90, 270):
  182. w_h_rate = height / width
  183. else:
  184. w_h_rate = width / height
  185. if w_h_rate > 1:
  186. info_data = {int(video_id): w_h_rate}
  187. else:
  188. return
  189. redis_helper = RedisHelper()
  190. # 写入新数据
  191. if len(info_data) > 0:
  192. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  193. class FilterVideos(object):
  194. """视频过滤"""
  195. def __init__(self, request_id, app_type, video_ids, mid='', uid='',
  196. expansion_factor=None,
  197. risk_filter_flag=None,
  198. app_region_filtered=None,
  199. videos_with_risk=None):
  200. """
  201. 初始化
  202. :param request_id: request_id
  203. :param app_type: 产品标识 type-int
  204. :param video_ids: 需过滤的视频列表 type-list
  205. :param mid: mid type-string
  206. :param uid: uid type-string
  207. """
  208. self.request_id = request_id
  209. self.app_type = app_type
  210. self.mid = mid
  211. self.uid = uid
  212. self.video_ids = video_ids
  213. self.expansion_factor = expansion_factor
  214. self.risk_filter_flag = risk_filter_flag
  215. self.app_region_filtered = app_region_filtered
  216. self.videos_with_risk = videos_with_risk
  217. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  218. """召回小时级更新的视频状态过滤"""
  219. # 根据Redis缓存中的数据过滤
  220. redis_helper = RedisHelper()
  221. # 获取不符合推荐状态的视频
  222. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  223. if key_flag == 'region_24h':
  224. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  225. elif key_flag == 'day_24h':
  226. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  227. else:
  228. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  229. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  230. key_prefix = config_.H_VIDEO_FILER_24H
  231. elif key_flag == '24h':
  232. key_prefix = config_.H_VIDEO_FILER_24H
  233. else:
  234. key_prefix = config_.H_VIDEO_FILER
  235. filter_videos_list = redis_helper.get_data_from_set(
  236. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  237. )
  238. if not filter_videos_list:
  239. return video_ids
  240. filter_videos = [int(video) for video in filter_videos_list]
  241. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  242. return filtered_videos
  243. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  244. """召回小时级更新的视频过滤"""
  245. # 预曝光过滤
  246. # st_pre = time.time()
  247. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  248. # et_pre = time.time()
  249. # log_.info({
  250. # 'logTimestamp': int(time.time() * 1000),
  251. # 'request_id': self.request_id,
  252. # 'app_type': self.app_type,
  253. # 'mid': self.mid,
  254. # 'uid': self.uid,
  255. # 'operation': 'preview_filter',
  256. # 'request_videos': self.video_ids,
  257. # 'preview_filter_result': filtered_pre_result,
  258. # 'executeTime': (time.time() - st_pre) * 1000
  259. # })
  260. if not filtered_pre_result:
  261. return None
  262. # 视频状态过滤
  263. # st_status = time.time()
  264. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  265. data_key=data_key, ab_code=ab_code,
  266. province_code=province_code, key_flag=key_flag)
  267. # et_status = time.time()
  268. # log_.info({
  269. # 'logTimestamp': int(time.time() * 1000),
  270. # 'request_id': self.request_id,
  271. # 'app_type': self.app_type,
  272. # 'mid': self.mid,
  273. # 'uid': self.uid,
  274. # 'operation': 'status_filter',
  275. # 'request_videos': filtered_pre_result,
  276. # 'status_filter_result': filtered_status_result,
  277. # 'executeTime': (time.time() - st_status) * 1000
  278. # })
  279. if not filtered_status_result:
  280. return None
  281. # 视频已曝光过滤
  282. st_viewed = time.time()
  283. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  284. # et_viewed = time.time()
  285. log_.info({
  286. 'logTimestamp': int(time.time() * 1000),
  287. 'pool_type': pool_type,
  288. 'request_id': self.request_id,
  289. 'app_type': self.app_type,
  290. 'mid': self.mid,
  291. 'uid': self.uid,
  292. 'operation': 'view_filter',
  293. 'request_videos': filtered_status_result,
  294. 'view_filter_result': filtered_viewed_result,
  295. 'executeTime': (time.time() - st_viewed) * 1000
  296. })
  297. if not filtered_viewed_result:
  298. return None
  299. else:
  300. return [int(video_id) for video_id in filtered_viewed_result]
  301. def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
  302. """视频过滤"""
  303. # todo: 添加app和region的风险过滤。
  304. st_viewed = time.time()
  305. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  306. # log_.info({
  307. # 'logTimestamp': int(time.time() * 1000),
  308. # 'pool_type': "zhangbo-filter-pool_type",
  309. # 'request_id': self.request_id,
  310. # 'app_type': self.app_type,
  311. # 'mid': "zhangbo-filter_videos",
  312. # 'uid': self.uid,
  313. # 'operation': 'shield_filter',
  314. # 'request_videos': self.video_ids,
  315. # 'shield_filter_result': videos_filtered,
  316. # 'executeTime': (time.time() - st_viewed) * 1000
  317. # })
  318. # 预曝光过滤
  319. st_pre = time.time()
  320. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  321. # print("filtered_pre:", (time.time()-st_pre)*1000)
  322. # et_pre = time.time()
  323. # log_.info({
  324. # 'logTimestamp': int(time.time() * 1000),
  325. # 'request_id': self.request_id,
  326. # 'app_type': self.app_type,
  327. # 'mid': self.mid,
  328. # 'uid': self.uid,
  329. # 'operation': 'preview_filter',
  330. # 'request_videos': self.video_ids,
  331. # 'preview_filter_result': filtered_pre_result,
  332. # 'executeTime': (time.time() - st_pre) * 1000
  333. # })
  334. if not filtered_pre_result:
  335. return None
  336. # 视频状态过滤采用离线定时过滤方案
  337. # 视频状态过滤
  338. # st_status = time.time()
  339. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  340. # et_status = time.time()
  341. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  342. # filtered_status_result, (et_status - st_status) * 1000))
  343. # if not filtered_status_result:
  344. # return None
  345. # 视频已曝光过滤
  346. st_viewed = time.time()
  347. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  348. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  349. # et_viewed = time.time()
  350. # log_.info({
  351. # 'logTimestamp': int(time.time() * 1000),
  352. # 'pool_type': pool_type,
  353. # 'request_id': self.request_id,
  354. # 'app_type': self.app_type,
  355. # 'mid': self.mid,
  356. # 'uid': self.uid,
  357. # 'operation': 'view_filter',
  358. # 'request_videos': filtered_pre_result,
  359. # 'view_filter_result': filtered_viewed_result,
  360. # 'executeTime': (time.time() - st_viewed) * 1000
  361. # })
  362. if not filtered_viewed_result:
  363. return None
  364. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  365. return filtered_viewed_videos
  366. # if pool_type == 'flow' or pool_type=='normal':
  367. # # 流量池视频需过滤屏蔽视频
  368. # if region_code is None or shield_config is None:
  369. # return filtered_viewed_videos
  370. # else:
  371. # shield_key_name_list = shield_config.get(region_code, None)
  372. # if shield_key_name_list is not None:
  373. # filtered_shield_video_ids = self.filter_shield_video(
  374. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  375. # )
  376. # log_.info({
  377. # 'logTimestamp': int(time.time() * 1000),
  378. # 'pool_type': pool_type,
  379. # 'request_id': self.request_id,
  380. # 'app_type': self.app_type,
  381. # 'mid': self.mid,
  382. # 'uid': self.uid,
  383. # 'operation': 'shield_filter',
  384. # 'request_videos': filtered_viewed_videos,
  385. # 'shield_filter_result': filtered_shield_video_ids,
  386. # 'executeTime': (time.time() - st_viewed) * 1000
  387. # })
  388. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  389. # return filtered_shield_video_ids
  390. # else:
  391. # return filtered_viewed_videos
  392. # else:
  393. # return filtered_viewed_videos
  394. def filter_video_previewed(self, video_ids):
  395. """
  396. 预曝光过滤
  397. :param video_ids: 需过滤的视频列表 type-list
  398. :return: filtered_videos 过滤后的列表 type-list
  399. """
  400. pre_time = time.time()
  401. if not self.mid or self.mid == 'null':
  402. # mid为空时,不做预曝光过滤
  403. return video_ids
  404. # 根据Redis缓存中的数据过滤
  405. redis_helper = RedisHelper()
  406. # key拼接
  407. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  408. #print("key_name:", key_name)
  409. pe_videos_list = redis_helper.get_data_from_set(key_name)
  410. #print("pe_videos_list:", pe_videos_list)
  411. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  412. # self.app_type, self.mid, self.uid, pe_videos_list))
  413. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  414. # self.app_type, self.mid, self.uid, video_ids))
  415. if not pe_videos_list:
  416. return video_ids
  417. pe_videos = [int(video) for video in pe_videos_list]
  418. #print("pe_videos:", len(pe_videos))
  419. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  420. #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
  421. return filtered_videos
  422. # def filter_video_status(self, video_ids):
  423. # """
  424. # 对视频状态进行过滤
  425. # :param video_ids: 视频id列表 type-list
  426. # :return: filtered_videos
  427. # """
  428. # if len(video_ids) == 1:
  429. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  430. # "SELECT video_id " \
  431. # "FROM {} " \
  432. # "WHERE audit_status = 5 " \
  433. # "AND applet_rec_status IN (1, -6) " \
  434. # "AND open_status = 1 " \
  435. # "AND payment_status = 0 " \
  436. # "AND encryption_status != 5 " \
  437. # "AND transcoding_status = 3 " \
  438. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  439. # else:
  440. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  441. # "SELECT video_id " \
  442. # "FROM {} " \
  443. # "WHERE audit_status = 5 " \
  444. # "AND applet_rec_status IN (1, -6) " \
  445. # "AND open_status = 1 " \
  446. # "AND payment_status = 0 " \
  447. # "AND encryption_status != 5 " \
  448. # "AND transcoding_status = 3 " \
  449. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  450. #
  451. # hologres_helper = HologresHelper()
  452. # data = hologres_helper.get_data(sql=sql)
  453. # filtered_videos = [int(temp[0]) for temp in data]
  454. # return filtered_videos
  455. def filter_video_viewed(self, video_ids, types=(1, 6,)):
  456. """
  457. 调用后端接口过滤用户已观看视频
  458. :param video_ids: 视频id列表 type-list
  459. :param types: 过滤参数 type-tuple, 默认(1, )
  460. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  461. :return: filtered_videos
  462. """
  463. # 获取对应端的过滤参数types
  464. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  465. if types is None:
  466. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  467. request_data = {"appType": self.app_type,
  468. "mid": self.mid,
  469. "uid": self.uid,
  470. "types": list(types),
  471. "videoIds": video_ids}
  472. # print(request_data)
  473. # 调用http接口
  474. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  475. # print("result:", result)
  476. if result is None:
  477. # print("result is None")
  478. # log_.info('过滤失败,types: {}'.format(types))
  479. return []
  480. if result['code'] != 0:
  481. # log_.info('过滤失败,types: {}'.format(types))
  482. return []
  483. filtered_videos = result['data']
  484. return filtered_videos
  485. def filter_video_viewed_new(self, video_ids):
  486. """
  487. 调用后端接口过滤用户已观看视频
  488. :param video_ids: 视频id列表 type-list
  489. :param types: 过滤参数 type-tuple, 默认(1, )
  490. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  491. :return: filtered_videos
  492. """
  493. # 获取对应端的过滤参数types
  494. st_time = time.time()
  495. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  496. #print(types)
  497. if types is None:
  498. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  499. if 6 in types:
  500. types = list(types)
  501. types.remove(6)
  502. #print(types)
  503. request_data = {"appType": self.app_type,
  504. "mid": self.mid,
  505. "uid": self.uid,
  506. "types": list(types),
  507. "videoIds": video_ids}
  508. # 调用http接口
  509. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  510. #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
  511. if result is None:
  512. # log_.info('过滤失败,types: {}'.format(types))
  513. return []
  514. if result['code'] != 0:
  515. # log_.info('过滤失败,types: {}'.format(types))
  516. return []
  517. filtered_videos = result['data']
  518. return filtered_videos
  519. def filter_shield_video(self, video_ids, shield_key_name_list):
  520. """
  521. 过滤屏蔽视频视频
  522. :param video_ids: 需过滤的视频列表 type-list
  523. :param shield_key_name_list: 过滤视频 redis-key
  524. :return: filtered_videos 过滤后的列表 type-list
  525. """
  526. # print("filter_shield_video:", len(filter_shield_video))
  527. if len(video_ids) == 0:
  528. return video_ids
  529. # 根据Redis缓存中的数据过滤
  530. redis_helper = RedisHelper()
  531. for shield_key_name in shield_key_name_list:
  532. video_ids = [
  533. int(video_id) for video_id in video_ids
  534. if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
  535. ]
  536. # shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  537. # if not shield_videos_list:
  538. # continue
  539. # shield_videos = [int(video) for video in shield_videos_list]
  540. # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  541. # print("video_ids:", len(video_ids))
  542. return video_ids
  543. def new_filter_video(self):
  544. """视频过滤"""
  545. # 1. 预曝光过滤
  546. st_pre = time.time()
  547. #print("new_filter video_ids:", self.video_ids)
  548. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  549. if not filtered_pre_result:
  550. return None
  551. # log_.info({
  552. # 'logTimestamp': int(time.time() * 1000),
  553. # 'request_id': self.request_id,
  554. # 'app_type': self.app_type,
  555. # 'mid': self.mid,
  556. # 'uid': self.uid,
  557. # 'operation': 'preview_filter',
  558. # 'request_videos': self.video_ids,
  559. # 'preview_filter_result': filtered_pre_result,
  560. # 'executeTime': (time.time() - st_pre) * 1000
  561. # })
  562. #2. 视频已曝光过滤
  563. st_viewed = time.time()
  564. #print("---filtered viewed---")
  565. #print("filtered_pre_result:",filtered_pre_result)
  566. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  567. if not filtered_viewed_result:
  568. return None
  569. return filtered_viewed_result
  570. def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
  571. flow_video_list = []
  572. normal_video_list = []
  573. for v_id in vid_list:
  574. if v_id in flow_vids_set:
  575. flow_video_list.append(v_id)
  576. else:
  577. normal_video_list.append(v_id)
  578. shield_key_name_list = shield_config.get(region_code, None)
  579. if shield_key_name_list is not None:
  580. filtered_shield_video_ids = self.filter_shield_video(
  581. video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
  582. )
  583. return normal_video_list, filtered_shield_video_ids
  584. else:
  585. return normal_video_list, flow_video_list
  586. def filter_movie_religion_video(self, video_ids):
  587. """过滤白名单视频(影视,宗教)"""
  588. # 影视 + 宗教: rov.filter.movie.{videoId}
  589. # 宗教: rov.filter.religion.{videoId}
  590. st_time = time.time()
  591. if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  592. config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
  593. config_.APP_TYPE['ZUI_JING_QI'],
  594. config_.APP_TYPE['H5']]:
  595. # 过滤 影视 + 宗教
  596. keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
  597. elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  598. config_.APP_TYPE['ZUI_JING_QI'],
  599. config_.APP_TYPE['H5']]:
  600. # 过滤 影视 + 宗教
  601. keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
  602. else:
  603. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  604. return video_ids
  605. redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
  606. filter_videos = []
  607. for i in range(len(keys)//1000 + 1):
  608. video_ids_temp = video_ids[i*1000:(i+1)*1000]
  609. if len(video_ids_temp) == 0:
  610. break
  611. mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
  612. filter_videos.extend([int(data) for data in mget_res if data is not None])
  613. if len(filter_videos) > 0:
  614. filtered_videos = set(video_ids) - set(filter_videos)
  615. #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
  616. return list(filtered_videos)
  617. else:
  618. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  619. return video_ids
  620. def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
  621. """视频过滤"""
  622. # 预曝光过滤
  623. st_pre = time.time()
  624. #print("self.video_ids:", len(self.video_ids))
  625. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  626. if not filtered_pre_result:
  627. return None
  628. #print("filtered_pre_result:", len(filtered_pre_result))
  629. #print(filtered_pre_result)
  630. # 视频已曝光过滤/白名单过滤
  631. st_viewed = time.time()
  632. t = [
  633. gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
  634. gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
  635. gevent.joinall(t)
  636. filtered_result_list = [i.get() for i in t]
  637. #print("filtered_result_list1:",filtered_result_list[0])
  638. #print("filtered_result_list2:",filtered_result_list[1])
  639. filtered_viewed_set = set('')
  640. for i in filtered_result_list[0]:
  641. filtered_viewed_set.add(int(i))
  642. filter_video_set =set('')
  643. for j in filtered_result_list[1]:
  644. filter_video_set.add(int(j))
  645. filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
  646. #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
  647. #print("filtered:",len(filtered_viewed_result))
  648. if not filtered_viewed_result:
  649. return None
  650. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  651. #print("result:", filtered_viewed_videos)
  652. if flow_set is None:
  653. return filtered_viewed_videos
  654. else:
  655. # 流量池视频需过滤屏蔽视频
  656. if region_code is None or shield_config is None:
  657. return filtered_viewed_videos
  658. else:
  659. normal_recall_ids = []
  660. left_flow_ids = []
  661. for vid in filtered_viewed_videos:
  662. if vid in flow_set:
  663. left_flow_ids.append(vid)
  664. else:
  665. normal_recall_ids.append(vid)
  666. shield_key_name_list = shield_config.get(region_code, None)
  667. if shield_key_name_list is not None:
  668. filtered_shield_video_ids = self.filter_shield_video(
  669. video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
  670. )
  671. return normal_recall_ids+filtered_shield_video_ids
  672. else:
  673. return filtered_viewed_videos
  674. def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
  675. """视频过滤"""
  676. # todo: 添加app和region的风险过滤。
  677. st_viewed = time.time()
  678. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  679. # log_.info({
  680. # 'logTimestamp': int(time.time() * 1000),
  681. # 'pool_type': "zhangbo-filter-pool_type",
  682. # 'request_id': self.request_id,
  683. # 'app_type': self.app_type,
  684. # 'mid': "zhangbo-filter_videos_status",
  685. # 'uid': self.uid,
  686. # 'operation': 'shield_filter',
  687. # 'request_videos': self.video_ids,
  688. # 'shield_filter_result': videos_filtered,
  689. # 'executeTime': (time.time() - st_viewed) * 1000
  690. # })
  691. # 预曝光过滤
  692. st_pre = time.time()
  693. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  694. # print("filtered_pre:", (time.time()-st_pre)*1000)
  695. # et_pre = time.time()
  696. # log_.info({
  697. # 'logTimestamp': int(time.time() * 1000),
  698. # 'request_id': self.request_id,
  699. # 'app_type': self.app_type,
  700. # 'mid': self.mid,
  701. # 'uid': self.uid,
  702. # 'operation': 'preview_filter',
  703. # 'request_videos': self.video_ids,
  704. # 'preview_filter_result': filtered_pre_result,
  705. # 'executeTime': (time.time() - st_pre) * 1000
  706. # })
  707. if not filtered_pre_result:
  708. return None
  709. # 视频状态过滤采用离线定时过滤方案
  710. # 视频状态过滤
  711. # st_status = time.time()
  712. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  713. # et_status = time.time()
  714. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  715. # filtered_status_result, (et_status - st_status) * 1000))
  716. # if not filtered_status_result:
  717. # return None
  718. # 视频已曝光过滤
  719. st_viewed = time.time()
  720. filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result)
  721. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  722. # et_viewed = time.time()
  723. # log_.info({
  724. # 'logTimestamp': int(time.time() * 1000),
  725. # 'pool_type': pool_type,
  726. # 'request_id': self.request_id,
  727. # 'app_type': self.app_type,
  728. # 'mid': self.mid,
  729. # 'uid': self.uid,
  730. # 'operation': 'view_filter',
  731. # 'request_videos': filtered_pre_result,
  732. # 'view_filter_result': filtered_viewed_result,
  733. # 'executeTime': (time.time() - st_viewed) * 1000
  734. # })
  735. if not filtered_viewed_result:
  736. return None
  737. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  738. return filtered_viewed_videos
  739. # if pool_type == 'flow' or pool_type=='normal':
  740. # # 流量池视频需过滤屏蔽视频
  741. # if region_code is None or shield_config is None:
  742. # return filtered_viewed_videos
  743. # else:
  744. # shield_key_name_list = shield_config.get(region_code, None)
  745. # if shield_key_name_list is not None:
  746. # filtered_shield_video_ids = self.filter_shield_video(
  747. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  748. # )
  749. # log_.info({
  750. # 'logTimestamp': int(time.time() * 1000),
  751. # 'pool_type': pool_type,
  752. # 'request_id': self.request_id,
  753. # 'app_type': self.app_type,
  754. # 'mid': self.mid,
  755. # 'uid': self.uid,
  756. # 'operation': 'shield_filter',
  757. # 'request_videos': filtered_viewed_videos,
  758. # 'shield_filter_result': filtered_shield_video_ids,
  759. # 'executeTime': (time.time() - st_viewed) * 1000
  760. # })
  761. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  762. # return filtered_shield_video_ids
  763. # else:
  764. # return filtered_viewed_videos
  765. # else:
  766. # return filtered_viewed_videos
  767. def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
  768. """
  769. 调用后端接口过滤用户已观看视频
  770. :param video_ids: 视频id列表 type-list
  771. :param types: 过滤参数 type-tuple, 默认(1, )
  772. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  773. :return: filtered_videos
  774. """
  775. # 获取对应端的过滤参数types
  776. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  777. if types is None:
  778. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  779. types = list(types)
  780. types.append(2)
  781. request_data = {"appType": self.app_type,
  782. "mid": self.mid,
  783. "uid": self.uid,
  784. "types": types,
  785. "videoIds": video_ids}
  786. # print(request_data)
  787. # 调用http接口
  788. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  789. # print("result:", result)
  790. if result is None:
  791. # print("result is None")
  792. # log_.info('过滤失败,types: {}'.format(types))
  793. return []
  794. if result['code'] != 0:
  795. # log_.info('过滤失败,types: {}'.format(types))
  796. return []
  797. filtered_videos = result['data']
  798. return filtered_videos
  799. def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
  800. # 0 用一个开关控制,是否过滤生效。 便于回滚功能。
  801. risk_filter_flag = self.risk_filter_flag
  802. if not risk_filter_flag:
  803. return video_ids[0: min(20, len(video_ids))]
  804. # 1 判断是否过滤,不展示的app+区域列表,-1必须过滤
  805. app_region_filtered = self.app_region_filtered
  806. if app_type in app_region_filtered.keys():
  807. if_filtered = False
  808. if region_code in app_region_filtered[app_type]:
  809. if_filtered = True
  810. else:
  811. if_filtered = True
  812. if region_code == -1:
  813. if_filtered = True
  814. if not if_filtered:
  815. return video_ids[0: min(20, len(video_ids))]
  816. # 2 确认过滤,获取风险video列表param_update_risk_videos
  817. videos_with_risk = self.videos_with_risk
  818. # 3 过滤 返回结果
  819. video_ids_new = [i for i in video_ids if i not in videos_with_risk]
  820. # print(risk_filter_flag)
  821. # print(app_region_filtered)
  822. # print(video_ids)
  823. # print(app_type)
  824. # print(region_code)
  825. # print(videos_with_risk)
  826. # print(video_ids_new)
  827. # print(len(video_ids))
  828. # print(len(video_ids_new))
  829. return video_ids_new[0: min(20, len(video_ids_new))]
  830. if __name__ == '__main__':
  831. user = [
  832. ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
  833. ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
  834. ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
  835. ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
  836. ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
  837. ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
  838. ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
  839. ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
  840. ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
  841. ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
  842. ]
  843. video_df = pd.read_csv('./data/videoids.csv')
  844. videoid_list = video_df['videoid'].tolist()
  845. for mid, uid in user:
  846. video_ids = random.sample(videoid_list, 1000)
  847. start_time = time.time()
  848. filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
  849. res = filter_.filter_videos_new()
  850. print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
  851. # filter_.filter_video_status(video_ids=[1, 3, 5])
  852. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  853. # res = get_videos_remain_view_count(4, videos)
  854. # print(res)
  855. # text = '测试 @李倩'
  856. # send_msg_to_feishu(text)
  857. # update_video_w_h_rate(video_id=113, key_name='')
  858. # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
  859. # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  860. # res = request_get(request_url=request_url, timeout=100)
  861. # res = get_user_has30day_return(mid=mid)
  862. # print(res, type(res))