utils.py 19 KB


  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. from datetime import datetime
  6. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  7. from db_helper import RedisHelper, MysqlHelper
  8. from config import set_config
  9. from log import Log
  # Module-wide configuration object returned by config.set_config(); the helpers below read their URLs and key prefixes from it.
  10. config_ = set_config()
  # Module-wide Log instance; the helpers below use it for their error/info output.
  11. log_ = Log()
  def send_msg_to_feishu(msg_text, timeout=5):
      """Send a text alert to the Feishu bot webhook.

      The outgoing text is ``"<key_word>: <msg_text>"``.

      :param msg_text: alert text to send
      :param timeout: seconds to wait for the webhook, a float or a
          (connect_timeout, read_timeout) tuple. Without a timeout a stalled
          webhook would block the caller indefinitely.
      :return: None
      :raises requests.RequestException: network errors and timeouts propagate
          to the caller
      """
      # NOTE(review): the webhook token is hard-coded in source; consider
      # moving it into configuration.
      webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
      # Custom keyword prepended to every message (presumably the keyword the
      # bot's security setting matches on -- confirm).
      key_word = '服务报警'
      headers = {'Content-Type': 'application/json'}
      payload_message = {
          "msg_type": "text",
          "content": {
              "text": '{}: {}'.format(key_word, msg_text)
          }
      }
      requests.request(
          'POST',
          url=webhook,
          headers=headers,
          data=json.dumps(payload_message),
          timeout=timeout,
      )
  def request_post(request_url, request_data, timeout):
      """POST a JSON body to an HTTP endpoint and decode the JSON reply.

      :param request_url: endpoint URL
      :param request_data: payload, sent as the JSON request body
      :param timeout: timeout in seconds, a float or a
          (connect_timeout, read_timeout) tuple
      :return: the decoded JSON response when the status code is 200;
          None for any other status code or when any exception is raised
          (the exception is logged with its traceback)
      """
      try:
          response = requests.post(
              url=request_url,
              json=request_data,
              timeout=timeout,
              headers={"Connection": "close"},
          )
          if response.status_code != 200:
              return None
          return json.loads(response.text)
      except Exception as exc:
          log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, exc, traceback.format_exc()))
          return None
  def request_get(request_url, timeout):
      """GET an HTTP endpoint and decode the JSON reply.

      :param request_url: endpoint URL
      :param timeout: timeout in seconds, a float or a
          (connect_timeout, read_timeout) tuple
      :return: the decoded JSON response when the status code is 200;
          None for any other status code or when any exception is raised
          (the exception is logged with its traceback)
      """
      try:
          response = requests.get(url=request_url, timeout=timeout)
          if response.status_code != 200:
              return None
          return json.loads(response.text)
      except Exception as exc:
          log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, exc, traceback.format_exc()))
          return None
  def get_user_has30day_return(mid):
      """Report whether the user has a return within the last 30 days.

      The Redis cache is consulted first. On a cache miss the backend endpoint
      is queried (0.1 timeout) and the resulting flag is written back to Redis
      with ``expire_time=2 * 3600`` (presumably seconds, i.e. two hours).

      :param mid: user mid; a falsy value short-circuits to None
      :return: the cached value converted to int, or 1/0 derived from the
          backend answer; None when mid is empty, the request failed, or the
          response code is non-zero
      """
      if not mid:
          return None

      # Look up the cached state first.
      cache_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
      redis_helper = RedisHelper()
      cached = redis_helper.get_data_from_redis(key_name=cache_key)
      if cached is not None:
          return int(cached)

      # Cache miss: ask the backend.
      result = request_get(
          request_url=f"{config_.GET_USER_30DayReturnCnt_URL}{mid}",
          timeout=0.1,
      )
      if result is None or result['code'] != 0:
          return None

      # Only a literal True counts as "has returned".
      flag = 1 if result['data'] is True else 0
      redis_helper.set_data_to_redis(key_name=cache_key, value=flag, expire_time=2 * 3600)
      return flag
  def get_videos_remain_view_count(app_type, videos):
      """Fetch the remaining distributable count of videos in the flow pool.

      :param app_type: product identifier, type-int
      :param videos: video info (video id, flow-pool mark), type-list,
          [{'videoId': video_id, 'flowPool': flow_pool}, ...]
      :return: (data, error_flag) where data is a list
          [(video_id, flow_pool, view_count), ...] and error_flag is True
          when the request failed or returned a non-zero code
      """
      if not videos:
          # Nothing to ask for; this is not an error.
          return [], False

      result = request_post(
          request_url=config_.GET_REMAIN_VIEW_COUNT_URL,
          request_data={'appType': app_type, 'videos': videos},
          timeout=(0.1, 1),
      )
      if result is None:
          return [], True
      if result['code'] != 0:
          log_.info('获取视频在流量池中的剩余可分发数失败')
          return [], True

      data = []
      for item in result['data']:
          data.append((item['videoId'], item['flowPool'], item['distributeCount']))
      return data, False
  def get_videos_local_distribute_count(video_id, flow_pool):
      """Read the locally recorded distribute count of a flow-pool video.

      The value is stored in Redis under
      ``<LOCAL_DISTRIBUTE_COUNT_PREFIX><video_id>:<flow_pool>``.

      :param video_id: video_id
      :param flow_pool: flow-pool mark
      :return: the recorded count as int, or None when the key holds no value
      """
      key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
      current_count = RedisHelper().get_data_from_redis(key_name=key_name)
      return None if current_count is None else int(current_count)
  def update_video_w_h_rate(video_id, key_name):
      """Store the width/height ratio of a landscape video in a Redis zset.

      The video's width, height and rotate columns are read from
      ``longvideo.wx_video``. When rotate is 90 or 270 the ratio is
      height/width, otherwise width/height. Only ratios greater than 1 are
      written, as ``{video_id: ratio}``.

      :param video_id: videoId, type-int (anything ``int()`` accepts)
      :param key_name: redis key of the zset to update
      :return: None
      :raises ValueError: when video_id cannot be converted to int
      """
      # Coerce before interpolating so that only an integer literal can ever
      # reach the SQL text; the success path already required int(video_id).
      video_id = int(video_id)
      sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
      mysql_helper = MysqlHelper()
      data = mysql_helper.get_data(sql=sql)
      if data is None or len(data) == 0:
          return

      row = data[0]
      # A row with unknown dimensions cannot yield a ratio; skip it instead of
      # failing inside int(None).
      if row[1] is None or row[2] is None:
          return
      width, height = int(row[1]), int(row[2])
      if width == 0 or height == 0:
          return
      # A missing rotate value is treated as "not rotated".
      rotate = int(row[3]) if row[3] is not None else 0

      # A quarter-turn rotation swaps the displayed width and height.
      if rotate in (90, 270):
          w_h_rate = height / width
      else:
          w_h_rate = width / height
      if w_h_rate <= 1:
          return

      redis_helper = RedisHelper()
      redis_helper.add_data_with_zset(key_name=key_name, data={video_id: w_h_rate})
  class FilterVideos(object):
      """Filters candidate videos (pre-exposure, status, viewed, shielded)."""

      def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
          """
          Store the request context.

          :param request_id: request_id
          :param app_type: product identifier, type-int
          :param video_ids: videos to be filtered, type-list
          :param mid: mid, type-string
          :param uid: uid, type-string
          """
          self.request_id = request_id
          self.app_type = app_type
          self.mid = mid
          self.uid = uid
          self.video_ids = video_ids

      def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
          """
          Status filter for recall data that is refreshed hourly.

          Removes every id found in the Redis set
          ``<key_prefix><app_type>.<data_key>.<rule_key>``; the prefix is
          chosen from ab_code and key_flag.

          :param video_ids: videos to be filtered, type-list
          :param rule_key: rule part of the redis key
          :param data_key: data part of the redis key
          :param ab_code: AB experiment code used to pick the key prefix
          :param province_code: province code appended to regional prefixes
          :param key_flag: '', 'region_24h', 'day_24h' or '24h'
          :return: video_ids without the ids listed in the Redis set; the
              input list itself when the set is empty
          """
          redis_helper = RedisHelper()
          # Pick the key prefix of the "not recommendable" video set.
          if ab_code in config_.AB_CODE['region_rank_by_h'].values():
              if key_flag == 'region_24h':
                  key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
              elif key_flag == 'day_24h':
                  key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
              else:
                  key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
          elif ab_code in config_.AB_CODE['rank_by_24h'].values():
              key_prefix = config_.H_VIDEO_FILER_24H
          elif key_flag == '24h':
              key_prefix = config_.H_VIDEO_FILER_24H
          else:
              key_prefix = config_.H_VIDEO_FILER

          filter_videos_list = redis_helper.get_data_from_set(
              key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
          )
          if not filter_videos_list:
              return video_ids
          # A set gives O(1) membership tests instead of scanning a list for
          # every candidate.
          blocked = {int(video) for video in filter_videos_list}
          return [video_id for video_id in video_ids if video_id not in blocked]

      def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
          """
          Filter pipeline for recall data that is refreshed hourly.

          Runs, in order: pre-exposure filter, status filter, viewed filter.
          The viewed-filter step is logged together with its execution time.

          :param rule_key: rule part of the status-filter redis key
          :param data_key: data part of the status-filter redis key
          :param ab_code: AB experiment code
          :param province_code: province code
          :param key_flag: key selector passed on to filter_video_status_h
          :param pool_type: only written to the log entry
          :return: list of int video ids, or None as soon as a step leaves
              nothing
          """
          # Pre-exposure filter
          filtered_pre_result = self.filter_video_previewed(self.video_ids)
          if not filtered_pre_result:
              return None

          # Video status filter
          filtered_status_result = self.filter_video_status_h(
              video_ids=filtered_pre_result, rule_key=rule_key,
              data_key=data_key, ab_code=ab_code,
              province_code=province_code, key_flag=key_flag)
          if not filtered_status_result:
              return None

          # Already-viewed filter
          st_viewed = time.time()
          filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
          log_.info({
              'logTimestamp': int(time.time() * 1000),
              'pool_type': pool_type,
              'request_id': self.request_id,
              'app_type': self.app_type,
              'mid': self.mid,
              'uid': self.uid,
              'operation': 'view_filter',
              'request_videos': filtered_status_result,
              'view_filter_result': filtered_viewed_result,
              'executeTime': (time.time() - st_viewed) * 1000
          })
          if not filtered_viewed_result:
              return None
          return [int(video_id) for video_id in filtered_viewed_result]

      def filter_videos(self, pool_type='rov', region_code=None):
          """
          Default filter pipeline.

          Runs the pre-exposure filter and the viewed filter. For
          ``pool_type == 'flow'`` with a region_code that has an entry in
          ``config_.SHIELD_CONFIG``, shielded videos are removed as well.
          Video status is not checked here: per the original note it is
          handled by an offline, scheduled filtering job.

          :param pool_type: 'flow' enables the shield filter
          :param region_code: region used to look up the shield redis keys
          :return: list of int video ids, or None when the pre-exposure or
              viewed step leaves nothing
          """
          # Pre-exposure filter
          filtered_pre_result = self.filter_video_previewed(self.video_ids)
          if not filtered_pre_result:
              return None

          # Already-viewed filter
          filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
          if not filtered_viewed_result:
              return None
          filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]

          # Only flow-pool videos with a known region go through the shield
          # filter.
          if pool_type != 'flow' or region_code is None:
              return filtered_viewed_videos
          shield_key_name_list = config_.SHIELD_CONFIG.get(region_code, None)
          if shield_key_name_list is None:
              return filtered_viewed_videos
          return self.filter_shield_video(
              video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
          )

      def filter_video_previewed(self, video_ids):
          """
          Pre-exposure filter.

          :param video_ids: videos to be filtered, type-list
          :return: filtered list, type-list; the input list itself when mid is
              empty/'null' or nothing is recorded for this user
          """
          if not self.mid or self.mid == 'null':
              # No pre-exposure filtering without a mid.
              return video_ids
          redis_helper = RedisHelper()
          key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
          pe_videos_list = redis_helper.get_data_from_set(key_name)
          if not pe_videos_list:
              return video_ids
          # A set gives O(1) membership tests instead of scanning a list for
          # every candidate.
          pe_videos = {int(video) for video in pe_videos_list}
          return [video_id for video_id in video_ids if video_id not in pe_videos]

      # NOTE(review): an online status filter (filter_video_status, a Hologres
      # query) used to live here as commented-out code and has been dropped;
      # recover it from version control if it is ever needed again.

      def filter_video_viewed(self, video_ids, types=(1, 6,)):
          """
          Call the backend filter endpoint to drop videos the user has seen.

          :param video_ids: video id list, type-list
          :param types: filter kinds, type-tuple, default (1, 6):
              1-viewed 2-video status 3-entered the seniors community
              4-topic status 5-recommendation status 6-whitelist filter.
              For the APP product only (1, ) is sent, whatever is passed in.
          :return: the 'data' field of the response; [] when the request
              failed or the response code is non-zero
          """
          if self.app_type == config_.APP_TYPE['APP']:
              # The APP product does not use the whitelist filter.
              types = (1, )
          request_data = {"appType": self.app_type,
                          "mid": self.mid,
                          "uid": self.uid,
                          "types": list(types),
                          "videoIds": video_ids}
          result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
          if result is None or result['code'] != 0:
              return []
          return result['data']

      def filter_shield_video(self, video_ids, shield_key_name_list):
          """
          Remove shielded videos.

          :param video_ids: videos to be filtered, type-list
          :param shield_key_name_list: redis keys of the shield sets
          :return: filtered list, type-list; ids are converted to int once at
              least one shield key has been applied
          """
          if len(video_ids) == 0:
              return video_ids
          redis_helper = RedisHelper()
          # Apply the shield sets one after another; each pass keeps only the
          # ids that are not a member of that set.
          for shield_key_name in shield_key_name_list:
              video_ids = [
                  int(video_id) for video_id in video_ids
                  if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
              ]
          return video_ids
  if __name__ == '__main__':
      # Manual smoke check: look up the 30-day return flag for one sample mid
      # and print the value together with its type.
      sample_mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
      flag = get_user_has30day_return(mid=sample_mid)
      print(flag, type(flag))