utils.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. from datetime import datetime
  6. from db_helper import HologresHelper, RedisHelper
  7. from config import set_config
  8. from log import Log
  9. config_ = set_config()
  10. log_ = Log()
  11. def send_msg_to_feishu(msg_text):
  12. """发送消息到飞书"""
  13. # webhook地址
  14. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  15. # 自定义关键词key_word
  16. key_word = '服务报警'
  17. headers = {'Content-Type': 'application/json'}
  18. payload_message = {
  19. "msg_type": "text",
  20. "content": {
  21. "text": '{}: {}'.format(key_word, msg_text)
  22. }
  23. }
  24. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  25. print(response.text)
  26. def request_post(request_url, request_data, timeout):
  27. """
  28. post 请求 HTTP接口
  29. :param request_url: 接口URL
  30. :param request_data: 请求参数
  31. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  32. :return: res_data json格式
  33. """
  34. try:
  35. headers = {"Connection": "close"}
  36. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  37. if response.status_code == 200:
  38. res_data = json.loads(response.text)
  39. return res_data
  40. else:
  41. return None
  42. except Exception as e:
  43. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  44. return None
  45. def get_videos_remain_view_count(app_type, videos):
  46. """
  47. 获取视频在流量池中的剩余可分发数
  48. :param app_type: 产品标识 type-int
  49. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  50. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  51. error_flag 错误标记,True为错误
  52. """
  53. error_flag = False
  54. if not videos:
  55. return [], error_flag
  56. request_data = {'appType': app_type, 'videos': videos}
  57. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  58. if result is None:
  59. error_flag = True
  60. return [], error_flag
  61. if result['code'] != 0:
  62. log_.info('获取视频在流量池中的剩余可分发数失败')
  63. error_flag = True
  64. return [], error_flag
  65. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  66. return data, error_flag
  67. def get_videos_local_distribute_count(video_id, flow_pool):
  68. """
  69. 获取流量池视频本地分发数
  70. :param video_id: video_id
  71. :param flow_pool: 流量池标记
  72. :return: current_count 本地记录的分发数
  73. """
  74. # redis_h = datetime.now().hour
  75. # if datetime.now().minute >= 30:
  76. # redis_h += 0.5
  77. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  78. key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, video_id, flow_pool)
  79. redis_helper = RedisHelper()
  80. # video = '{}-{}'.format(video_id, flow_pool)
  81. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  82. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  83. if current_count is not None:
  84. return int(current_count)
  85. else:
  86. return None
  87. class FilterVideos(object):
  88. """视频过滤"""
  89. def __init__(self, app_type, video_ids, mid='', uid=''):
  90. """
  91. 初始化
  92. :param app_type: 产品标识 type-int
  93. :param video_ids: 需过滤的视频列表 type-list
  94. :param mid: mid type-string
  95. :param uid: uid type-string
  96. """
  97. self.app_type = app_type
  98. self.mid = mid
  99. self.uid = uid
  100. self.video_ids = video_ids
  101. def filter_videos(self):
  102. """视频过滤"""
  103. # 预曝光过滤
  104. st_pre = time.time()
  105. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  106. et_pre = time.time()
  107. log_.info('===filter by previewed: app_type = {}, result = {}, execute time = {}ms'.format(
  108. self.app_type, filtered_pre_result, (et_pre - st_pre) * 1000))
  109. if not filtered_pre_result:
  110. return None
  111. # 视频状态过滤采用离线定时过滤方案
  112. # 视频状态过滤
  113. # st_status = time.time()
  114. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  115. # et_status = time.time()
  116. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  117. # filtered_status_result, (et_status - st_status) * 1000))
  118. # if not filtered_status_result:
  119. # return None
  120. # 视频已曝光过滤
  121. st_viewed = time.time()
  122. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  123. et_viewed = time.time()
  124. log_.info('===filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
  125. self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
  126. if not filtered_viewed_result:
  127. return None
  128. else:
  129. return [int(video_id) for video_id in filtered_viewed_result]
  130. def filter_video_previewed(self, video_ids):
  131. """
  132. 预曝光过滤
  133. :param video_ids: 需过滤的视频列表 type-list
  134. :return: filtered_videos 过滤后的列表 type-list
  135. """
  136. # 根据Redis缓存中的数据过滤
  137. redis_helper = RedisHelper()
  138. # key拼接
  139. key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(self.app_type, self.mid)
  140. pe_videos_list = redis_helper.get_data_from_set(key_name)
  141. if not pe_videos_list:
  142. return video_ids
  143. pe_videos = [int(video) for video in pe_videos_list]
  144. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  145. return filtered_videos
  146. def filter_video_status(self, video_ids):
  147. """
  148. 对视频状态进行过滤
  149. :param video_ids: 视频id列表 type-list
  150. :return: filtered_videos
  151. """
  152. if len(video_ids) == 1:
  153. sql = "set hg_experimental_enable_shard_pruning=off; " \
  154. "SELECT video_id " \
  155. "FROM {} " \
  156. "WHERE audit_status = 5 " \
  157. "AND applet_rec_status IN (1, -6) " \
  158. "AND open_status = 1 " \
  159. "AND payment_status = 0 " \
  160. "AND encryption_status != 5 " \
  161. "AND transcoding_status = 3 " \
  162. "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  163. else:
  164. sql = "set hg_experimental_enable_shard_pruning=off; " \
  165. "SELECT video_id " \
  166. "FROM {} " \
  167. "WHERE audit_status = 5 " \
  168. "AND applet_rec_status IN (1, -6) " \
  169. "AND open_status = 1 " \
  170. "AND payment_status = 0 " \
  171. "AND encryption_status != 5 " \
  172. "AND transcoding_status = 3 " \
  173. "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  174. hologres_helper = HologresHelper()
  175. data = hologres_helper.get_data(sql=sql)
  176. filtered_videos = [int(temp[0]) for temp in data]
  177. return filtered_videos
  178. def filter_video_viewed(self, video_ids, types=(1,)):
  179. """
  180. 调用后端接口过滤用户已观看视频
  181. :param video_ids: 视频id列表 type-list
  182. :param types: 过滤参数 type-tuple, 默认(1, ) 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态
  183. :return: filtered_videos
  184. """
  185. # 调用http接口
  186. request_data = {"appType": self.app_type,
  187. "mid": self.mid,
  188. "uid": self.uid,
  189. "types": list(types),
  190. "videoIds": video_ids}
  191. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=2.5)
  192. if result is None:
  193. log_.info('过滤失败,types: {}'.format(types))
  194. return video_ids
  195. if result['code'] != 0:
  196. log_.info('过滤失败,types: {}'.format(types))
  197. return video_ids
  198. filtered_videos = result['data']
  199. return filtered_videos
  200. if __name__ == '__main__':
  201. # filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
  202. # filter_.filter_videos()
  203. # filter_.filter_video_status(video_ids=[1, 3, 5])
  204. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  205. # res = get_videos_remain_view_count(4, videos)
  206. # print(res)
  207. text = '测试 @李倩'
  208. send_msg_to_feishu(text)