@@ -1,3 +1,4 @@
+import datetime
 import random
 import time
 import os
@@ -8,6 +9,7 @@ from config import set_config
 from utils import request_post, filter_video_status, send_msg_to_feishu
 from log import Log
 from db_helper import RedisHelper
+from odps import ODPS
 
 config_, _ = set_config()
 log_ = Log()
@@ -126,17 +128,96 @@ def predict(app_type):
     )
 
 
+def get_data_from_odps(project, sql):
+    """Execute the given SQL in ODPS and return the result as a pandas DataFrame, or None on failure."""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        with odps.execute_sql(sql=sql).open_reader() as reader:
+            data_df = reader.to_pandas()
+    except Exception as e:
+        # log the failure and return None; the caller checks for None
+        log_.info(f'get data from odps error: {e}')
+        data_df = None
+    return data_df
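+# Usage sketch (illustrative; assumes the ODPS credentials in config_.ODPS_CONFIG are valid):
+#     df = get_data_from_odps(project='videoods', sql="SELECT video_id FROM videoods.movie_store_video_allow_list LIMIT 10")
+#     # -> pandas DataFrame on success, None if the connection or query fails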
+
+
+def predict_18_19(app_type):
+    """Flow pool predict for the LAO_HAO_KAN_VIDEO / ZUI_JING_QI app types, driven by the ODPS allow list."""
+    log_.info(f'app_type = {app_type}')
+    now = datetime.datetime.now()
+    log_.info(f"now = {datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')}")
+    # only consider allow-list records created within the last 24 hours
+    create_time = datetime.datetime.strftime(now - datetime.timedelta(hours=24), '%Y-%m-%d %H:%M:%S')
+    if app_type == config_.APP_TYPE['LAO_HAO_KAN_VIDEO']:
+        sql = f"SELECT video_id FROM videoods.movie_store_video_allow_list " \
+              f"WHERE allow_list_type=1 AND create_time>='{create_time}'"
+    elif app_type == config_.APP_TYPE['ZUI_JING_QI']:
+        # type-0 allow-list videos that are not also on the type-1 list
+        sql = f"SELECT video_id FROM videoods.movie_store_video_allow_list " \
+              f"WHERE allow_list_type=0 AND " \
+              f"video_id NOT IN (" \
+              f"SELECT video_id FROM videoods.movie_store_video_allow_list WHERE allow_list_type=1" \
+              f") AND " \
+              f"create_time>='{create_time}'"
+    else:
+        sql = ""
+    data_df = get_data_from_odps(project='videoods', sql=sql)
+    if data_df is not None:
+        video_ids = [int(video_id) for video_id in data_df['video_id'].to_list()]
+        log_.info(f'video_ids count = {len(video_ids)}')
+        # filter by video status
+        filtered_videos = filter_video_status(list(video_ids))
+        log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
+        if not filtered_videos:
+            log_.info('no videos in the flow pool are in a distributable status')
+            return None
+        # predict
+        video_score = get_score(filtered_videos)
+        log_.info('predict finished!')
+        # upload the scores to redis
+        redis_data = {}
+        for i in range(len(video_score)):
+            video_id = filtered_videos[i]
+            score = video_score[i]
+            redis_data[video_id] = score
+        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        redis_helper = RedisHelper()
+        # delete the key if it already exists
+        if redis_helper.key_exists(key_name):
+            redis_helper.del_keys(key_name)
+        # write to redis as a sorted set with a 24-hour expiry
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        log_.info('data to redis finished!')
+
+
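+# Read-back sketch (illustrative; assumes a plain redis-py client `r` pointed at the same redis
+# instance used by RedisHelper): r.zrevrange(config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type), 0, 9, withscores=True)
+# would return the top-10 (video_id, score) pairs written by predict_18_19 above.
+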
 if __name__ == '__main__':
     # res = get_videos_from_pool(app_type=0)
     # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
     # print(res)
 
+    # app types that use the ODPS allow-list based predict_18_19; all others keep the original predict
+    app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
     log_.info('flow pool predict start...')
     for app_name, app_type in config_.APP_TYPE.items():
         log_.info('{} predict start...'.format(app_name))
-        predict(app_type=app_type)
+        if app_type in app_type_list:
+            predict_18_19(app_type=app_type)
+        else:
+            predict(app_type=app_type)
         log_.info('{} predict end...'.format(app_name))
     log_.info('flow pool predict end...')
+
+
 # upload the logs to oss
 # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
 #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')