|  | @@ -1,3 +1,4 @@
 | 
											
												
													
														|  | 
 |  | +import datetime
 | 
											
												
													
														|  |  import random
 |  |  import random
 | 
											
												
													
														|  |  import time
 |  |  import time
 | 
											
												
													
														|  |  import os
 |  |  import os
 | 
											
										
											
												
													
														|  | @@ -8,6 +9,7 @@ from config import set_config
 | 
											
												
													
														|  |  from utils import request_post, filter_video_status, send_msg_to_feishu
 |  |  from utils import request_post, filter_video_status, send_msg_to_feishu
 | 
											
												
													
														|  |  from log import Log
 |  |  from log import Log
 | 
											
												
													
														|  |  from db_helper import RedisHelper
 |  |  from db_helper import RedisHelper
 | 
											
												
													
														|  | 
 |  | +from odps import ODPS
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  config_, _ = set_config()
 |  |  config_, _ = set_config()
 | 
											
												
													
														|  |  log_ = Log()
 |  |  log_ = Log()
 | 
											
										
											
												
													
														|  | @@ -126,17 +128,96 @@ def predict(app_type):
 | 
											
												
													
														|  |          )
 |  |          )
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +def get_data_from_odps(project, sql):
 | 
											
												
													
														|  | 
 |  | +    """检查数据是否准备好"""
 | 
											
												
													
														|  | 
 |  | +    odps = ODPS(
 | 
											
												
													
														|  | 
 |  | +        access_id=config_.ODPS_CONFIG['ACCESSID'],
 | 
											
												
													
														|  | 
 |  | +        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 | 
											
												
													
														|  | 
 |  | +        project=project,
 | 
											
												
													
														|  | 
 |  | +        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 | 
											
												
													
														|  | 
 |  | +        connect_timeout=3000,
 | 
											
												
													
														|  | 
 |  | +        read_timeout=500000,
 | 
											
												
													
														|  | 
 |  | +        pool_maxsize=1000,
 | 
											
												
													
														|  | 
 |  | +        pool_connections=1000
 | 
											
												
													
														|  | 
 |  | +    )
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    try:
 | 
											
												
													
														|  | 
 |  | +        with odps.execute_sql(sql=sql).open_reader() as reader:
 | 
											
												
													
														|  | 
 |  | +            data_df = reader.to_pandas()
 | 
											
												
													
														|  | 
 |  | +    except Exception as e:
 | 
											
												
													
														|  | 
 |  | +        data_df = None
 | 
											
												
													
														|  | 
 |  | +    return data_df
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def predict_18_19(app_type):
 | 
											
												
													
														|  | 
 |  | +    log_.info(f'app_type = {app_type}')
 | 
											
												
													
														|  | 
 |  | +    now = datetime.datetime.now()
 | 
											
												
													
														|  | 
 |  | +    log_.info(f"now = {datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')}")
 | 
											
												
													
														|  | 
 |  | +    create_time = datetime.datetime.strftime(now - datetime.timedelta(hours=24), '%Y-%m-%d %H:%M:%S')
 | 
											
												
													
														|  | 
 |  | +    if app_type == config_.APP_TYPE['LAO_HAO_KAN_VIDEO']:
 | 
											
												
													
														|  | 
 |  | +        sql = f"SELECT video_id FROM videoods.movie_store_video_allow_list " \
 | 
											
												
													
														|  | 
 |  | +              f"WHERE allow_list_type=1 AND create_time>='{create_time}'"
 | 
											
												
													
														|  | 
 |  | +    elif app_type == config_.APP_TYPE['ZUI_JING_QI']:
 | 
											
												
													
														|  | 
 |  | +        sql = f"SELECT video_id FROM videoods.movie_store_video_allow_list " \
 | 
											
												
													
														|  | 
 |  | +              f"WHERE allow_list_type=0 AND " \
 | 
											
												
													
														|  | 
 |  | +              f"video_id NOT IN (" \
 | 
											
												
													
														|  | 
 |  | +              f"SELECT video_id FROM videoods.movie_store_video_allow_list WHERE allow_list_type=1" \
 | 
											
												
													
														|  | 
 |  | +              f") AND " \
 | 
											
												
													
														|  | 
 |  | +              f"create_time>='{create_time}'"
 | 
											
												
													
														|  | 
 |  | +    else:
 | 
											
												
													
														|  | 
 |  | +        sql = ""
 | 
											
												
													
														|  | 
 |  | +    data_df = get_data_from_odps(project='videoods', sql=sql)
 | 
											
												
													
														|  | 
 |  | +    if data_df is not None:
 | 
											
												
													
														|  | 
 |  | +        video_ids = [int(video_id) for video_id in data_df['video_id'].to_list()]
 | 
											
												
													
														|  | 
 |  | +        log_.info(f'video_ids count = {len(video_ids)}')
 | 
											
												
													
														|  | 
 |  | +        # 对视频状态进行过滤
 | 
											
												
													
														|  | 
 |  | +        filtered_videos = filter_video_status(list(video_ids))
 | 
											
												
													
														|  | 
 |  | +        log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
 | 
											
												
													
														|  | 
 |  | +        if not filtered_videos:
 | 
											
												
													
														|  | 
 |  | +            log_.info('流量池中视频状态不符合分发')
 | 
											
												
													
														|  | 
 |  | +            return None
 | 
											
												
													
														|  | 
 |  | +        # 预测
 | 
											
												
													
														|  | 
 |  | +        video_score = get_score(filtered_videos)
 | 
											
												
													
														|  | 
 |  | +        log_.info('predict finished!')
 | 
											
												
													
														|  | 
 |  | +        # 上传数据到redis
 | 
											
												
													
														|  | 
 |  | +        redis_data = {}
 | 
											
												
													
														|  | 
 |  | +        for i in range(len(video_score)):
 | 
											
												
													
														|  | 
 |  | +            video_id = filtered_videos[i]
 | 
											
												
													
														|  | 
 |  | +            score = video_score[i]
 | 
											
												
													
														|  | 
 |  | +            redis_data[video_id] = score
 | 
											
												
													
														|  | 
 |  | +        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
 | 
											
												
													
														|  | 
 |  | +        redis_helper = RedisHelper()
 | 
											
												
													
														|  | 
 |  | +        # 如果key已存在,删除key
 | 
											
												
													
														|  | 
 |  | +        if redis_helper.key_exists(key_name):
 | 
											
												
													
														|  | 
 |  | +            redis_helper.del_keys(key_name)
 | 
											
												
													
														|  | 
 |  | +        # 写入redis
 | 
											
												
													
														|  | 
 |  | +        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
 | 
											
												
													
														|  | 
 |  | +        log_.info('data to redis finished!')
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  if __name__ == '__main__':
 |  |  if __name__ == '__main__':
 | 
											
												
													
														|  |      # res = get_videos_from_pool(app_type=0)
 |  |      # res = get_videos_from_pool(app_type=0)
 | 
											
												
													
														|  |      # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
 |  |      # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
 | 
											
												
													
														|  |      # print(res)
 |  |      # print(res)
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +    app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
 | 
											
												
													
														|  |      log_.info('flow pool predict start...')
 |  |      log_.info('flow pool predict start...')
 | 
											
												
													
														|  |      for app_name, app_type in config_.APP_TYPE.items():
 |  |      for app_name, app_type in config_.APP_TYPE.items():
 | 
											
												
													
														|  |          log_.info('{} predict start...'.format(app_name))
 |  |          log_.info('{} predict start...'.format(app_name))
 | 
											
												
													
														|  | -        predict(app_type=app_type)
 |  | 
 | 
											
												
													
														|  | 
 |  | +        if app_type in app_type_list:
 | 
											
												
													
														|  | 
 |  | +            predict_18_19(app_type=app_type)
 | 
											
												
													
														|  | 
 |  | +        else:
 | 
											
												
													
														|  | 
 |  | +            pass
 | 
											
												
													
														|  | 
 |  | +            predict(app_type=app_type)
 | 
											
												
													
														|  |          log_.info('{} predict end...'.format(app_name))
 |  |          log_.info('{} predict end...'.format(app_name))
 | 
											
												
													
														|  |      log_.info('flow pool predict end...')
 |  |      log_.info('flow pool predict end...')
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |      # 将日志上传到oss
 |  |      # 将日志上传到oss
 | 
											
												
													
														|  |      # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
 |  |      # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
 | 
											
												
													
														|  |      #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')
 |  |      #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')
 |