# utils.py (7.5 KB)
import asyncio
import json
import time
from datetime import datetime

import aiocache
import aiohttp

from config import set_config
from log import Log
# Module-level singletons, created once at import time and used by the helpers and class below.
config_ = set_config()  # project config: endpoint URLs, cache key prefixes and the video-status table name
log_ = Log()  # project logger; this module calls its .info() and .error()
  10. async def request_post(url, data, timeout=1):
  11. """
  12. post 请求 HTTP接口
  13. :param request_url: 接口URL
  14. :param request_data: 请求参数
  15. :param timeout: 超时时间,默认为1秒,type-float
  16. :return: res_data json格式
  17. """
  18. try:
  19. async with aiohttp.ClientSession() as session:
  20. async with session.post(url, data=data) as res:
  21. if res.status == 200:
  22. res_data = json.loads(await res.text())
  23. print('res_data', res_data)
  24. return res_data
  25. else:
  26. return None
  27. except:
  28. log_.error('url: {} timeout, exception: {}'.format(url, e))
  29. return None
  30. async def get_videos_remain_view_count(app_type, videos):
  31. """
  32. 获取视频在流量池中的剩余可分发数
  33. :param app_type: 产品标识 type-int
  34. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  35. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  36. """
  37. if not videos:
  38. return []
  39. request_data = {'appType': app_type, 'videos': videos}
  40. result = await request_post(url=config_.GET_REMAIN_VIEW_COUNT_URL, data=request_data, timeout=1)
  41. if result is None:
  42. return []
  43. if result['code'] != 0:
  44. log_.info('获取视频在流量池中的剩余可分发数失败')
  45. return []
  46. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  47. return data
  48. async def get_videos_local_distribute_count(video_id, flow_pool):
  49. """
  50. 获取流量池视频本地分发数
  51. :param video_id: video_id
  52. :param flow_pool: 流量池标记
  53. :return: current_count 本地记录的分发数
  54. """
  55. redis_h = datetime.now().hour
  56. if datetime.now().minute >= 30:
  57. redis_h += 0.5
  58. key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  59. video = '{}-{}'.format(video_id, flow_pool)
  60. current_count = await aiocache.get_score_with_value(key_name=key_name, value=video)
  61. if current_count is not None:
  62. return current_count
  63. else:
  64. return None
  65. class FilterVideos(object):
  66. """视频过滤"""
  67. def __init__(self, app_type, video_ids, mid='', uid=''):
  68. """
  69. 初始化
  70. :param app_type: 产品标识 type-int
  71. :param video_ids: 需过滤的视频列表 type-list
  72. :param mid: mid type-string
  73. :param uid: uid type-string
  74. """
  75. self.app_type = app_type
  76. self.mid = mid
  77. self.uid = uid
  78. self.video_ids = video_ids
  79. async def filter_videos(self):
  80. """视频过滤"""
  81. # 预曝光过滤
  82. st_pre = time.time()
  83. filtered_pre_result = await self.filter_video_previewed(self.video_ids)
  84. et_pre = time.time()
  85. log_.info('filter by previewed: app_type = {}, result = {}, execute time = {}ms'.format(
  86. self.app_type, filtered_pre_result, (et_pre - st_pre) * 1000))
  87. if not filtered_pre_result:
  88. return None
  89. # 视频状态过滤
  90. st_status = time.time()
  91. filtered_status_result = await self.filter_video_status(video_ids=filtered_pre_result)
  92. et_status = time.time()
  93. log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  94. filtered_status_result, (et_status - st_status) * 1000))
  95. if not filtered_status_result:
  96. return None
  97. # 视频已曝光过滤
  98. st_viewed = time.time()
  99. filtered_viewed_result = await self.filter_video_viewed(video_ids=filtered_status_result)
  100. et_viewed = time.time()
  101. log_.info('filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
  102. self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
  103. if not filtered_viewed_result:
  104. return None
  105. else:
  106. return filtered_viewed_result
  107. async def filter_video_previewed(self, video_ids):
  108. """
  109. 预曝光过滤
  110. :param video_ids: 需过滤的视频列表 type-list
  111. :return: filtered_videos 过滤后的列表 type-list
  112. """
  113. # 根据Redis缓存中的数据过滤
  114. # key拼接
  115. key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(self.app_type, self.mid)
  116. pe_videos_list = await aiocache.get_data_from_set(key_name)
  117. if not pe_videos_list:
  118. return video_ids
  119. pe_videos = [eval(video) for video in pe_videos_list]
  120. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  121. return filtered_videos
  122. async def filter_video_status(self, video_ids):
  123. """
  124. 对视频状态进行过滤
  125. :param video_ids: 视频id列表 type-list
  126. :return: filtered_videos
  127. """
  128. b_time = time.time()
  129. if len(video_ids) == 1:
  130. sql = "set hg_experimental_enable_shard_pruning=off; " \
  131. "SELECT video_id " \
  132. "FROM {} " \
  133. "WHERE audit_status = 5 " \
  134. "AND applet_rec_status IN (1, -6) " \
  135. "AND open_status = 1 " \
  136. "AND payment_status = 0 " \
  137. "AND encryption_status IS NULL " \
  138. "AND transcoding_status = 3 " \
  139. "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  140. else:
  141. sql = "set hg_experimental_enable_shard_pruning=off; " \
  142. "SELECT video_id " \
  143. "FROM {} " \
  144. "WHERE audit_status = 5 " \
  145. "AND applet_rec_status IN (1, -6) " \
  146. "AND open_status = 1 " \
  147. "AND payment_status = 0 " \
  148. "AND encryption_status IS NULL " \
  149. "AND transcoding_status = 3 " \
  150. "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  151. data = await aiocache.pg_getdata(sql=sql)
  152. filtered_videos = [temp[0] for temp in data]
  153. e_time = time.time()
  154. print('holo time',e_time-b_time)
  155. return filtered_videos
  156. async def filter_video_viewed(self, video_ids, types=(1,)):
  157. """
  158. 调用后端接口过滤用户已观看视频
  159. :param video_ids: 视频id列表 type-list
  160. :param types: 过滤参数 type-tuple, 默认(1, ) 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态
  161. :return: filtered_videos
  162. """
  163. # 调用http接口
  164. request_data = {"appType": self.app_type,
  165. "mid": self.mid,
  166. "uid": self.uid,
  167. "types": list(types),
  168. "videoIds": video_ids}
  169. result = await request_post(url=config_.VIDEO_FILTER_URL, data=request_data, timeout=2.5)
  170. if result is None:
  171. log_.info('过滤失败,types: {}'.format(types))
  172. return video_ids
  173. if result['code'] != 0:
  174. log_.info('过滤失败,types: {}'.format(types))
  175. return video_ids
  176. filtered_videos = result['data']
  177. return filtered_videos
  178. if __name__ == '__main__':
  179. filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
  180. filter_.filter_videos()
  181. filter_.filter_video_status(video_ids=[1, 3, 5])