# utils.py
  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. from datetime import datetime
  6. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  7. from db_helper import RedisHelper, MysqlHelper
  8. from config import set_config
  9. from log import Log
# Module-wide configuration object returned by set_config(); the helpers below
# read URLs and Redis key prefixes from its attributes.
config_ = set_config()
# Module-wide logger instance used by the helpers below (info / error).
log_ = Log()
  12. def send_msg_to_feishu(msg_text):
  13. """发送消息到飞书"""
  14. # webhook地址
  15. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  16. # 自定义关键词key_word
  17. key_word = '服务报警'
  18. headers = {'Content-Type': 'application/json'}
  19. payload_message = {
  20. "msg_type": "text",
  21. "content": {
  22. "text": '{}: {}'.format(key_word, msg_text)
  23. }
  24. }
  25. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  26. # print(response.text)
  27. def request_post(request_url, request_data, timeout):
  28. """
  29. post 请求 HTTP接口
  30. :param request_url: 接口URL
  31. :param request_data: 请求参数
  32. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  33. :return: res_data json格式
  34. """
  35. try:
  36. headers = {"Connection": "close"}
  37. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  38. if response.status_code == 200:
  39. res_data = json.loads(response.text)
  40. return res_data
  41. else:
  42. return None
  43. except Exception as e:
  44. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  45. return None
  46. def get_videos_remain_view_count(app_type, videos):
  47. """
  48. 获取视频在流量池中的剩余可分发数
  49. :param app_type: 产品标识 type-int
  50. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  51. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  52. error_flag 错误标记,True为错误
  53. """
  54. error_flag = False
  55. if not videos:
  56. return [], error_flag
  57. request_data = {'appType': app_type, 'videos': videos}
  58. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  59. if result is None:
  60. error_flag = True
  61. return [], error_flag
  62. if result['code'] != 0:
  63. log_.info('获取视频在流量池中的剩余可分发数失败')
  64. error_flag = True
  65. return [], error_flag
  66. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  67. return data, error_flag
  68. def get_videos_local_distribute_count(video_id, flow_pool):
  69. """
  70. 获取流量池视频本地分发数
  71. :param video_id: video_id
  72. :param flow_pool: 流量池标记
  73. :return: current_count 本地记录的分发数
  74. """
  75. # redis_h = datetime.now().hour
  76. # if datetime.now().minute >= 30:
  77. # redis_h += 0.5
  78. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  79. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  80. redis_helper = RedisHelper()
  81. # video = '{}-{}'.format(video_id, flow_pool)
  82. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  83. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  84. if current_count is not None:
  85. return int(current_count)
  86. else:
  87. return None
  88. def update_video_w_h_rate(video_id, key_name):
  89. """
  90. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  91. :param video_id: videoId type-int
  92. :param key_name: redis key
  93. :return: None
  94. """
  95. # 获取数据
  96. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  97. mysql_helper = MysqlHelper()
  98. data = mysql_helper.get_data(sql=sql)
  99. if len(data) == 0:
  100. return
  101. # 更新到redis
  102. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  103. if width == 0 or height == 0:
  104. return
  105. if rotate in (90, 270):
  106. w_h_rate = height / width
  107. else:
  108. w_h_rate = width / height
  109. if w_h_rate > 1:
  110. info_data = {int(video_id): w_h_rate}
  111. else:
  112. return
  113. redis_helper = RedisHelper()
  114. # 写入新数据
  115. if len(info_data) > 0:
  116. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  117. class FilterVideos(object):
  118. """视频过滤"""
  119. def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
  120. """
  121. 初始化
  122. :param request_id: request_id
  123. :param app_type: 产品标识 type-int
  124. :param video_ids: 需过滤的视频列表 type-list
  125. :param mid: mid type-string
  126. :param uid: uid type-string
  127. """
  128. self.request_id = request_id
  129. self.app_type = app_type
  130. self.mid = mid
  131. self.uid = uid
  132. self.video_ids = video_ids
  133. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  134. """召回小时级更新的视频状态过滤"""
  135. # 根据Redis缓存中的数据过滤
  136. redis_helper = RedisHelper()
  137. # 获取不符合推荐状态的视频
  138. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  139. if key_flag == 'region_24h':
  140. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  141. elif key_flag == 'day_24h':
  142. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  143. else:
  144. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  145. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  146. key_prefix = config_.H_VIDEO_FILER_24H
  147. elif key_flag == '24h':
  148. key_prefix = config_.H_VIDEO_FILER_24H
  149. else:
  150. key_prefix = config_.H_VIDEO_FILER
  151. filter_videos_list = redis_helper.get_data_from_set(
  152. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  153. )
  154. if not filter_videos_list:
  155. return video_ids
  156. filter_videos = [int(video) for video in filter_videos_list]
  157. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  158. return filtered_videos
  159. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  160. """召回小时级更新的视频过滤"""
  161. # 预曝光过滤
  162. # st_pre = time.time()
  163. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  164. # et_pre = time.time()
  165. # log_.info({
  166. # 'logTimestamp': int(time.time() * 1000),
  167. # 'request_id': self.request_id,
  168. # 'app_type': self.app_type,
  169. # 'mid': self.mid,
  170. # 'uid': self.uid,
  171. # 'operation': 'preview_filter',
  172. # 'request_videos': self.video_ids,
  173. # 'preview_filter_result': filtered_pre_result,
  174. # 'executeTime': (time.time() - st_pre) * 1000
  175. # })
  176. if not filtered_pre_result:
  177. return None
  178. # 视频状态过滤
  179. # st_status = time.time()
  180. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  181. data_key=data_key, ab_code=ab_code,
  182. province_code=province_code, key_flag=key_flag)
  183. # et_status = time.time()
  184. # log_.info({
  185. # 'logTimestamp': int(time.time() * 1000),
  186. # 'request_id': self.request_id,
  187. # 'app_type': self.app_type,
  188. # 'mid': self.mid,
  189. # 'uid': self.uid,
  190. # 'operation': 'status_filter',
  191. # 'request_videos': filtered_pre_result,
  192. # 'status_filter_result': filtered_status_result,
  193. # 'executeTime': (time.time() - st_status) * 1000
  194. # })
  195. if not filtered_status_result:
  196. return None
  197. # 视频已曝光过滤
  198. st_viewed = time.time()
  199. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  200. # et_viewed = time.time()
  201. log_.info({
  202. 'logTimestamp': int(time.time() * 1000),
  203. 'pool_type': pool_type,
  204. 'request_id': self.request_id,
  205. 'app_type': self.app_type,
  206. 'mid': self.mid,
  207. 'uid': self.uid,
  208. 'operation': 'view_filter',
  209. 'request_videos': filtered_status_result,
  210. 'view_filter_result': filtered_viewed_result,
  211. 'executeTime': (time.time() - st_viewed) * 1000
  212. })
  213. if not filtered_viewed_result:
  214. return None
  215. else:
  216. return [int(video_id) for video_id in filtered_viewed_result]
  217. def filter_videos(self, pool_type='rov', province_code=None):
  218. """视频过滤"""
  219. # 预曝光过滤
  220. st_pre = time.time()
  221. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  222. # et_pre = time.time()
  223. # log_.info({
  224. # 'logTimestamp': int(time.time() * 1000),
  225. # 'request_id': self.request_id,
  226. # 'app_type': self.app_type,
  227. # 'mid': self.mid,
  228. # 'uid': self.uid,
  229. # 'operation': 'preview_filter',
  230. # 'request_videos': self.video_ids,
  231. # 'preview_filter_result': filtered_pre_result,
  232. # 'executeTime': (time.time() - st_pre) * 1000
  233. # })
  234. if not filtered_pre_result:
  235. return None
  236. # 视频状态过滤采用离线定时过滤方案
  237. # 视频状态过滤
  238. # st_status = time.time()
  239. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  240. # et_status = time.time()
  241. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  242. # filtered_status_result, (et_status - st_status) * 1000))
  243. # if not filtered_status_result:
  244. # return None
  245. # 视频已曝光过滤
  246. st_viewed = time.time()
  247. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  248. # et_viewed = time.time()
  249. log_.info({
  250. 'logTimestamp': int(time.time() * 1000),
  251. 'pool_type': pool_type,
  252. 'request_id': self.request_id,
  253. 'app_type': self.app_type,
  254. 'mid': self.mid,
  255. 'uid': self.uid,
  256. 'operation': 'view_filter',
  257. 'request_videos': filtered_pre_result,
  258. 'view_filter_result': filtered_viewed_result,
  259. 'executeTime': (time.time() - st_viewed) * 1000
  260. })
  261. if not filtered_viewed_result:
  262. return None
  263. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  264. if pool_type == 'flow':
  265. # 流量池视频需过滤屏蔽视频
  266. if province_code is None:
  267. return filtered_viewed_videos
  268. else:
  269. shield_key_name_list = config_.SHIELD_CONFIG.get(province_code, None)
  270. if shield_key_name_list is not None:
  271. filtered_shield_video_ids = self.filter_shield_video(
  272. video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  273. )
  274. log_.info({
  275. 'logTimestamp': int(time.time() * 1000),
  276. 'pool_type': pool_type,
  277. 'request_id': self.request_id,
  278. 'app_type': self.app_type,
  279. 'mid': self.mid,
  280. 'uid': self.uid,
  281. 'operation': 'shield_filter',
  282. 'request_videos': filtered_viewed_videos,
  283. 'shield_filter_result': filtered_shield_video_ids,
  284. 'executeTime': (time.time() - st_viewed) * 1000
  285. })
  286. return filtered_shield_video_ids
  287. else:
  288. return filtered_viewed_videos
  289. else:
  290. return filtered_viewed_videos
  291. def filter_video_previewed(self, video_ids):
  292. """
  293. 预曝光过滤
  294. :param video_ids: 需过滤的视频列表 type-list
  295. :return: filtered_videos 过滤后的列表 type-list
  296. """
  297. if not self.mid or self.mid == 'null':
  298. # mid为空时,不做预曝光过滤
  299. return video_ids
  300. # 根据Redis缓存中的数据过滤
  301. redis_helper = RedisHelper()
  302. # key拼接
  303. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  304. pe_videos_list = redis_helper.get_data_from_set(key_name)
  305. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  306. # self.app_type, self.mid, self.uid, pe_videos_list))
  307. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  308. # self.app_type, self.mid, self.uid, video_ids))
  309. if not pe_videos_list:
  310. return video_ids
  311. pe_videos = [int(video) for video in pe_videos_list]
  312. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  313. return filtered_videos
  314. # def filter_video_status(self, video_ids):
  315. # """
  316. # 对视频状态进行过滤
  317. # :param video_ids: 视频id列表 type-list
  318. # :return: filtered_videos
  319. # """
  320. # if len(video_ids) == 1:
  321. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  322. # "SELECT video_id " \
  323. # "FROM {} " \
  324. # "WHERE audit_status = 5 " \
  325. # "AND applet_rec_status IN (1, -6) " \
  326. # "AND open_status = 1 " \
  327. # "AND payment_status = 0 " \
  328. # "AND encryption_status != 5 " \
  329. # "AND transcoding_status = 3 " \
  330. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  331. # else:
  332. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  333. # "SELECT video_id " \
  334. # "FROM {} " \
  335. # "WHERE audit_status = 5 " \
  336. # "AND applet_rec_status IN (1, -6) " \
  337. # "AND open_status = 1 " \
  338. # "AND payment_status = 0 " \
  339. # "AND encryption_status != 5 " \
  340. # "AND transcoding_status = 3 " \
  341. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  342. #
  343. # hologres_helper = HologresHelper()
  344. # data = hologres_helper.get_data(sql=sql)
  345. # filtered_videos = [int(temp[0]) for temp in data]
  346. # return filtered_videos
  347. def filter_video_viewed(self, video_ids, types=(1, 6,)):
  348. """
  349. 调用后端接口过滤用户已观看视频
  350. :param video_ids: 视频id列表 type-list
  351. :param types: 过滤参数 type-tuple, 默认(1, ) 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤
  352. :return: filtered_videos
  353. """
  354. # 调用http接口
  355. request_data = {"appType": self.app_type,
  356. "mid": self.mid,
  357. "uid": self.uid,
  358. "types": list(types),
  359. "videoIds": video_ids}
  360. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  361. if result is None:
  362. # log_.info('过滤失败,types: {}'.format(types))
  363. return []
  364. if result['code'] != 0:
  365. # log_.info('过滤失败,types: {}'.format(types))
  366. return []
  367. filtered_videos = result['data']
  368. return filtered_videos
  369. def filter_shield_video(self, video_ids, shield_key_name_list):
  370. """
  371. 过滤屏蔽视频视频
  372. :param video_ids: 需过滤的视频列表 type-list
  373. :param shield_key_name_list: 过滤视频 redis-key
  374. :return: filtered_videos 过滤后的列表 type-list
  375. """
  376. if len(video_ids) == 0:
  377. return video_ids
  378. # 根据Redis缓存中的数据过滤
  379. redis_helper = RedisHelper()
  380. for shield_key_name in shield_key_name_list:
  381. shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  382. if not shield_videos_list:
  383. continue
  384. shield_videos = [int(video) for video in shield_videos_list]
  385. video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  386. return video_ids
if __name__ == '__main__':
    # Manual checks, run only when this file is executed directly.
    # Earlier experiments are kept commented out for reference.
    # filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
    # filter_.filter_videos()
    # filter_.filter_video_status(video_ids=[1, 3, 5])
    # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
    # res = get_videos_remain_view_count(4, videos)
    # print(res)
    # text = '测试 @李倩'
    # send_msg_to_feishu(text)
    # update_video_w_h_rate(video_id=113, key_name='')
    # Post an empty JSON body to the test endpoint with a (connect, read)
    # timeout of (0.1, 1) seconds and print the status code and body.
    request_url = "http://videotest-internal.yishihui.com/longvideoapi/dev/test/testTimeout"
    headers = {"Connection": "close"}
    response = requests.post(url=request_url, json={}, timeout=(0.1, 1), headers=headers)
    print(response.status_code, response.text)