utils.py 36 KB


  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. import gevent
  6. import pandas as pd
  7. import random
  8. import copy
  9. from datetime import datetime
  10. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  11. from db_helper import RedisHelper, MysqlHelper
  12. from config import set_config
  13. from log import Log
# Module-wide configuration object returned by set_config(); the helpers below
# read URLs, Redis key prefixes and A/B codes from its attributes.
config_ = set_config()
# Module-wide logger instance used by the helpers below (log_.info / log_.error).
log_ = Log()
  16. def send_msg_to_feishu(msg_text):
  17. """发送消息到飞书"""
  18. # webhook地址
  19. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  20. # 自定义关键词key_word
  21. key_word = '服务报警'
  22. headers = {'Content-Type': 'application/json'}
  23. payload_message = {
  24. "msg_type": "text",
  25. "content": {
  26. "text": '{}: {}'.format(key_word, msg_text)
  27. }
  28. }
  29. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  30. # print(response.text)
  31. def request_post(request_url, request_data, timeout):
  32. """
  33. post 请求 HTTP接口
  34. :param request_url: 接口URL
  35. :param request_data: 请求参数
  36. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  37. :return: res_data json格式
  38. """
  39. try:
  40. headers = {"Connection": "close"}
  41. #print(request_url)
  42. #print(headers)
  43. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  44. #print("response:", response)
  45. if response.status_code == 200:
  46. res_data = json.loads(response.text)
  47. return res_data
  48. else:
  49. return None
  50. except Exception as e:
  51. #print(e)
  52. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  53. return None
  54. def request_post_data(request_url, request_data, timeout):
  55. """
  56. post 请求 HTTP接口
  57. :param request_url: 接口URL
  58. :param request_data: 请求参数
  59. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  60. :return: res_data json格式
  61. """
  62. try:
  63. headers = {'content-type': 'application/json'}
  64. response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
  65. #print("response:", response)
  66. if response.status_code == 200:
  67. res_data = json.loads(response.text)
  68. return res_data['outputs']
  69. else:
  70. return None
  71. except Exception as e:
  72. #print(e)
  73. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  74. return None
  75. def request_get(request_url, timeout):
  76. """
  77. get 请求 HTTP接口
  78. :param request_url: 接口URL
  79. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  80. :return: res_data json格式
  81. """
  82. try:
  83. response = requests.get(url=request_url, timeout=timeout)
  84. if response.status_code == 200:
  85. res_data = json.loads(response.text)
  86. return res_data
  87. else:
  88. return None
  89. except Exception as e:
  90. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  91. return None
  92. def get_user_has30day_return(mid):
  93. """
  94. 获取用户近30天是否有回流
  95. :param mid: mid
  96. :return: data, type
  97. """
  98. if not mid:
  99. return None
  100. # 获取redis中存储的状态值
  101. user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
  102. redis_helper = RedisHelper()
  103. data = redis_helper.get_data_from_redis(key_name=user_key)
  104. if data is not None:
  105. return int(data)
  106. else:
  107. request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  108. result = request_get(request_url=request_url, timeout=0.1)
  109. if result is None:
  110. return None
  111. if result['code'] != 0:
  112. return None
  113. data = result['data']
  114. if data is True:
  115. redis_data = 1
  116. else:
  117. redis_data = 0
  118. redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
  119. return redis_data
  120. def get_videos_remain_view_count(app_type, videos):
  121. """
  122. 获取视频在流量池中的剩余可分发数
  123. :param app_type: 产品标识 type-int
  124. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  125. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  126. error_flag 错误标记,True为错误
  127. """
  128. error_flag = False
  129. if not videos:
  130. return [], error_flag
  131. request_data = {'appType': app_type, 'videos': videos}
  132. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  133. if result is None:
  134. error_flag = True
  135. return [], error_flag
  136. if result['code'] != 0:
  137. log_.info('获取视频在流量池中的剩余可分发数失败')
  138. error_flag = True
  139. return [], error_flag
  140. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  141. return data, error_flag
  142. def get_videos_local_distribute_count(video_id, flow_pool):
  143. """
  144. 获取流量池视频本地分发数
  145. :param video_id: video_id
  146. :param flow_pool: 流量池标记
  147. :return: current_count 本地记录的分发数
  148. """
  149. # redis_h = datetime.now().hour
  150. # if datetime.now().minute >= 30:
  151. # redis_h += 0.5
  152. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  153. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  154. redis_helper = RedisHelper()
  155. # video = '{}-{}'.format(video_id, flow_pool)
  156. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  157. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  158. if current_count is not None:
  159. return int(current_count)
  160. else:
  161. return None
  162. def update_video_w_h_rate(video_id, key_name):
  163. """
  164. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  165. :param video_id: videoId type-int
  166. :param key_name: redis key
  167. :return: None
  168. """
  169. # 获取数据
  170. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  171. mysql_helper = MysqlHelper()
  172. data = mysql_helper.get_data(sql=sql)
  173. if len(data) == 0:
  174. return
  175. # 更新到redis
  176. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  177. if width == 0 or height == 0:
  178. return
  179. if rotate in (90, 270):
  180. w_h_rate = height / width
  181. else:
  182. w_h_rate = width / height
  183. if w_h_rate > 1:
  184. info_data = {int(video_id): w_h_rate}
  185. else:
  186. return
  187. redis_helper = RedisHelper()
  188. # 写入新数据
  189. if len(info_data) > 0:
  190. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  191. class FilterVideos(object):
  192. """视频过滤"""
  193. def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
  194. """
  195. 初始化
  196. :param request_id: request_id
  197. :param app_type: 产品标识 type-int
  198. :param video_ids: 需过滤的视频列表 type-list
  199. :param mid: mid type-string
  200. :param uid: uid type-string
  201. """
  202. self.request_id = request_id
  203. self.app_type = app_type
  204. self.mid = mid
  205. self.uid = uid
  206. self.video_ids = video_ids
  207. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  208. """召回小时级更新的视频状态过滤"""
  209. # 根据Redis缓存中的数据过滤
  210. redis_helper = RedisHelper()
  211. # 获取不符合推荐状态的视频
  212. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  213. if key_flag == 'region_24h':
  214. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  215. elif key_flag == 'day_24h':
  216. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  217. else:
  218. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  219. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  220. key_prefix = config_.H_VIDEO_FILER_24H
  221. elif key_flag == '24h':
  222. key_prefix = config_.H_VIDEO_FILER_24H
  223. else:
  224. key_prefix = config_.H_VIDEO_FILER
  225. filter_videos_list = redis_helper.get_data_from_set(
  226. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  227. )
  228. if not filter_videos_list:
  229. return video_ids
  230. filter_videos = [int(video) for video in filter_videos_list]
  231. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  232. return filtered_videos
  233. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  234. """召回小时级更新的视频过滤"""
  235. # 预曝光过滤
  236. # st_pre = time.time()
  237. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  238. # et_pre = time.time()
  239. # log_.info({
  240. # 'logTimestamp': int(time.time() * 1000),
  241. # 'request_id': self.request_id,
  242. # 'app_type': self.app_type,
  243. # 'mid': self.mid,
  244. # 'uid': self.uid,
  245. # 'operation': 'preview_filter',
  246. # 'request_videos': self.video_ids,
  247. # 'preview_filter_result': filtered_pre_result,
  248. # 'executeTime': (time.time() - st_pre) * 1000
  249. # })
  250. if not filtered_pre_result:
  251. return None
  252. # 视频状态过滤
  253. # st_status = time.time()
  254. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  255. data_key=data_key, ab_code=ab_code,
  256. province_code=province_code, key_flag=key_flag)
  257. # et_status = time.time()
  258. # log_.info({
  259. # 'logTimestamp': int(time.time() * 1000),
  260. # 'request_id': self.request_id,
  261. # 'app_type': self.app_type,
  262. # 'mid': self.mid,
  263. # 'uid': self.uid,
  264. # 'operation': 'status_filter',
  265. # 'request_videos': filtered_pre_result,
  266. # 'status_filter_result': filtered_status_result,
  267. # 'executeTime': (time.time() - st_status) * 1000
  268. # })
  269. if not filtered_status_result:
  270. return None
  271. # 视频已曝光过滤
  272. st_viewed = time.time()
  273. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  274. # et_viewed = time.time()
  275. log_.info({
  276. 'logTimestamp': int(time.time() * 1000),
  277. 'pool_type': pool_type,
  278. 'request_id': self.request_id,
  279. 'app_type': self.app_type,
  280. 'mid': self.mid,
  281. 'uid': self.uid,
  282. 'operation': 'view_filter',
  283. 'request_videos': filtered_status_result,
  284. 'view_filter_result': filtered_viewed_result,
  285. 'executeTime': (time.time() - st_viewed) * 1000
  286. })
  287. if not filtered_viewed_result:
  288. return None
  289. else:
  290. return [int(video_id) for video_id in filtered_viewed_result]
  291. def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
  292. """视频过滤"""
  293. # todo: 添加app和region的风险过滤。
  294. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  295. # 预曝光过滤
  296. st_pre = time.time()
  297. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  298. # print("filtered_pre:", (time.time()-st_pre)*1000)
  299. # et_pre = time.time()
  300. # log_.info({
  301. # 'logTimestamp': int(time.time() * 1000),
  302. # 'request_id': self.request_id,
  303. # 'app_type': self.app_type,
  304. # 'mid': self.mid,
  305. # 'uid': self.uid,
  306. # 'operation': 'preview_filter',
  307. # 'request_videos': self.video_ids,
  308. # 'preview_filter_result': filtered_pre_result,
  309. # 'executeTime': (time.time() - st_pre) * 1000
  310. # })
  311. if not filtered_pre_result:
  312. return None
  313. # 视频状态过滤采用离线定时过滤方案
  314. # 视频状态过滤
  315. # st_status = time.time()
  316. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  317. # et_status = time.time()
  318. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  319. # filtered_status_result, (et_status - st_status) * 1000))
  320. # if not filtered_status_result:
  321. # return None
  322. # 视频已曝光过滤
  323. st_viewed = time.time()
  324. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  325. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  326. # et_viewed = time.time()
  327. # log_.info({
  328. # 'logTimestamp': int(time.time() * 1000),
  329. # 'pool_type': pool_type,
  330. # 'request_id': self.request_id,
  331. # 'app_type': self.app_type,
  332. # 'mid': self.mid,
  333. # 'uid': self.uid,
  334. # 'operation': 'view_filter',
  335. # 'request_videos': filtered_pre_result,
  336. # 'view_filter_result': filtered_viewed_result,
  337. # 'executeTime': (time.time() - st_viewed) * 1000
  338. # })
  339. if not filtered_viewed_result:
  340. return None
  341. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  342. return filtered_viewed_videos
  343. # if pool_type == 'flow' or pool_type=='normal':
  344. # # 流量池视频需过滤屏蔽视频
  345. # if region_code is None or shield_config is None:
  346. # return filtered_viewed_videos
  347. # else:
  348. # shield_key_name_list = shield_config.get(region_code, None)
  349. # if shield_key_name_list is not None:
  350. # filtered_shield_video_ids = self.filter_shield_video(
  351. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  352. # )
  353. # log_.info({
  354. # 'logTimestamp': int(time.time() * 1000),
  355. # 'pool_type': pool_type,
  356. # 'request_id': self.request_id,
  357. # 'app_type': self.app_type,
  358. # 'mid': self.mid,
  359. # 'uid': self.uid,
  360. # 'operation': 'shield_filter',
  361. # 'request_videos': filtered_viewed_videos,
  362. # 'shield_filter_result': filtered_shield_video_ids,
  363. # 'executeTime': (time.time() - st_viewed) * 1000
  364. # })
  365. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  366. # return filtered_shield_video_ids
  367. # else:
  368. # return filtered_viewed_videos
  369. # else:
  370. # return filtered_viewed_videos
  371. def filter_video_previewed(self, video_ids):
  372. """
  373. 预曝光过滤
  374. :param video_ids: 需过滤的视频列表 type-list
  375. :return: filtered_videos 过滤后的列表 type-list
  376. """
  377. pre_time = time.time()
  378. if not self.mid or self.mid == 'null':
  379. # mid为空时,不做预曝光过滤
  380. return video_ids
  381. # 根据Redis缓存中的数据过滤
  382. redis_helper = RedisHelper()
  383. # key拼接
  384. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  385. #print("key_name:", key_name)
  386. pe_videos_list = redis_helper.get_data_from_set(key_name)
  387. #print("pe_videos_list:", pe_videos_list)
  388. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  389. # self.app_type, self.mid, self.uid, pe_videos_list))
  390. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  391. # self.app_type, self.mid, self.uid, video_ids))
  392. if not pe_videos_list:
  393. return video_ids
  394. pe_videos = [int(video) for video in pe_videos_list]
  395. #print("pe_videos:", len(pe_videos))
  396. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  397. #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
  398. return filtered_videos
  399. # def filter_video_status(self, video_ids):
  400. # """
  401. # 对视频状态进行过滤
  402. # :param video_ids: 视频id列表 type-list
  403. # :return: filtered_videos
  404. # """
  405. # if len(video_ids) == 1:
  406. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  407. # "SELECT video_id " \
  408. # "FROM {} " \
  409. # "WHERE audit_status = 5 " \
  410. # "AND applet_rec_status IN (1, -6) " \
  411. # "AND open_status = 1 " \
  412. # "AND payment_status = 0 " \
  413. # "AND encryption_status != 5 " \
  414. # "AND transcoding_status = 3 " \
  415. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  416. # else:
  417. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  418. # "SELECT video_id " \
  419. # "FROM {} " \
  420. # "WHERE audit_status = 5 " \
  421. # "AND applet_rec_status IN (1, -6) " \
  422. # "AND open_status = 1 " \
  423. # "AND payment_status = 0 " \
  424. # "AND encryption_status != 5 " \
  425. # "AND transcoding_status = 3 " \
  426. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  427. #
  428. # hologres_helper = HologresHelper()
  429. # data = hologres_helper.get_data(sql=sql)
  430. # filtered_videos = [int(temp[0]) for temp in data]
  431. # return filtered_videos
  432. def filter_video_viewed(self, video_ids, types=(1, 6,)):
  433. """
  434. 调用后端接口过滤用户已观看视频
  435. :param video_ids: 视频id列表 type-list
  436. :param types: 过滤参数 type-tuple, 默认(1, )
  437. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  438. :return: filtered_videos
  439. """
  440. # 获取对应端的过滤参数types
  441. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  442. if types is None:
  443. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  444. request_data = {"appType": self.app_type,
  445. "mid": self.mid,
  446. "uid": self.uid,
  447. "types": list(types),
  448. "videoIds": video_ids}
  449. # print(request_data)
  450. # 调用http接口
  451. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  452. # print("result:", result)
  453. if result is None:
  454. # print("result is None")
  455. # log_.info('过滤失败,types: {}'.format(types))
  456. return []
  457. if result['code'] != 0:
  458. # log_.info('过滤失败,types: {}'.format(types))
  459. return []
  460. filtered_videos = result['data']
  461. return filtered_videos
  462. def filter_video_viewed_new(self, video_ids):
  463. """
  464. 调用后端接口过滤用户已观看视频
  465. :param video_ids: 视频id列表 type-list
  466. :param types: 过滤参数 type-tuple, 默认(1, )
  467. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  468. :return: filtered_videos
  469. """
  470. # 获取对应端的过滤参数types
  471. st_time = time.time()
  472. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  473. #print(types)
  474. if types is None:
  475. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  476. if 6 in types:
  477. types = list(types)
  478. types.remove(6)
  479. #print(types)
  480. request_data = {"appType": self.app_type,
  481. "mid": self.mid,
  482. "uid": self.uid,
  483. "types": list(types),
  484. "videoIds": video_ids}
  485. # 调用http接口
  486. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  487. #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
  488. if result is None:
  489. # log_.info('过滤失败,types: {}'.format(types))
  490. return []
  491. if result['code'] != 0:
  492. # log_.info('过滤失败,types: {}'.format(types))
  493. return []
  494. filtered_videos = result['data']
  495. return filtered_videos
  496. def filter_shield_video(self, video_ids, shield_key_name_list):
  497. """
  498. 过滤屏蔽视频视频
  499. :param video_ids: 需过滤的视频列表 type-list
  500. :param shield_key_name_list: 过滤视频 redis-key
  501. :return: filtered_videos 过滤后的列表 type-list
  502. """
  503. # print("filter_shield_video:", len(filter_shield_video))
  504. if len(video_ids) == 0:
  505. return video_ids
  506. # 根据Redis缓存中的数据过滤
  507. redis_helper = RedisHelper()
  508. for shield_key_name in shield_key_name_list:
  509. video_ids = [
  510. int(video_id) for video_id in video_ids
  511. if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
  512. ]
  513. # shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  514. # if not shield_videos_list:
  515. # continue
  516. # shield_videos = [int(video) for video in shield_videos_list]
  517. # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  518. # print("video_ids:", len(video_ids))
  519. return video_ids
  520. def new_filter_video(self):
  521. """视频过滤"""
  522. # 1. 预曝光过滤
  523. st_pre = time.time()
  524. #print("new_filter video_ids:", self.video_ids)
  525. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  526. if not filtered_pre_result:
  527. return None
  528. # log_.info({
  529. # 'logTimestamp': int(time.time() * 1000),
  530. # 'request_id': self.request_id,
  531. # 'app_type': self.app_type,
  532. # 'mid': self.mid,
  533. # 'uid': self.uid,
  534. # 'operation': 'preview_filter',
  535. # 'request_videos': self.video_ids,
  536. # 'preview_filter_result': filtered_pre_result,
  537. # 'executeTime': (time.time() - st_pre) * 1000
  538. # })
  539. #2. 视频已曝光过滤
  540. st_viewed = time.time()
  541. #print("---filtered viewed---")
  542. #print("filtered_pre_result:",filtered_pre_result)
  543. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  544. if not filtered_viewed_result:
  545. return None
  546. return filtered_viewed_result
  547. def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
  548. flow_video_list = []
  549. normal_video_list = []
  550. for v_id in vid_list:
  551. if v_id in flow_vids_set:
  552. flow_video_list.append(v_id)
  553. else:
  554. normal_video_list.append(v_id)
  555. shield_key_name_list = shield_config.get(region_code, None)
  556. if shield_key_name_list is not None:
  557. filtered_shield_video_ids = self.filter_shield_video(
  558. video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
  559. )
  560. return normal_video_list, filtered_shield_video_ids
  561. else:
  562. return normal_video_list, flow_video_list
  563. def filter_movie_religion_video(self, video_ids):
  564. """过滤白名单视频(影视,宗教)"""
  565. # 影视 + 宗教: rov.filter.movie.{videoId}
  566. # 宗教: rov.filter.religion.{videoId}
  567. st_time = time.time()
  568. if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  569. config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
  570. config_.APP_TYPE['ZUI_JING_QI'],
  571. config_.APP_TYPE['H5']]:
  572. # 过滤 影视 + 宗教
  573. keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
  574. elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  575. config_.APP_TYPE['ZUI_JING_QI'],
  576. config_.APP_TYPE['H5']]:
  577. # 过滤 影视 + 宗教
  578. keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
  579. else:
  580. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  581. return video_ids
  582. redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
  583. filter_videos = []
  584. for i in range(len(keys)//1000 + 1):
  585. video_ids_temp = video_ids[i*1000:(i+1)*1000]
  586. if len(video_ids_temp) == 0:
  587. break
  588. mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
  589. filter_videos.extend([int(data) for data in mget_res if data is not None])
  590. if len(filter_videos) > 0:
  591. filtered_videos = set(video_ids) - set(filter_videos)
  592. #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
  593. return list(filtered_videos)
  594. else:
  595. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  596. return video_ids
  597. def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
  598. """视频过滤"""
  599. # 预曝光过滤
  600. st_pre = time.time()
  601. #print("self.video_ids:", len(self.video_ids))
  602. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  603. if not filtered_pre_result:
  604. return None
  605. #print("filtered_pre_result:", len(filtered_pre_result))
  606. #print(filtered_pre_result)
  607. # 视频已曝光过滤/白名单过滤
  608. st_viewed = time.time()
  609. t = [
  610. gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
  611. gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
  612. gevent.joinall(t)
  613. filtered_result_list = [i.get() for i in t]
  614. #print("filtered_result_list1:",filtered_result_list[0])
  615. #print("filtered_result_list2:",filtered_result_list[1])
  616. filtered_viewed_set = set('')
  617. for i in filtered_result_list[0]:
  618. filtered_viewed_set.add(int(i))
  619. filter_video_set =set('')
  620. for j in filtered_result_list[1]:
  621. filter_video_set.add(int(j))
  622. filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
  623. #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
  624. #print("filtered:",len(filtered_viewed_result))
  625. if not filtered_viewed_result:
  626. return None
  627. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  628. #print("result:", filtered_viewed_videos)
  629. if flow_set is None:
  630. return filtered_viewed_videos
  631. else:
  632. # 流量池视频需过滤屏蔽视频
  633. if region_code is None or shield_config is None:
  634. return filtered_viewed_videos
  635. else:
  636. normal_recall_ids = []
  637. left_flow_ids = []
  638. for vid in filtered_viewed_videos:
  639. if vid in flow_set:
  640. left_flow_ids.append(vid)
  641. else:
  642. normal_recall_ids.append(vid)
  643. shield_key_name_list = shield_config.get(region_code, None)
  644. if shield_key_name_list is not None:
  645. filtered_shield_video_ids = self.filter_shield_video(
  646. video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
  647. )
  648. return normal_recall_ids+filtered_shield_video_ids
  649. else:
  650. return filtered_viewed_videos
  651. def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
  652. """视频过滤"""
  653. # todo: 添加app和region的风险过滤。
  654. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  655. # 预曝光过滤
  656. st_pre = time.time()
  657. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  658. # print("filtered_pre:", (time.time()-st_pre)*1000)
  659. # et_pre = time.time()
  660. # log_.info({
  661. # 'logTimestamp': int(time.time() * 1000),
  662. # 'request_id': self.request_id,
  663. # 'app_type': self.app_type,
  664. # 'mid': self.mid,
  665. # 'uid': self.uid,
  666. # 'operation': 'preview_filter',
  667. # 'request_videos': self.video_ids,
  668. # 'preview_filter_result': filtered_pre_result,
  669. # 'executeTime': (time.time() - st_pre) * 1000
  670. # })
  671. if not filtered_pre_result:
  672. return None
  673. # 视频状态过滤采用离线定时过滤方案
  674. # 视频状态过滤
  675. # st_status = time.time()
  676. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  677. # et_status = time.time()
  678. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  679. # filtered_status_result, (et_status - st_status) * 1000))
  680. # if not filtered_status_result:
  681. # return None
  682. # 视频已曝光过滤
  683. st_viewed = time.time()
  684. filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result)
  685. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  686. # et_viewed = time.time()
  687. # log_.info({
  688. # 'logTimestamp': int(time.time() * 1000),
  689. # 'pool_type': pool_type,
  690. # 'request_id': self.request_id,
  691. # 'app_type': self.app_type,
  692. # 'mid': self.mid,
  693. # 'uid': self.uid,
  694. # 'operation': 'view_filter',
  695. # 'request_videos': filtered_pre_result,
  696. # 'view_filter_result': filtered_viewed_result,
  697. # 'executeTime': (time.time() - st_viewed) * 1000
  698. # })
  699. if not filtered_viewed_result:
  700. return None
  701. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  702. return filtered_viewed_videos
  703. # if pool_type == 'flow' or pool_type=='normal':
  704. # # 流量池视频需过滤屏蔽视频
  705. # if region_code is None or shield_config is None:
  706. # return filtered_viewed_videos
  707. # else:
  708. # shield_key_name_list = shield_config.get(region_code, None)
  709. # if shield_key_name_list is not None:
  710. # filtered_shield_video_ids = self.filter_shield_video(
  711. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  712. # )
  713. # log_.info({
  714. # 'logTimestamp': int(time.time() * 1000),
  715. # 'pool_type': pool_type,
  716. # 'request_id': self.request_id,
  717. # 'app_type': self.app_type,
  718. # 'mid': self.mid,
  719. # 'uid': self.uid,
  720. # 'operation': 'shield_filter',
  721. # 'request_videos': filtered_viewed_videos,
  722. # 'shield_filter_result': filtered_shield_video_ids,
  723. # 'executeTime': (time.time() - st_viewed) * 1000
  724. # })
  725. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  726. # return filtered_shield_video_ids
  727. # else:
  728. # return filtered_viewed_videos
  729. # else:
  730. # return filtered_viewed_videos
  731. def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
  732. """
  733. 调用后端接口过滤用户已观看视频
  734. :param video_ids: 视频id列表 type-list
  735. :param types: 过滤参数 type-tuple, 默认(1, )
  736. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  737. :return: filtered_videos
  738. """
  739. # 获取对应端的过滤参数types
  740. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  741. if types is None:
  742. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  743. types = list(types)
  744. types.append(2)
  745. request_data = {"appType": self.app_type,
  746. "mid": self.mid,
  747. "uid": self.uid,
  748. "types": types,
  749. "videoIds": video_ids}
  750. # print(request_data)
  751. # 调用http接口
  752. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  753. # print("result:", result)
  754. if result is None:
  755. # print("result is None")
  756. # log_.info('过滤失败,types: {}'.format(types))
  757. return []
  758. if result['code'] != 0:
  759. # log_.info('过滤失败,types: {}'.format(types))
  760. return []
  761. filtered_videos = result['data']
  762. return filtered_videos
  763. def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
  764. # 1 判断是否过滤,不展示的app+区域列表
  765. app_region_filtered = {
  766. 0: [110000]
  767. }
  768. # 初始化为:展示,不过滤。todo 测试时使用 True
  769. if_filtered = False
  770. if_filtered = True
  771. if app_type in app_region_filtered.keys() and region_code in app_region_filtered[app_type]:
  772. if_filtered = True
  773. if not if_filtered:
  774. return copy.deepcopy(video_ids[0: min(20, len(video_ids))])
  775. # 2 确认过滤,获取风险video列表
  776. videos_with_risk = [i for i in range(1000)]
  777. videos_with_risk.append(7536230)
  778. # 3 过滤 返回结果
  779. video_ids_new = [id for id in video_ids if id not in videos_with_risk]
  780. return video_ids_new[0: min(20, len(video_ids_new))]
  781. if __name__ == '__main__':
  782. user = [
  783. ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
  784. ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
  785. ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
  786. ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
  787. ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
  788. ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
  789. ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
  790. ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
  791. ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
  792. ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
  793. ]
  794. video_df = pd.read_csv('./data/videoids.csv')
  795. videoid_list = video_df['videoid'].tolist()
  796. for mid, uid in user:
  797. video_ids = random.sample(videoid_list, 1000)
  798. start_time = time.time()
  799. filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
  800. res = filter_.filter_videos_new()
  801. print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
  802. # filter_.filter_video_status(video_ids=[1, 3, 5])
  803. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  804. # res = get_videos_remain_view_count(4, videos)
  805. # print(res)
  806. # text = '测试 @李倩'
  807. # send_msg_to_feishu(text)
  808. # update_video_w_h_rate(video_id=113, key_name='')
  809. # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
  810. # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  811. # res = request_get(request_url=request_url, timeout=100)
  812. # res = get_user_has30day_return(mid=mid)
  813. # print(res, type(res))