utils.py 38 KB


  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. import gevent
  6. import pandas as pd
  7. import random
  8. from datetime import datetime
  9. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  10. from db_helper import RedisHelper, MysqlHelper
  11. from config import set_config
  12. from log import Log
  13. import sys
  14. sys.path.append("parameter_dir/")
  15. from parameter_update import param_update_risk_rule
  16. from parameter_update import param_update_risk_videos
  17. from parameter_update import param_update_risk_filter_flag
  18. config_ = set_config()
  19. log_ = Log()
  20. def send_msg_to_feishu(msg_text):
  21. """发送消息到飞书"""
  22. # webhook地址
  23. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  24. # 自定义关键词key_word
  25. key_word = '服务报警'
  26. headers = {'Content-Type': 'application/json'}
  27. payload_message = {
  28. "msg_type": "text",
  29. "content": {
  30. "text": '{}: {}'.format(key_word, msg_text)
  31. }
  32. }
  33. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  34. # print(response.text)
  35. def request_post(request_url, request_data, timeout):
  36. """
  37. post 请求 HTTP接口
  38. :param request_url: 接口URL
  39. :param request_data: 请求参数
  40. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  41. :return: res_data json格式
  42. """
  43. try:
  44. headers = {"Connection": "close"}
  45. #print(request_url)
  46. #print(headers)
  47. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  48. #print("response:", response)
  49. if response.status_code == 200:
  50. res_data = json.loads(response.text)
  51. return res_data
  52. else:
  53. return None
  54. except Exception as e:
  55. #print(e)
  56. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  57. return None
  58. def request_post_data(request_url, request_data, timeout):
  59. """
  60. post 请求 HTTP接口
  61. :param request_url: 接口URL
  62. :param request_data: 请求参数
  63. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  64. :return: res_data json格式
  65. """
  66. try:
  67. headers = {'content-type': 'application/json'}
  68. response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
  69. #print("response:", response)
  70. if response.status_code == 200:
  71. res_data = json.loads(response.text)
  72. return res_data['outputs']
  73. else:
  74. return None
  75. except Exception as e:
  76. #print(e)
  77. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  78. return None
  79. def request_get(request_url, timeout):
  80. """
  81. get 请求 HTTP接口
  82. :param request_url: 接口URL
  83. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  84. :return: res_data json格式
  85. """
  86. try:
  87. response = requests.get(url=request_url, timeout=timeout)
  88. if response.status_code == 200:
  89. res_data = json.loads(response.text)
  90. return res_data
  91. else:
  92. return None
  93. except Exception as e:
  94. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  95. return None
  96. def get_user_has30day_return(mid):
  97. """
  98. 获取用户近30天是否有回流
  99. :param mid: mid
  100. :return: data, type
  101. """
  102. if not mid:
  103. return None
  104. # 获取redis中存储的状态值
  105. user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
  106. redis_helper = RedisHelper()
  107. data = redis_helper.get_data_from_redis(key_name=user_key)
  108. if data is not None:
  109. return int(data)
  110. else:
  111. request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  112. result = request_get(request_url=request_url, timeout=0.1)
  113. if result is None:
  114. return None
  115. if result['code'] != 0:
  116. return None
  117. data = result['data']
  118. if data is True:
  119. redis_data = 1
  120. else:
  121. redis_data = 0
  122. redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
  123. return redis_data
  124. def get_videos_remain_view_count(app_type, videos):
  125. """
  126. 获取视频在流量池中的剩余可分发数
  127. :param app_type: 产品标识 type-int
  128. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  129. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  130. error_flag 错误标记,True为错误
  131. """
  132. error_flag = False
  133. if not videos:
  134. return [], error_flag
  135. request_data = {'appType': app_type, 'videos': videos}
  136. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  137. if result is None:
  138. error_flag = True
  139. return [], error_flag
  140. if result['code'] != 0:
  141. log_.info('获取视频在流量池中的剩余可分发数失败')
  142. error_flag = True
  143. return [], error_flag
  144. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  145. return data, error_flag
  146. def get_videos_local_distribute_count(video_id, flow_pool):
  147. """
  148. 获取流量池视频本地分发数
  149. :param video_id: video_id
  150. :param flow_pool: 流量池标记
  151. :return: current_count 本地记录的分发数
  152. """
  153. # redis_h = datetime.now().hour
  154. # if datetime.now().minute >= 30:
  155. # redis_h += 0.5
  156. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  157. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  158. redis_helper = RedisHelper()
  159. # video = '{}-{}'.format(video_id, flow_pool)
  160. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  161. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  162. if current_count is not None:
  163. return int(current_count)
  164. else:
  165. return None
  166. def update_video_w_h_rate(video_id, key_name):
  167. """
  168. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  169. :param video_id: videoId type-int
  170. :param key_name: redis key
  171. :return: None
  172. """
  173. # 获取数据
  174. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  175. mysql_helper = MysqlHelper()
  176. data = mysql_helper.get_data(sql=sql)
  177. if len(data) == 0:
  178. return
  179. # 更新到redis
  180. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  181. if width == 0 or height == 0:
  182. return
  183. if rotate in (90, 270):
  184. w_h_rate = height / width
  185. else:
  186. w_h_rate = width / height
  187. if w_h_rate > 1:
  188. info_data = {int(video_id): w_h_rate}
  189. else:
  190. return
  191. redis_helper = RedisHelper()
  192. # 写入新数据
  193. if len(info_data) > 0:
  194. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  195. class FilterVideos(object):
  196. """视频过滤"""
  197. def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
  198. """
  199. 初始化
  200. :param request_id: request_id
  201. :param app_type: 产品标识 type-int
  202. :param video_ids: 需过滤的视频列表 type-list
  203. :param mid: mid type-string
  204. :param uid: uid type-string
  205. """
  206. self.request_id = request_id
  207. self.app_type = app_type
  208. self.mid = mid
  209. self.uid = uid
  210. self.video_ids = video_ids
  211. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  212. """召回小时级更新的视频状态过滤"""
  213. # 根据Redis缓存中的数据过滤
  214. redis_helper = RedisHelper()
  215. # 获取不符合推荐状态的视频
  216. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  217. if key_flag == 'region_24h':
  218. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  219. elif key_flag == 'day_24h':
  220. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  221. else:
  222. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  223. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  224. key_prefix = config_.H_VIDEO_FILER_24H
  225. elif key_flag == '24h':
  226. key_prefix = config_.H_VIDEO_FILER_24H
  227. else:
  228. key_prefix = config_.H_VIDEO_FILER
  229. filter_videos_list = redis_helper.get_data_from_set(
  230. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  231. )
  232. if not filter_videos_list:
  233. return video_ids
  234. filter_videos = [int(video) for video in filter_videos_list]
  235. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  236. return filtered_videos
  237. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  238. """召回小时级更新的视频过滤"""
  239. # 预曝光过滤
  240. # st_pre = time.time()
  241. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  242. # et_pre = time.time()
  243. # log_.info({
  244. # 'logTimestamp': int(time.time() * 1000),
  245. # 'request_id': self.request_id,
  246. # 'app_type': self.app_type,
  247. # 'mid': self.mid,
  248. # 'uid': self.uid,
  249. # 'operation': 'preview_filter',
  250. # 'request_videos': self.video_ids,
  251. # 'preview_filter_result': filtered_pre_result,
  252. # 'executeTime': (time.time() - st_pre) * 1000
  253. # })
  254. if not filtered_pre_result:
  255. return None
  256. # 视频状态过滤
  257. # st_status = time.time()
  258. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  259. data_key=data_key, ab_code=ab_code,
  260. province_code=province_code, key_flag=key_flag)
  261. # et_status = time.time()
  262. # log_.info({
  263. # 'logTimestamp': int(time.time() * 1000),
  264. # 'request_id': self.request_id,
  265. # 'app_type': self.app_type,
  266. # 'mid': self.mid,
  267. # 'uid': self.uid,
  268. # 'operation': 'status_filter',
  269. # 'request_videos': filtered_pre_result,
  270. # 'status_filter_result': filtered_status_result,
  271. # 'executeTime': (time.time() - st_status) * 1000
  272. # })
  273. if not filtered_status_result:
  274. return None
  275. # 视频已曝光过滤
  276. st_viewed = time.time()
  277. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  278. # et_viewed = time.time()
  279. log_.info({
  280. 'logTimestamp': int(time.time() * 1000),
  281. 'pool_type': pool_type,
  282. 'request_id': self.request_id,
  283. 'app_type': self.app_type,
  284. 'mid': self.mid,
  285. 'uid': self.uid,
  286. 'operation': 'view_filter',
  287. 'request_videos': filtered_status_result,
  288. 'view_filter_result': filtered_viewed_result,
  289. 'executeTime': (time.time() - st_viewed) * 1000
  290. })
  291. if not filtered_viewed_result:
  292. return None
  293. else:
  294. return [int(video_id) for video_id in filtered_viewed_result]
  295. def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
  296. """视频过滤"""
  297. # todo: 添加app和region的风险过滤。
  298. st_viewed = time.time()
  299. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  300. # log_.info({
  301. # 'logTimestamp': int(time.time() * 1000),
  302. # 'pool_type': "zhangbo-filter-pool_type",
  303. # 'request_id': self.request_id,
  304. # 'app_type': self.app_type,
  305. # 'mid': "zhangbo-filter_videos",
  306. # 'uid': self.uid,
  307. # 'operation': 'shield_filter',
  308. # 'request_videos': self.video_ids,
  309. # 'shield_filter_result': videos_filtered,
  310. # 'executeTime': (time.time() - st_viewed) * 1000
  311. # })
  312. # 预曝光过滤
  313. st_pre = time.time()
  314. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  315. # print("filtered_pre:", (time.time()-st_pre)*1000)
  316. # et_pre = time.time()
  317. # log_.info({
  318. # 'logTimestamp': int(time.time() * 1000),
  319. # 'request_id': self.request_id,
  320. # 'app_type': self.app_type,
  321. # 'mid': self.mid,
  322. # 'uid': self.uid,
  323. # 'operation': 'preview_filter',
  324. # 'request_videos': self.video_ids,
  325. # 'preview_filter_result': filtered_pre_result,
  326. # 'executeTime': (time.time() - st_pre) * 1000
  327. # })
  328. if not filtered_pre_result:
  329. return None
  330. # 视频状态过滤采用离线定时过滤方案
  331. # 视频状态过滤
  332. # st_status = time.time()
  333. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  334. # et_status = time.time()
  335. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  336. # filtered_status_result, (et_status - st_status) * 1000))
  337. # if not filtered_status_result:
  338. # return None
  339. # 视频已曝光过滤
  340. st_viewed = time.time()
  341. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  342. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  343. # et_viewed = time.time()
  344. # log_.info({
  345. # 'logTimestamp': int(time.time() * 1000),
  346. # 'pool_type': pool_type,
  347. # 'request_id': self.request_id,
  348. # 'app_type': self.app_type,
  349. # 'mid': self.mid,
  350. # 'uid': self.uid,
  351. # 'operation': 'view_filter',
  352. # 'request_videos': filtered_pre_result,
  353. # 'view_filter_result': filtered_viewed_result,
  354. # 'executeTime': (time.time() - st_viewed) * 1000
  355. # })
  356. if not filtered_viewed_result:
  357. return None
  358. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  359. return filtered_viewed_videos
  360. # if pool_type == 'flow' or pool_type=='normal':
  361. # # 流量池视频需过滤屏蔽视频
  362. # if region_code is None or shield_config is None:
  363. # return filtered_viewed_videos
  364. # else:
  365. # shield_key_name_list = shield_config.get(region_code, None)
  366. # if shield_key_name_list is not None:
  367. # filtered_shield_video_ids = self.filter_shield_video(
  368. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  369. # )
  370. # log_.info({
  371. # 'logTimestamp': int(time.time() * 1000),
  372. # 'pool_type': pool_type,
  373. # 'request_id': self.request_id,
  374. # 'app_type': self.app_type,
  375. # 'mid': self.mid,
  376. # 'uid': self.uid,
  377. # 'operation': 'shield_filter',
  378. # 'request_videos': filtered_viewed_videos,
  379. # 'shield_filter_result': filtered_shield_video_ids,
  380. # 'executeTime': (time.time() - st_viewed) * 1000
  381. # })
  382. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  383. # return filtered_shield_video_ids
  384. # else:
  385. # return filtered_viewed_videos
  386. # else:
  387. # return filtered_viewed_videos
  388. def filter_video_previewed(self, video_ids):
  389. """
  390. 预曝光过滤
  391. :param video_ids: 需过滤的视频列表 type-list
  392. :return: filtered_videos 过滤后的列表 type-list
  393. """
  394. pre_time = time.time()
  395. if not self.mid or self.mid == 'null':
  396. # mid为空时,不做预曝光过滤
  397. return video_ids
  398. # 根据Redis缓存中的数据过滤
  399. redis_helper = RedisHelper()
  400. # key拼接
  401. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  402. #print("key_name:", key_name)
  403. pe_videos_list = redis_helper.get_data_from_set(key_name)
  404. #print("pe_videos_list:", pe_videos_list)
  405. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  406. # self.app_type, self.mid, self.uid, pe_videos_list))
  407. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  408. # self.app_type, self.mid, self.uid, video_ids))
  409. if not pe_videos_list:
  410. return video_ids
  411. pe_videos = [int(video) for video in pe_videos_list]
  412. #print("pe_videos:", len(pe_videos))
  413. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  414. #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
  415. return filtered_videos
  416. # def filter_video_status(self, video_ids):
  417. # """
  418. # 对视频状态进行过滤
  419. # :param video_ids: 视频id列表 type-list
  420. # :return: filtered_videos
  421. # """
  422. # if len(video_ids) == 1:
  423. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  424. # "SELECT video_id " \
  425. # "FROM {} " \
  426. # "WHERE audit_status = 5 " \
  427. # "AND applet_rec_status IN (1, -6) " \
  428. # "AND open_status = 1 " \
  429. # "AND payment_status = 0 " \
  430. # "AND encryption_status != 5 " \
  431. # "AND transcoding_status = 3 " \
  432. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  433. # else:
  434. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  435. # "SELECT video_id " \
  436. # "FROM {} " \
  437. # "WHERE audit_status = 5 " \
  438. # "AND applet_rec_status IN (1, -6) " \
  439. # "AND open_status = 1 " \
  440. # "AND payment_status = 0 " \
  441. # "AND encryption_status != 5 " \
  442. # "AND transcoding_status = 3 " \
  443. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  444. #
  445. # hologres_helper = HologresHelper()
  446. # data = hologres_helper.get_data(sql=sql)
  447. # filtered_videos = [int(temp[0]) for temp in data]
  448. # return filtered_videos
  449. def filter_video_viewed(self, video_ids, types=(1, 6,)):
  450. """
  451. 调用后端接口过滤用户已观看视频
  452. :param video_ids: 视频id列表 type-list
  453. :param types: 过滤参数 type-tuple, 默认(1, )
  454. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  455. :return: filtered_videos
  456. """
  457. # 获取对应端的过滤参数types
  458. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  459. if types is None:
  460. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  461. request_data = {"appType": self.app_type,
  462. "mid": self.mid,
  463. "uid": self.uid,
  464. "types": list(types),
  465. "videoIds": video_ids}
  466. # print(request_data)
  467. # 调用http接口
  468. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  469. # print("result:", result)
  470. if result is None:
  471. # print("result is None")
  472. # log_.info('过滤失败,types: {}'.format(types))
  473. return []
  474. if result['code'] != 0:
  475. # log_.info('过滤失败,types: {}'.format(types))
  476. return []
  477. filtered_videos = result['data']
  478. return filtered_videos
  479. def filter_video_viewed_new(self, video_ids):
  480. """
  481. 调用后端接口过滤用户已观看视频
  482. :param video_ids: 视频id列表 type-list
  483. :param types: 过滤参数 type-tuple, 默认(1, )
  484. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  485. :return: filtered_videos
  486. """
  487. # 获取对应端的过滤参数types
  488. st_time = time.time()
  489. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  490. #print(types)
  491. if types is None:
  492. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  493. if 6 in types:
  494. types = list(types)
  495. types.remove(6)
  496. #print(types)
  497. request_data = {"appType": self.app_type,
  498. "mid": self.mid,
  499. "uid": self.uid,
  500. "types": list(types),
  501. "videoIds": video_ids}
  502. # 调用http接口
  503. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  504. #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
  505. if result is None:
  506. # log_.info('过滤失败,types: {}'.format(types))
  507. return []
  508. if result['code'] != 0:
  509. # log_.info('过滤失败,types: {}'.format(types))
  510. return []
  511. filtered_videos = result['data']
  512. return filtered_videos
  513. def filter_shield_video(self, video_ids, shield_key_name_list):
  514. """
  515. 过滤屏蔽视频视频
  516. :param video_ids: 需过滤的视频列表 type-list
  517. :param shield_key_name_list: 过滤视频 redis-key
  518. :return: filtered_videos 过滤后的列表 type-list
  519. """
  520. # print("filter_shield_video:", len(filter_shield_video))
  521. if len(video_ids) == 0:
  522. return video_ids
  523. # 根据Redis缓存中的数据过滤
  524. redis_helper = RedisHelper()
  525. for shield_key_name in shield_key_name_list:
  526. video_ids = [
  527. int(video_id) for video_id in video_ids
  528. if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
  529. ]
  530. # shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  531. # if not shield_videos_list:
  532. # continue
  533. # shield_videos = [int(video) for video in shield_videos_list]
  534. # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  535. # print("video_ids:", len(video_ids))
  536. return video_ids
  537. def new_filter_video(self):
  538. """视频过滤"""
  539. # 1. 预曝光过滤
  540. st_pre = time.time()
  541. #print("new_filter video_ids:", self.video_ids)
  542. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  543. if not filtered_pre_result:
  544. return None
  545. # log_.info({
  546. # 'logTimestamp': int(time.time() * 1000),
  547. # 'request_id': self.request_id,
  548. # 'app_type': self.app_type,
  549. # 'mid': self.mid,
  550. # 'uid': self.uid,
  551. # 'operation': 'preview_filter',
  552. # 'request_videos': self.video_ids,
  553. # 'preview_filter_result': filtered_pre_result,
  554. # 'executeTime': (time.time() - st_pre) * 1000
  555. # })
  556. #2. 视频已曝光过滤
  557. st_viewed = time.time()
  558. #print("---filtered viewed---")
  559. #print("filtered_pre_result:",filtered_pre_result)
  560. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  561. if not filtered_viewed_result:
  562. return None
  563. return filtered_viewed_result
  564. def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
  565. flow_video_list = []
  566. normal_video_list = []
  567. for v_id in vid_list:
  568. if v_id in flow_vids_set:
  569. flow_video_list.append(v_id)
  570. else:
  571. normal_video_list.append(v_id)
  572. shield_key_name_list = shield_config.get(region_code, None)
  573. if shield_key_name_list is not None:
  574. filtered_shield_video_ids = self.filter_shield_video(
  575. video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
  576. )
  577. return normal_video_list, filtered_shield_video_ids
  578. else:
  579. return normal_video_list, flow_video_list
  580. def filter_movie_religion_video(self, video_ids):
  581. """过滤白名单视频(影视,宗教)"""
  582. # 影视 + 宗教: rov.filter.movie.{videoId}
  583. # 宗教: rov.filter.religion.{videoId}
  584. st_time = time.time()
  585. if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  586. config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
  587. config_.APP_TYPE['ZUI_JING_QI'],
  588. config_.APP_TYPE['H5']]:
  589. # 过滤 影视 + 宗教
  590. keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
  591. elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  592. config_.APP_TYPE['ZUI_JING_QI'],
  593. config_.APP_TYPE['H5']]:
  594. # 过滤 影视 + 宗教
  595. keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
  596. else:
  597. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  598. return video_ids
  599. redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
  600. filter_videos = []
  601. for i in range(len(keys)//1000 + 1):
  602. video_ids_temp = video_ids[i*1000:(i+1)*1000]
  603. if len(video_ids_temp) == 0:
  604. break
  605. mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
  606. filter_videos.extend([int(data) for data in mget_res if data is not None])
  607. if len(filter_videos) > 0:
  608. filtered_videos = set(video_ids) - set(filter_videos)
  609. #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
  610. return list(filtered_videos)
  611. else:
  612. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  613. return video_ids
  614. def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
  615. """视频过滤"""
  616. # 预曝光过滤
  617. st_pre = time.time()
  618. #print("self.video_ids:", len(self.video_ids))
  619. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  620. if not filtered_pre_result:
  621. return None
  622. #print("filtered_pre_result:", len(filtered_pre_result))
  623. #print(filtered_pre_result)
  624. # 视频已曝光过滤/白名单过滤
  625. st_viewed = time.time()
  626. t = [
  627. gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
  628. gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
  629. gevent.joinall(t)
  630. filtered_result_list = [i.get() for i in t]
  631. #print("filtered_result_list1:",filtered_result_list[0])
  632. #print("filtered_result_list2:",filtered_result_list[1])
  633. filtered_viewed_set = set('')
  634. for i in filtered_result_list[0]:
  635. filtered_viewed_set.add(int(i))
  636. filter_video_set =set('')
  637. for j in filtered_result_list[1]:
  638. filter_video_set.add(int(j))
  639. filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
  640. #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
  641. #print("filtered:",len(filtered_viewed_result))
  642. if not filtered_viewed_result:
  643. return None
  644. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  645. #print("result:", filtered_viewed_videos)
  646. if flow_set is None:
  647. return filtered_viewed_videos
  648. else:
  649. # 流量池视频需过滤屏蔽视频
  650. if region_code is None or shield_config is None:
  651. return filtered_viewed_videos
  652. else:
  653. normal_recall_ids = []
  654. left_flow_ids = []
  655. for vid in filtered_viewed_videos:
  656. if vid in flow_set:
  657. left_flow_ids.append(vid)
  658. else:
  659. normal_recall_ids.append(vid)
  660. shield_key_name_list = shield_config.get(region_code, None)
  661. if shield_key_name_list is not None:
  662. filtered_shield_video_ids = self.filter_shield_video(
  663. video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
  664. )
  665. return normal_recall_ids+filtered_shield_video_ids
  666. else:
  667. return filtered_viewed_videos
  668. def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
  669. """视频过滤"""
  670. # todo: 添加app和region的风险过滤。
  671. st_viewed = time.time()
  672. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  673. # log_.info({
  674. # 'logTimestamp': int(time.time() * 1000),
  675. # 'pool_type': "zhangbo-filter-pool_type",
  676. # 'request_id': self.request_id,
  677. # 'app_type': self.app_type,
  678. # 'mid': "zhangbo-filter_videos_status",
  679. # 'uid': self.uid,
  680. # 'operation': 'shield_filter',
  681. # 'request_videos': self.video_ids,
  682. # 'shield_filter_result': videos_filtered,
  683. # 'executeTime': (time.time() - st_viewed) * 1000
  684. # })
  685. # 预曝光过滤
  686. st_pre = time.time()
  687. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  688. # print("filtered_pre:", (time.time()-st_pre)*1000)
  689. # et_pre = time.time()
  690. # log_.info({
  691. # 'logTimestamp': int(time.time() * 1000),
  692. # 'request_id': self.request_id,
  693. # 'app_type': self.app_type,
  694. # 'mid': self.mid,
  695. # 'uid': self.uid,
  696. # 'operation': 'preview_filter',
  697. # 'request_videos': self.video_ids,
  698. # 'preview_filter_result': filtered_pre_result,
  699. # 'executeTime': (time.time() - st_pre) * 1000
  700. # })
  701. if not filtered_pre_result:
  702. return None
  703. # 视频状态过滤采用离线定时过滤方案
  704. # 视频状态过滤
  705. # st_status = time.time()
  706. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  707. # et_status = time.time()
  708. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  709. # filtered_status_result, (et_status - st_status) * 1000))
  710. # if not filtered_status_result:
  711. # return None
  712. # 视频已曝光过滤
  713. st_viewed = time.time()
  714. filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result)
  715. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  716. # et_viewed = time.time()
  717. # log_.info({
  718. # 'logTimestamp': int(time.time() * 1000),
  719. # 'pool_type': pool_type,
  720. # 'request_id': self.request_id,
  721. # 'app_type': self.app_type,
  722. # 'mid': self.mid,
  723. # 'uid': self.uid,
  724. # 'operation': 'view_filter',
  725. # 'request_videos': filtered_pre_result,
  726. # 'view_filter_result': filtered_viewed_result,
  727. # 'executeTime': (time.time() - st_viewed) * 1000
  728. # })
  729. if not filtered_viewed_result:
  730. return None
  731. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  732. return filtered_viewed_videos
  733. # if pool_type == 'flow' or pool_type=='normal':
  734. # # 流量池视频需过滤屏蔽视频
  735. # if region_code is None or shield_config is None:
  736. # return filtered_viewed_videos
  737. # else:
  738. # shield_key_name_list = shield_config.get(region_code, None)
  739. # if shield_key_name_list is not None:
  740. # filtered_shield_video_ids = self.filter_shield_video(
  741. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  742. # )
  743. # log_.info({
  744. # 'logTimestamp': int(time.time() * 1000),
  745. # 'pool_type': pool_type,
  746. # 'request_id': self.request_id,
  747. # 'app_type': self.app_type,
  748. # 'mid': self.mid,
  749. # 'uid': self.uid,
  750. # 'operation': 'shield_filter',
  751. # 'request_videos': filtered_viewed_videos,
  752. # 'shield_filter_result': filtered_shield_video_ids,
  753. # 'executeTime': (time.time() - st_viewed) * 1000
  754. # })
  755. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  756. # return filtered_shield_video_ids
  757. # else:
  758. # return filtered_viewed_videos
  759. # else:
  760. # return filtered_viewed_videos
  761. def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
  762. """
  763. 调用后端接口过滤用户已观看视频
  764. :param video_ids: 视频id列表 type-list
  765. :param types: 过滤参数 type-tuple, 默认(1, )
  766. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  767. :return: filtered_videos
  768. """
  769. # 获取对应端的过滤参数types
  770. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  771. if types is None:
  772. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  773. types = list(types)
  774. types.append(2)
  775. request_data = {"appType": self.app_type,
  776. "mid": self.mid,
  777. "uid": self.uid,
  778. "types": types,
  779. "videoIds": video_ids}
  780. # print(request_data)
  781. # 调用http接口
  782. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  783. # print("result:", result)
  784. if result is None:
  785. # print("result is None")
  786. # log_.info('过滤失败,types: {}'.format(types))
  787. return []
  788. if result['code'] != 0:
  789. # log_.info('过滤失败,types: {}'.format(types))
  790. return []
  791. filtered_videos = result['data']
  792. return filtered_videos
  793. def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
  794. # 0 用一个开关控制,是否过滤生效。 便于回滚功能。
  795. flag = param_update_risk_filter_flag()
  796. if not flag:
  797. return video_ids[0: min(20, len(video_ids))]
  798. # 1 判断是否过滤,不展示的app+区域列表,-1必须过滤
  799. app_region_filtered = param_update_risk_rule()
  800. if app_type in app_region_filtered.keys():
  801. if_filtered = False
  802. if region_code in app_region_filtered[app_type]:
  803. if_filtered = True
  804. else:
  805. if_filtered = True
  806. if region_code == -1:
  807. if_filtered = True
  808. if not if_filtered:
  809. return video_ids[0: min(20, len(video_ids))]
  810. # 2 确认过滤,获取风险video列表param_update_risk_videos
  811. videos_with_risk = param_update_risk_videos()
  812. # 3 过滤 返回结果
  813. video_ids_new = [i for i in video_ids if i not in videos_with_risk]
  814. # print(flag)
  815. # print(app_region_filtered)
  816. # print(video_ids)
  817. # print(app_type)
  818. # print(region_code)
  819. # print(videos_with_risk)
  820. # print(video_ids_new)
  821. # print(len(video_ids))
  822. # print(len(video_ids_new))
  823. return video_ids_new[0: min(20, len(video_ids_new))]
  824. if __name__ == '__main__':
  825. user = [
  826. ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
  827. ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
  828. ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
  829. ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
  830. ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
  831. ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
  832. ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
  833. ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
  834. ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
  835. ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
  836. ]
  837. video_df = pd.read_csv('./data/videoids.csv')
  838. videoid_list = video_df['videoid'].tolist()
  839. for mid, uid in user:
  840. video_ids = random.sample(videoid_list, 1000)
  841. start_time = time.time()
  842. filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
  843. res = filter_.filter_videos_new()
  844. print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
  845. # filter_.filter_video_status(video_ids=[1, 3, 5])
  846. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  847. # res = get_videos_remain_view_count(4, videos)
  848. # print(res)
  849. # text = '测试 @李倩'
  850. # send_msg_to_feishu(text)
  851. # update_video_w_h_rate(video_id=113, key_name='')
  852. # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
  853. # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  854. # res = request_get(request_url=request_url, timeout=100)
  855. # res = get_user_has30day_return(mid=mid)
  856. # print(res, type(res))