bottom_videos.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # coding:utf-8
  2. import datetime
  3. import json
  4. import traceback
  5. import os
  6. import json
  7. from my_utils import execute_sql_from_odps, request_post, update_video_w_h_rate
  8. from db_helper import RedisHelper
  9. from my_config import set_config
  10. from log import Log
# set_config() yields the configuration object and the environment name;
# `env` is later compared against 'dev' / 'test' / 'pre' / 'pro'.
config_, env = set_config()
# Module-wide logger shared by every function in this file.
log_ = Log()
  13. def get_bottom_videos_test():
  14. """获取测试环境兜底视频"""
  15. try:
  16. # 获取总播放量top1000的视频
  17. sql = "SELECT id " \
  18. ",play_count_total " \
  19. "FROM videoods.wx_video_test " \
  20. "WHERE transcode_status = 3 " \
  21. "AND STATUS = 1 " \
  22. "AND recommend_status IN ( - 6, 1) " \
  23. "ORDER BY play_count_total DESC " \
  24. "LIMIT 2000" \
  25. ";"
  26. records = execute_sql_from_odps(project='videoods', sql=sql)
  27. # 视频按照总播放量写入redis
  28. video_id_list = []
  29. videos = {}
  30. with records.open_reader() as reader:
  31. for record in reader:
  32. video_id = int(record['id'])
  33. video_id_list.append(video_id)
  34. videos[video_id] = record['play_count_total']
  35. return video_id_list, videos
  36. except Exception as e:
  37. log_.error(traceback.format_exc())
  38. def get_bottom_videos_pro(now_date):
  39. """获取生产环境兜底视频"""
  40. try:
  41. # 获取昨日播放量top1000的视频
  42. delta_date = now_date - datetime.timedelta(days=1)
  43. sql = "SELECT video_playcount.videoid " \
  44. ",video_playcount.play_count " \
  45. "FROM ( " \
  46. "SELECT videoid " \
  47. ",COUNT(*) play_count " \
  48. "FROM loghubods.video_action_log_applet " \
  49. "WHERE dt = {} " \
  50. "AND business = 'videoPlay' " \
  51. "GROUP BY videoid " \
  52. ") video_playcount INNER " \
  53. "JOIN ( " \
  54. "SELECT DISTINCT videoid " \
  55. "FROM videoods.dim_video " \
  56. "WHERE video_edit = '通过' " \
  57. "AND video_data_stat = '有效' " \
  58. "AND video_recommend IN ( '待推荐', '普通推荐') " \
  59. "AND charge = '免费' " \
  60. "AND is_pwd = '未加密' " \
  61. ") video_status " \
  62. "ON video_playcount.videoid = video_status.videoid " \
  63. "ORDER BY video_playcount.play_count DESC " \
  64. "LIMIT 2000 " \
  65. ";".format(delta_date.strftime('%Y%m%d'))
  66. records = execute_sql_from_odps(project='loghubods', sql=sql)
  67. # 视频按照昨日播放量写入redis
  68. video_id_list = []
  69. videos = {}
  70. with records.open_reader() as reader:
  71. for record in reader:
  72. video_id = int(record['videoid'])
  73. video_id_list.append(video_id)
  74. videos[video_id] = record['play_count']
  75. return video_id_list, videos
  76. except Exception as e:
  77. log_.error(traceback.format_exc())
  78. def update_bottom_videos():
  79. """更新兜底视频"""
  80. try:
  81. # 获取对应环境的兜底视频
  82. now_date = datetime.datetime.today()
  83. if env in ['dev', 'test']:
  84. video_id_list, videos = get_bottom_videos_test()
  85. elif env in ['pre', 'pro']:
  86. video_id_list, videos = get_bottom_videos_pro(now_date=now_date)
  87. else:
  88. log_.error('env error')
  89. return
  90. # redis数据更新
  91. redis_helper = RedisHelper()
  92. redis_helper.del_keys(key_name=config_.BOTTOM_KEY_NAME)
  93. redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
  94. # 与原有兜底视频排序,保留top1000
  95. # redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
  96. # 移除bottom key的过期时间,将其转换为永久状态
  97. redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
  98. log_.info('{} update bottom videos success!, count = {}'.format(now_date, len(videos)))
  99. # ###### 下线
  100. # # 更新视频的宽高比数据
  101. # video_ids = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  102. # if video_ids:
  103. # update_video_w_h_rate(video_ids=video_ids,
  104. # key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'])
  105. # log_.info('update video w_h_rate to redis finished!')
  106. # # 获取今日兜底视频的json,并存入redis
  107. # video_json_list = []
  108. # for i in range(0, len(video_id_list)//10):
  109. # video_json = get_video_info_json(video_ids=video_id_list[i*10:(i+1)*10])
  110. # if video_json is not None:
  111. # video_json_list.extend(video_json)
  112. # if len(video_json_list) >= 1000:
  113. # break
  114. # # 写入redis,先删除key,再重新写入
  115. # redis_helper.del_keys(config_.BOTTOM_JSON_KEY_NAME)
  116. # redis_helper.add_data_with_set(key_name=config_.BOTTOM_JSON_KEY_NAME, values=video_json_list[:1000])
  117. # # 移除过期时间,将其转换为永久状态
  118. # redis_helper.persist_key(key_name=config_.BOTTOM_JSON_KEY_NAME)
  119. #
  120. # log_.info('{} update bottom videos info json success!, count = {}'.format(now_date,
  121. # len(video_json_list[:1000])))
  122. except Exception as e:
  123. log_.error(traceback.format_exc())
  124. def get_video_info_json(video_ids):
  125. """
  126. 获取视频对应json
  127. :param video_ids: type-list [int, int, ...]
  128. :return: json_data_list
  129. """
  130. request_data = {
  131. "appType": 4,
  132. "platform": "android",
  133. "versionCode": 295,
  134. "pageSize": 10,
  135. "machineCode": "weixin_openid_otjoB5WWWmkRjpMzkV5ltZ3osg3A",
  136. "uid": 6281917,
  137. "videoIds": video_ids
  138. }
  139. res = request_post(request_url=config_.BOTTOM_JSON_URL, request_data=request_data)
  140. if res is None:
  141. return None
  142. if res['code'] != 0:
  143. log_.info('获取视频json失败!')
  144. return None
  145. json_data_list = [json.dumps(item) for item in res['data']]
  146. return json_data_list
# Script entry point: refresh the bottom videos when this file is run directly.
if __name__ == '__main__':
    update_bottom_videos()