utils.py 35 KB


  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. import gevent
  6. import pandas as pd
  7. import random
  8. from datetime import datetime
  9. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  10. from db_helper import RedisHelper, MysqlHelper
  11. from config import set_config
  12. from log import Log
# Module-wide configuration object, built once at import time by set_config().
config_ = set_config()
# Module-wide logger instance shared by every helper in this file.
log_ = Log()
  15. def send_msg_to_feishu(msg_text):
  16. """发送消息到飞书"""
  17. # webhook地址
  18. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  19. # 自定义关键词key_word
  20. key_word = '服务报警'
  21. headers = {'Content-Type': 'application/json'}
  22. payload_message = {
  23. "msg_type": "text",
  24. "content": {
  25. "text": '{}: {}'.format(key_word, msg_text)
  26. }
  27. }
  28. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  29. # print(response.text)
  30. def request_post(request_url, request_data, timeout):
  31. """
  32. post 请求 HTTP接口
  33. :param request_url: 接口URL
  34. :param request_data: 请求参数
  35. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  36. :return: res_data json格式
  37. """
  38. try:
  39. headers = {"Connection": "close"}
  40. print(request_url)
  41. print(headers)
  42. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  43. print("response:", response)
  44. if response.status_code == 200:
  45. res_data = json.loads(response.text)
  46. return res_data
  47. else:
  48. return None
  49. except Exception as e:
  50. print(e)
  51. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  52. return None
  53. def request_post_data(request_url, request_data, timeout):
  54. """
  55. post 请求 HTTP接口
  56. :param request_url: 接口URL
  57. :param request_data: 请求参数
  58. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  59. :return: res_data json格式
  60. """
  61. try:
  62. headers = {'content-type': 'application/json'}
  63. print(request_url)
  64. print(headers)
  65. response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
  66. print("response:", response)
  67. if response.status_code == 200:
  68. res_data = json.loads(response.text)
  69. return res_data['outputs']
  70. else:
  71. return None
  72. except Exception as e:
  73. print(e)
  74. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  75. return None
  76. def request_get(request_url, timeout):
  77. """
  78. get 请求 HTTP接口
  79. :param request_url: 接口URL
  80. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  81. :return: res_data json格式
  82. """
  83. try:
  84. response = requests.get(url=request_url, timeout=timeout)
  85. if response.status_code == 200:
  86. res_data = json.loads(response.text)
  87. return res_data
  88. else:
  89. return None
  90. except Exception as e:
  91. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  92. return None
  93. def get_user_has30day_return(mid):
  94. """
  95. 获取用户近30天是否有回流
  96. :param mid: mid
  97. :return: data, type
  98. """
  99. if not mid:
  100. return None
  101. # 获取redis中存储的状态值
  102. user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
  103. redis_helper = RedisHelper()
  104. data = redis_helper.get_data_from_redis(key_name=user_key)
  105. if data is not None:
  106. return int(data)
  107. else:
  108. request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  109. result = request_get(request_url=request_url, timeout=0.1)
  110. if result is None:
  111. return None
  112. if result['code'] != 0:
  113. return None
  114. data = result['data']
  115. if data is True:
  116. redis_data = 1
  117. else:
  118. redis_data = 0
  119. redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
  120. return redis_data
  121. def get_videos_remain_view_count(app_type, videos):
  122. """
  123. 获取视频在流量池中的剩余可分发数
  124. :param app_type: 产品标识 type-int
  125. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  126. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  127. error_flag 错误标记,True为错误
  128. """
  129. error_flag = False
  130. if not videos:
  131. return [], error_flag
  132. request_data = {'appType': app_type, 'videos': videos}
  133. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  134. if result is None:
  135. error_flag = True
  136. return [], error_flag
  137. if result['code'] != 0:
  138. log_.info('获取视频在流量池中的剩余可分发数失败')
  139. error_flag = True
  140. return [], error_flag
  141. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  142. return data, error_flag
  143. def get_videos_local_distribute_count(video_id, flow_pool):
  144. """
  145. 获取流量池视频本地分发数
  146. :param video_id: video_id
  147. :param flow_pool: 流量池标记
  148. :return: current_count 本地记录的分发数
  149. """
  150. # redis_h = datetime.now().hour
  151. # if datetime.now().minute >= 30:
  152. # redis_h += 0.5
  153. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  154. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  155. redis_helper = RedisHelper()
  156. # video = '{}-{}'.format(video_id, flow_pool)
  157. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  158. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  159. if current_count is not None:
  160. return int(current_count)
  161. else:
  162. return None
  163. def update_video_w_h_rate(video_id, key_name):
  164. """
  165. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  166. :param video_id: videoId type-int
  167. :param key_name: redis key
  168. :return: None
  169. """
  170. # 获取数据
  171. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  172. mysql_helper = MysqlHelper()
  173. data = mysql_helper.get_data(sql=sql)
  174. if len(data) == 0:
  175. return
  176. # 更新到redis
  177. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  178. if width == 0 or height == 0:
  179. return
  180. if rotate in (90, 270):
  181. w_h_rate = height / width
  182. else:
  183. w_h_rate = width / height
  184. if w_h_rate > 1:
  185. info_data = {int(video_id): w_h_rate}
  186. else:
  187. return
  188. redis_helper = RedisHelper()
  189. # 写入新数据
  190. if len(info_data) > 0:
  191. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  192. class FilterVideos(object):
  193. """视频过滤"""
  194. def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
  195. """
  196. 初始化
  197. :param request_id: request_id
  198. :param app_type: 产品标识 type-int
  199. :param video_ids: 需过滤的视频列表 type-list
  200. :param mid: mid type-string
  201. :param uid: uid type-string
  202. """
  203. self.request_id = request_id
  204. self.app_type = app_type
  205. self.mid = mid
  206. self.uid = uid
  207. self.video_ids = video_ids
  208. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  209. """召回小时级更新的视频状态过滤"""
  210. # 根据Redis缓存中的数据过滤
  211. redis_helper = RedisHelper()
  212. # 获取不符合推荐状态的视频
  213. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  214. if key_flag == 'region_24h':
  215. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  216. elif key_flag == 'day_24h':
  217. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  218. else:
  219. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  220. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  221. key_prefix = config_.H_VIDEO_FILER_24H
  222. elif key_flag == '24h':
  223. key_prefix = config_.H_VIDEO_FILER_24H
  224. else:
  225. key_prefix = config_.H_VIDEO_FILER
  226. filter_videos_list = redis_helper.get_data_from_set(
  227. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  228. )
  229. if not filter_videos_list:
  230. return video_ids
  231. filter_videos = [int(video) for video in filter_videos_list]
  232. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  233. return filtered_videos
  234. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  235. """召回小时级更新的视频过滤"""
  236. # 预曝光过滤
  237. # st_pre = time.time()
  238. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  239. # et_pre = time.time()
  240. # log_.info({
  241. # 'logTimestamp': int(time.time() * 1000),
  242. # 'request_id': self.request_id,
  243. # 'app_type': self.app_type,
  244. # 'mid': self.mid,
  245. # 'uid': self.uid,
  246. # 'operation': 'preview_filter',
  247. # 'request_videos': self.video_ids,
  248. # 'preview_filter_result': filtered_pre_result,
  249. # 'executeTime': (time.time() - st_pre) * 1000
  250. # })
  251. if not filtered_pre_result:
  252. return None
  253. # 视频状态过滤
  254. # st_status = time.time()
  255. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  256. data_key=data_key, ab_code=ab_code,
  257. province_code=province_code, key_flag=key_flag)
  258. # et_status = time.time()
  259. # log_.info({
  260. # 'logTimestamp': int(time.time() * 1000),
  261. # 'request_id': self.request_id,
  262. # 'app_type': self.app_type,
  263. # 'mid': self.mid,
  264. # 'uid': self.uid,
  265. # 'operation': 'status_filter',
  266. # 'request_videos': filtered_pre_result,
  267. # 'status_filter_result': filtered_status_result,
  268. # 'executeTime': (time.time() - st_status) * 1000
  269. # })
  270. if not filtered_status_result:
  271. return None
  272. # 视频已曝光过滤
  273. st_viewed = time.time()
  274. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  275. # et_viewed = time.time()
  276. log_.info({
  277. 'logTimestamp': int(time.time() * 1000),
  278. 'pool_type': pool_type,
  279. 'request_id': self.request_id,
  280. 'app_type': self.app_type,
  281. 'mid': self.mid,
  282. 'uid': self.uid,
  283. 'operation': 'view_filter',
  284. 'request_videos': filtered_status_result,
  285. 'view_filter_result': filtered_viewed_result,
  286. 'executeTime': (time.time() - st_viewed) * 1000
  287. })
  288. if not filtered_viewed_result:
  289. return None
  290. else:
  291. return [int(video_id) for video_id in filtered_viewed_result]
  292. def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
  293. """视频过滤"""
  294. # 预曝光过滤
  295. st_pre = time.time()
  296. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  297. # print("filtered_pre:", (time.time()-st_pre)*1000)
  298. # et_pre = time.time()
  299. # log_.info({
  300. # 'logTimestamp': int(time.time() * 1000),
  301. # 'request_id': self.request_id,
  302. # 'app_type': self.app_type,
  303. # 'mid': self.mid,
  304. # 'uid': self.uid,
  305. # 'operation': 'preview_filter',
  306. # 'request_videos': self.video_ids,
  307. # 'preview_filter_result': filtered_pre_result,
  308. # 'executeTime': (time.time() - st_pre) * 1000
  309. # })
  310. if not filtered_pre_result:
  311. return None
  312. # 视频状态过滤采用离线定时过滤方案
  313. # 视频状态过滤
  314. # st_status = time.time()
  315. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  316. # et_status = time.time()
  317. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  318. # filtered_status_result, (et_status - st_status) * 1000))
  319. # if not filtered_status_result:
  320. # return None
  321. # 视频已曝光过滤
  322. st_viewed = time.time()
  323. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  324. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  325. # et_viewed = time.time()
  326. # log_.info({
  327. # 'logTimestamp': int(time.time() * 1000),
  328. # 'pool_type': pool_type,
  329. # 'request_id': self.request_id,
  330. # 'app_type': self.app_type,
  331. # 'mid': self.mid,
  332. # 'uid': self.uid,
  333. # 'operation': 'view_filter',
  334. # 'request_videos': filtered_pre_result,
  335. # 'view_filter_result': filtered_viewed_result,
  336. # 'executeTime': (time.time() - st_viewed) * 1000
  337. # })
  338. if not filtered_viewed_result:
  339. return None
  340. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  341. if pool_type == 'flow' or pool_type=='normal':
  342. # 流量池视频需过滤屏蔽视频
  343. if region_code is None or shield_config is None:
  344. return filtered_viewed_videos
  345. else:
  346. shield_key_name_list = shield_config.get(region_code, None)
  347. if shield_key_name_list is not None:
  348. filtered_shield_video_ids = self.filter_shield_video(
  349. video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  350. )
  351. log_.info({
  352. 'logTimestamp': int(time.time() * 1000),
  353. 'pool_type': pool_type,
  354. 'request_id': self.request_id,
  355. 'app_type': self.app_type,
  356. 'mid': self.mid,
  357. 'uid': self.uid,
  358. 'operation': 'shield_filter',
  359. 'request_videos': filtered_viewed_videos,
  360. 'shield_filter_result': filtered_shield_video_ids,
  361. 'executeTime': (time.time() - st_viewed) * 1000
  362. })
  363. # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  364. return filtered_shield_video_ids
  365. else:
  366. return filtered_viewed_videos
  367. else:
  368. return filtered_viewed_videos
  369. def filter_video_previewed(self, video_ids):
  370. """
  371. 预曝光过滤
  372. :param video_ids: 需过滤的视频列表 type-list
  373. :return: filtered_videos 过滤后的列表 type-list
  374. """
  375. pre_time = time.time()
  376. if not self.mid or self.mid == 'null':
  377. # mid为空时,不做预曝光过滤
  378. return video_ids
  379. # 根据Redis缓存中的数据过滤
  380. redis_helper = RedisHelper()
  381. # key拼接
  382. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  383. #print("key_name:", key_name)
  384. pe_videos_list = redis_helper.get_data_from_set(key_name)
  385. #print("pe_videos_list:", pe_videos_list)
  386. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  387. # self.app_type, self.mid, self.uid, pe_videos_list))
  388. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  389. # self.app_type, self.mid, self.uid, video_ids))
  390. if not pe_videos_list:
  391. return video_ids
  392. pe_videos = [int(video) for video in pe_videos_list]
  393. #print("pe_videos:", len(pe_videos))
  394. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  395. #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
  396. return filtered_videos
  397. # def filter_video_status(self, video_ids):
  398. # """
  399. # 对视频状态进行过滤
  400. # :param video_ids: 视频id列表 type-list
  401. # :return: filtered_videos
  402. # """
  403. # if len(video_ids) == 1:
  404. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  405. # "SELECT video_id " \
  406. # "FROM {} " \
  407. # "WHERE audit_status = 5 " \
  408. # "AND applet_rec_status IN (1, -6) " \
  409. # "AND open_status = 1 " \
  410. # "AND payment_status = 0 " \
  411. # "AND encryption_status != 5 " \
  412. # "AND transcoding_status = 3 " \
  413. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  414. # else:
  415. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  416. # "SELECT video_id " \
  417. # "FROM {} " \
  418. # "WHERE audit_status = 5 " \
  419. # "AND applet_rec_status IN (1, -6) " \
  420. # "AND open_status = 1 " \
  421. # "AND payment_status = 0 " \
  422. # "AND encryption_status != 5 " \
  423. # "AND transcoding_status = 3 " \
  424. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  425. #
  426. # hologres_helper = HologresHelper()
  427. # data = hologres_helper.get_data(sql=sql)
  428. # filtered_videos = [int(temp[0]) for temp in data]
  429. # return filtered_videos
  430. def filter_video_viewed(self, video_ids, types=(1, 6,)):
  431. """
  432. 调用后端接口过滤用户已观看视频
  433. :param video_ids: 视频id列表 type-list
  434. :param types: 过滤参数 type-tuple, 默认(1, )
  435. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  436. :return: filtered_videos
  437. """
  438. # 获取对应端的过滤参数types
  439. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  440. if types is None:
  441. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  442. request_data = {"appType": self.app_type,
  443. "mid": self.mid,
  444. "uid": self.uid,
  445. "types": list(types),
  446. "videoIds": video_ids}
  447. # print(request_data)
  448. # 调用http接口
  449. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  450. # print("result:", result)
  451. if result is None:
  452. # print("result is None")
  453. # log_.info('过滤失败,types: {}'.format(types))
  454. return []
  455. if result['code'] != 0:
  456. # log_.info('过滤失败,types: {}'.format(types))
  457. return []
  458. filtered_videos = result['data']
  459. return filtered_videos
  460. def filter_video_viewed_new(self, video_ids):
  461. """
  462. 调用后端接口过滤用户已观看视频
  463. :param video_ids: 视频id列表 type-list
  464. :param types: 过滤参数 type-tuple, 默认(1, )
  465. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  466. :return: filtered_videos
  467. """
  468. # 获取对应端的过滤参数types
  469. st_time = time.time()
  470. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  471. #print(types)
  472. if types is None:
  473. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  474. if 6 in types:
  475. types = list(types)
  476. types.remove(6)
  477. #print(types)
  478. request_data = {"appType": self.app_type,
  479. "mid": self.mid,
  480. "uid": self.uid,
  481. "types": list(types),
  482. "videoIds": video_ids}
  483. # 调用http接口
  484. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  485. #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
  486. if result is None:
  487. # log_.info('过滤失败,types: {}'.format(types))
  488. return []
  489. if result['code'] != 0:
  490. # log_.info('过滤失败,types: {}'.format(types))
  491. return []
  492. filtered_videos = result['data']
  493. return filtered_videos
  494. def filter_shield_video(self, video_ids, shield_key_name_list):
  495. """
  496. 过滤屏蔽视频视频
  497. :param video_ids: 需过滤的视频列表 type-list
  498. :param shield_key_name_list: 过滤视频 redis-key
  499. :return: filtered_videos 过滤后的列表 type-list
  500. """
  501. # print("filter_shield_video:", len(filter_shield_video))
  502. if len(video_ids) == 0:
  503. return video_ids
  504. # 根据Redis缓存中的数据过滤
  505. redis_helper = RedisHelper()
  506. for shield_key_name in shield_key_name_list:
  507. video_ids = [
  508. int(video_id) for video_id in video_ids
  509. if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
  510. ]
  511. # shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  512. # if not shield_videos_list:
  513. # continue
  514. # shield_videos = [int(video) for video in shield_videos_list]
  515. # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  516. # print("video_ids:", len(video_ids))
  517. return video_ids
  518. def new_filter_video(self):
  519. """视频过滤"""
  520. # 1. 预曝光过滤
  521. st_pre = time.time()
  522. #print("new_filter video_ids:", self.video_ids)
  523. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  524. if not filtered_pre_result:
  525. return None
  526. # log_.info({
  527. # 'logTimestamp': int(time.time() * 1000),
  528. # 'request_id': self.request_id,
  529. # 'app_type': self.app_type,
  530. # 'mid': self.mid,
  531. # 'uid': self.uid,
  532. # 'operation': 'preview_filter',
  533. # 'request_videos': self.video_ids,
  534. # 'preview_filter_result': filtered_pre_result,
  535. # 'executeTime': (time.time() - st_pre) * 1000
  536. # })
  537. #2. 视频已曝光过滤
  538. st_viewed = time.time()
  539. #print("---filtered viewed---")
  540. #print("filtered_pre_result:",filtered_pre_result)
  541. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  542. if not filtered_viewed_result:
  543. return None
  544. return filtered_viewed_result
  545. def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
  546. flow_video_list = []
  547. normal_video_list = []
  548. for v_id in vid_list:
  549. if v_id in flow_vids_set:
  550. flow_video_list.append(v_id)
  551. else:
  552. normal_video_list.append(v_id)
  553. shield_key_name_list = shield_config.get(region_code, None)
  554. if shield_key_name_list is not None:
  555. filtered_shield_video_ids = self.filter_shield_video(
  556. video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
  557. )
  558. return normal_video_list, filtered_shield_video_ids
  559. else:
  560. return normal_video_list, flow_video_list
  561. def filter_movie_religion_video(self, video_ids):
  562. """过滤白名单视频(影视,宗教)"""
  563. # 影视 + 宗教: rov.filter.movie.{videoId}
  564. # 宗教: rov.filter.religion.{videoId}
  565. st_time = time.time()
  566. if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  567. config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
  568. config_.APP_TYPE['ZUI_JING_QI'],
  569. config_.APP_TYPE['H5']]:
  570. # 过滤 影视 + 宗教
  571. keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
  572. elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  573. config_.APP_TYPE['ZUI_JING_QI'],
  574. config_.APP_TYPE['H5']]:
  575. # 过滤 影视 + 宗教
  576. keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
  577. else:
  578. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  579. return video_ids
  580. redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
  581. filter_videos = []
  582. for i in range(len(keys)//1000 + 1):
  583. video_ids_temp = video_ids[i*1000:(i+1)*1000]
  584. if len(video_ids_temp) == 0:
  585. break
  586. mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
  587. filter_videos.extend([int(data) for data in mget_res if data is not None])
  588. if len(filter_videos) > 0:
  589. filtered_videos = set(video_ids) - set(filter_videos)
  590. #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
  591. return list(filtered_videos)
  592. else:
  593. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  594. return video_ids
  595. def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
  596. """视频过滤"""
  597. # 预曝光过滤
  598. st_pre = time.time()
  599. #print("self.video_ids:", len(self.video_ids))
  600. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  601. if not filtered_pre_result:
  602. return None
  603. #print("filtered_pre_result:", len(filtered_pre_result))
  604. #print(filtered_pre_result)
  605. # 视频已曝光过滤/白名单过滤
  606. st_viewed = time.time()
  607. t = [
  608. gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
  609. gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
  610. gevent.joinall(t)
  611. filtered_result_list = [i.get() for i in t]
  612. #print("filtered_result_list1:",filtered_result_list[0])
  613. #print("filtered_result_list2:",filtered_result_list[1])
  614. filtered_viewed_set = set('')
  615. for i in filtered_result_list[0]:
  616. filtered_viewed_set.add(int(i))
  617. filter_video_set =set('')
  618. for j in filtered_result_list[1]:
  619. filter_video_set.add(int(j))
  620. filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
  621. #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
  622. #print("filtered:",len(filtered_viewed_result))
  623. if not filtered_viewed_result:
  624. return None
  625. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  626. #print("result:", filtered_viewed_videos)
  627. if flow_set is None:
  628. return filtered_viewed_videos
  629. else:
  630. # 流量池视频需过滤屏蔽视频
  631. if region_code is None or shield_config is None:
  632. return filtered_viewed_videos
  633. else:
  634. normal_recall_ids = []
  635. left_flow_ids = []
  636. for vid in filtered_viewed_videos:
  637. if vid in flow_set:
  638. left_flow_ids.append(vid)
  639. else:
  640. normal_recall_ids.append(vid)
  641. shield_key_name_list = shield_config.get(region_code, None)
  642. if shield_key_name_list is not None:
  643. filtered_shield_video_ids = self.filter_shield_video(
  644. video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
  645. )
  646. return normal_recall_ids+filtered_shield_video_ids
  647. else:
  648. return filtered_viewed_videos
  649. def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
  650. """视频过滤"""
  651. # 预曝光过滤
  652. st_pre = time.time()
  653. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  654. # print("filtered_pre:", (time.time()-st_pre)*1000)
  655. # et_pre = time.time()
  656. # log_.info({
  657. # 'logTimestamp': int(time.time() * 1000),
  658. # 'request_id': self.request_id,
  659. # 'app_type': self.app_type,
  660. # 'mid': self.mid,
  661. # 'uid': self.uid,
  662. # 'operation': 'preview_filter',
  663. # 'request_videos': self.video_ids,
  664. # 'preview_filter_result': filtered_pre_result,
  665. # 'executeTime': (time.time() - st_pre) * 1000
  666. # })
  667. if not filtered_pre_result:
  668. return None
  669. # 视频状态过滤采用离线定时过滤方案
  670. # 视频状态过滤
  671. # st_status = time.time()
  672. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  673. # et_status = time.time()
  674. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  675. # filtered_status_result, (et_status - st_status) * 1000))
  676. # if not filtered_status_result:
  677. # return None
  678. # 视频已曝光过滤
  679. st_viewed = time.time()
  680. filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result)
  681. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  682. # et_viewed = time.time()
  683. # log_.info({
  684. # 'logTimestamp': int(time.time() * 1000),
  685. # 'pool_type': pool_type,
  686. # 'request_id': self.request_id,
  687. # 'app_type': self.app_type,
  688. # 'mid': self.mid,
  689. # 'uid': self.uid,
  690. # 'operation': 'view_filter',
  691. # 'request_videos': filtered_pre_result,
  692. # 'view_filter_result': filtered_viewed_result,
  693. # 'executeTime': (time.time() - st_viewed) * 1000
  694. # })
  695. if not filtered_viewed_result:
  696. return None
  697. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  698. if pool_type == 'flow' or pool_type=='normal':
  699. # 流量池视频需过滤屏蔽视频
  700. if region_code is None or shield_config is None:
  701. return filtered_viewed_videos
  702. else:
  703. shield_key_name_list = shield_config.get(region_code, None)
  704. if shield_key_name_list is not None:
  705. filtered_shield_video_ids = self.filter_shield_video(
  706. video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  707. )
  708. log_.info({
  709. 'logTimestamp': int(time.time() * 1000),
  710. 'pool_type': pool_type,
  711. 'request_id': self.request_id,
  712. 'app_type': self.app_type,
  713. 'mid': self.mid,
  714. 'uid': self.uid,
  715. 'operation': 'shield_filter',
  716. 'request_videos': filtered_viewed_videos,
  717. 'shield_filter_result': filtered_shield_video_ids,
  718. 'executeTime': (time.time() - st_viewed) * 1000
  719. })
  720. # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  721. return filtered_shield_video_ids
  722. else:
  723. return filtered_viewed_videos
  724. else:
  725. return filtered_viewed_videos
  726. def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
  727. """
  728. 调用后端接口过滤用户已观看视频
  729. :param video_ids: 视频id列表 type-list
  730. :param types: 过滤参数 type-tuple, 默认(1, )
  731. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  732. :return: filtered_videos
  733. """
  734. # 获取对应端的过滤参数types
  735. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  736. if types is None:
  737. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  738. types = list(types)
  739. types.append(2)
  740. request_data = {"appType": self.app_type,
  741. "mid": self.mid,
  742. "uid": self.uid,
  743. "types": types,
  744. "videoIds": video_ids}
  745. # print(request_data)
  746. # 调用http接口
  747. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  748. # print("result:", result)
  749. if result is None:
  750. # print("result is None")
  751. # log_.info('过滤失败,types: {}'.format(types))
  752. return []
  753. if result['code'] != 0:
  754. # log_.info('过滤失败,types: {}'.format(types))
  755. return []
  756. filtered_videos = result['data']
  757. return filtered_videos
  758. if __name__ == '__main__':
  759. user = [
  760. ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
  761. ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
  762. ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
  763. ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
  764. ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
  765. ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
  766. ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
  767. ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
  768. ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
  769. ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
  770. ]
  771. video_df = pd.read_csv('./data/videoids.csv')
  772. videoid_list = video_df['videoid'].tolist()
  773. for mid, uid in user:
  774. video_ids = random.sample(videoid_list, 1000)
  775. start_time = time.time()
  776. filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
  777. res = filter_.filter_videos_new()
  778. print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
  779. # filter_.filter_video_status(video_ids=[1, 3, 5])
  780. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  781. # res = get_videos_remain_view_count(4, videos)
  782. # print(res)
  783. # text = '测试 @李倩'
  784. # send_msg_to_feishu(text)
  785. # update_video_w_h_rate(video_id=113, key_name='')
  786. # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
  787. # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  788. # res = request_get(request_url=request_url, timeout=100)
  789. # res = get_user_has30day_return(mid=mid)
  790. # print(res, type(res))