@@ -19,6 +19,7 @@ redis_helper = RedisHelper()
 
 REDIS_PREFIX = "item_rt_fea_1day_"
 
+
 def process_and_store(row):
     video_id, json_str = row
     key = REDIS_PREFIX + str(video_id)
@@ -26,6 +27,7 @@ def process_and_store(row):
     redis_helper.set_data_to_redis(key, json_str, expire_time)
     # log_.info("video写入数据key={},value={}".format(key, json_str))
 
+
 def check_data(project, table, partition) -> int:
     """检查数据是否准备好,输出数据条数"""
     odps = ODPS(
@@ -51,10 +53,12 @@ def check_data(project, table, partition) -> int:
             log_.info("表{}分区{}不存在".format(table, partition))
             data_count = 0
     except Exception as e:
-        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(
+            table, partition, e))
         data_count = 0
     return data_count
 
+
 def get_sql(date, previous_date_str, project):
     sql = '''
     SELECT videoid
@@ -72,6 +76,8 @@ def get_sql(date, previous_date_str, project):
         ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_pv))) AS share_pv_list_2day
         ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_uv))) AS share_uv_list_3day
         ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_pv))) AS share_pv_list_3day
+        ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",1day_sharedepth_max_avg))) AS sharedepth_max_avg_list_1day
+        ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",1day_sharewidth_max_avg))) AS sharewidth_max_avg_list_1day
     FROM (
         SELECT videoid
             ,dt
@@ -89,6 +95,8 @@ def get_sql(date, previous_date_str, project):
             ,SUM(lasttwodays_share_total) AS 2day_share_pv
             ,SUM(lastthreedays_share) AS 3day_share_uv
             ,SUM(lastthreedays_share_total) AS 3day_share_pv
+            ,SUM(sharedepth_max_avg) AS 1day_sharedepth_max_avg
+            ,SUM(sharewidth_max_avg) AS 1day_sharewidth_max_avg
         FROM loghubods.video_data_each_hour_dataset_24h_total_apptype
         WHERE dt <= '{}23'
             AND dt >= '{}00'
@@ -144,6 +152,14 @@ def get_sql(date, previous_date_str, project):
                 m["p_return_uv_list_1day"] = record['p_return_uv_list_1day']
             except Exception as e:
                 log_.error(e)
+            try:
+                m["sharedepth_max_avg_list_1day"] = record['sharedepth_max_avg_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["sharewidth_max_avg_list_1day"] = record['sharewidth_max_avg_list_1day']
+            except Exception as e:
+                log_.error(e)
             json_str = json.dumps(m)
             video_list.append([video_id, json_str])
     return video_list
@@ -169,15 +185,16 @@ def h_timer_check():
     else:
         log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
         # 2 读取数据表 处理特征
-        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        previous_date_str = (datetime.strptime(
+            date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
         video_list = get_sql(date, previous_date_str, project)
         # 3 写入redis
         log_.info("video的数据量:{}".format(len(video_list)))
-        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
-
-        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
-
+        records_process_for_list(
+            video_list, process_and_store, max_size=50, num_workers=8)
 
+        redis_helper.set_data_to_redis(
+            REDIS_PREFIX + "partition", partition, 24 * 3600)
 
 
 if __name__ == '__main__':
@@ -186,7 +203,5 @@ if __name__ == '__main__':
     log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
 
-
-
 # cd /root/zhangbo/rov-offline
-# python alg_recsys_rank_item_realtime_1day.py 20240117 20
+# python alg_recsys_rank_item_realtime_1day.py 20240117 20
|