|
@@ -9,11 +9,59 @@ from threading import Timer
|
|
from log import Log
|
|
from log import Log
|
|
from db_helper import RedisHelper
|
|
from db_helper import RedisHelper
|
|
from config import set_config
|
|
from config import set_config
|
|
|
|
+from odps import ODPS
|
|
|
|
|
|
log_ = Log()
|
|
log_ = Log()
|
|
config_, env = set_config()
|
|
config_, env = set_config()
|
|
|
|
|
|
|
|
|
|
|
|
def op_data_check(project, table, hour):
    """Return how many rows the operations table holds for the given hour.

    Args:
        project: ODPS project name; used for the client and to qualify
            the table name in the query.
        table: Table name inside ``project``.
        hour: Value compared against the ``hour`` column. It is
            interpolated into the SQL text unquoted; the caller in this
            file passes a ``%Y%m%d%H`` formatted string.

    Returns:
        ``reader.count`` of the query result, or ``0`` when running the
        query or opening the reader raises (treated as "data not ready").
    """
    # Function-scope import keeps this block self-contained.
    # NOTE(review): the module also has a project logger (`log_`); switch to
    # it here if its API offers an exception/error method - not visible here.
    import logging

    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )

    sql = f'select * from {project}.{table} where hour = {hour}'
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.count
    except Exception:
        # Best effort: any failure is still reported as "no data", but the
        # traceback is logged so that credential, endpoint or SQL errors are
        # not mistaken for data that simply has not arrived yet.
        logging.getLogger(__name__).exception(
            'op_data_check failed for %s.%s hour=%s; treating data as not ready',
            project, table, hour
        )
        return 0
|
|
|
|
+
|
|
|
|
+
|
|
|
|
def get_op_data(now_date, project, table):
    """Fetch the operations-provided rows for the hour of ``now_date``.

    Args:
        now_date: Datetime formatted as ``%Y%m%d%H`` to select the
            ``hour`` partition.
        project: ODPS project the client is opened against.
        table: Name of the table to read.

    Returns:
        A list of ``{'videoid': ..., 'rank': ...}`` dicts, one per record,
        in the order ``read_table`` yields them.
    """
    partition_hour = dt.strftime(now_date, '%Y%m%d%H')
    client = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    rows = client.read_table(name=table, partition='hour=%s' % partition_hour)
    # Keep only the two fields the ranking step consumes.
    return [{'videoid': row['videoid'], 'rank': row['rank']} for row in rows]
|
|
|
|
+
|
|
|
|
+
|
|
def app_rank_op(now_date, now_h):
|
|
def app_rank_op(now_date, now_h):
|
|
"""
|
|
"""
|
|
票圈视频App推荐列表小时级更新,小时级数据由运营提供
|
|
票圈视频App推荐列表小时级更新,小时级数据由运营提供
|
|
@@ -26,9 +74,12 @@ def app_rank_op(now_date, now_h):
|
|
initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
|
|
|
|
# 获取当前小时op更新的数据
|
|
# 获取当前小时op更新的数据
|
|
- op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
- op_data = redis_helper.get_data_zset_with_index(key_name=op_key_name, start=0, end=-1, with_scores=True)
|
|
|
|
- op_video_ids = [item[0] for item in op_data]
|
|
|
|
|
|
+ # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
+ # op_data = redis_helper.get_data_zset_with_index(key_name=op_key_name, start=0, end=-1, with_scores=True)
|
|
|
|
+ op_data = get_op_data(now_date=now_date, project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE)
|
|
|
|
+ # 倒序排序
|
|
|
|
+ op_data.sort(key=lambda x: x['rank'], reverse=True)
|
|
|
|
+ op_video_ids = [int(item['videoid']) for item in op_data]
|
|
|
|
|
|
# 对op更新数据给定score
|
|
# 对op更新数据给定score
|
|
final_data = dict()
|
|
final_data = dict()
|
|
@@ -38,15 +89,15 @@ def app_rank_op(now_date, now_h):
|
|
|
|
|
|
# 合并结果
|
|
# 合并结果
|
|
for video_id, score in initial_data:
|
|
for video_id, score in initial_data:
|
|
- if video_id not in op_video_ids:
|
|
|
|
- final_data[video_id] = score
|
|
|
|
|
|
+ if int(video_id) not in op_video_ids:
|
|
|
|
+ final_data[int(video_id)] = score
|
|
# print(video_id, score)
|
|
# print(video_id, score)
|
|
# print(op_data)
|
|
# print(op_data)
|
|
# print(final_data)
|
|
# print(final_data)
|
|
|
|
|
|
# 存入对应的redis
|
|
# 存入对应的redis
|
|
final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
final_key_name = f"{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
- redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24*3600)
|
|
|
|
|
|
+ redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
|
|
|
|
|
|
|
|
|
|
def get_redis_key_date(now_date):
|
|
def get_redis_key_date(now_date):
|
|
@@ -76,12 +127,14 @@ def app_rank_bottom(now_date, now_h):
|
|
|
|
|
|
def app_timer_check():
|
|
def app_timer_check():
|
|
now_date = dt.today()
|
|
now_date = dt.today()
|
|
|
|
+ print(dt.strftime(now_date, '%Y%m%d%H'))
|
|
now_h = dt.now().hour
|
|
now_h = dt.now().hour
|
|
now_min = dt.now().minute
|
|
now_min = dt.now().minute
|
|
# 查看当前小时op更新的数据是否已准备好
|
|
# 查看当前小时op更新的数据是否已准备好
|
|
- op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
- redis_helper = RedisHelper()
|
|
|
|
- if redis_helper.key_exists(op_key_name):
|
|
|
|
|
|
+ # op_key_name = f"{config_.APP_OP_VIDEOS_KEY_NAME_PREFIX}{dt.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
+ op_data_count = op_data_check(project=config_.APP_OP_PROJECT, table=config_.APP_OP_TABLE,
|
|
|
|
+ hour=dt.strftime(now_date, '%Y%m%d%H'))
|
|
|
|
+ if op_data_count > 0:
|
|
# 数据准备好,进行更新
|
|
# 数据准备好,进行更新
|
|
app_rank_op(now_date=now_date, now_h=now_h)
|
|
app_rank_op(now_date=now_date, now_h=now_h)
|
|
elif now_min > 50:
|
|
elif now_min > 50:
|
|
@@ -93,7 +146,10 @@ def app_timer_check():
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
# now_date = dt.today()
|
|
# now_date = dt.today()
|
|
- # app_rank_op()
|
|
|
|
|
|
+ # print(dt.strftime(now_date, '%Y%m%d%H'))
|
|
|
|
+ # get_op_data(now_date=now_date)
|
|
|
|
+ # now_h = dt.now().hour
|
|
|
|
+ # app_rank_op(now_date=now_date, now_h=now_h)
|
|
# key_name = get_redis_key_date(now_date=now_date)
|
|
# key_name = get_redis_key_date(now_date=now_date)
|
|
# print(key_name)
|
|
# print(key_name)
|
|
app_timer_check()
|
|
app_timer_check()
|