1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- # coding:utf-8
- import datetime
- import traceback
- import os
- from utils import execute_sql_from_odps
- from db_helper import RedisHelper
- from config import set_config
- from log import Log
- config_ = set_config()
- log_ = Log()
- def update_bottom_videos():
- """更新兜底视频"""
- try:
- # 获取昨日播放量top1000的视频
- now_date = datetime.datetime.today()
- delta_date = now_date - datetime.timedelta(days=1)
- sql = "SELECT video_playcount.videoid " \
- ",video_playcount.play_count " \
- "FROM ( " \
- "SELECT videoid " \
- ",COUNT(*) play_count " \
- "FROM loghubods.video_action_log_applet " \
- "WHERE dt = {} " \
- "AND business = 'videoPlay' " \
- "GROUP BY videoid " \
- ") video_playcount INNER " \
- "JOIN ( " \
- "SELECT DISTINCT videoid " \
- "FROM videoods.dim_video " \
- "WHERE video_edit = '通过' " \
- "AND video_data_stat = '有效' " \
- "AND video_recommend IN ( '待推荐', '普通推荐') " \
- "AND charge = '免费' " \
- "AND is_pwd = '未加密' " \
- ") video_status " \
- "ON video_playcount.videoid = video_status.videoid " \
- "ORDER BY video_playcount.play_count DESC " \
- "LIMIT 1000 " \
- ";".format(delta_date.strftime('%Y%m%d'))
- records = execute_sql_from_odps(project='loghubods', sql=sql)
- # 视频按照昨日播放量写入redis
- videos = {}
- with records.open_reader() as reader:
- for record in reader:
- video_id = record['videoid']
- videos[video_id] = record['play_count']
- redis_helper = RedisHelper()
- redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
- # 与原有兜底视频排序,保留top1000
- redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
- # 移除bottom key的过期时间,将其转换为永久状态
- redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
- log_.info('{} update bottom videos success!, video nums = {}'.format(now_date, len(videos)))
- except Exception as e:
- log_.error(traceback.format_exc())
- if __name__ == '__main__':
- update_bottom_videos()
|