@@ -3,14 +3,14 @@ import traceback
 import datetime
 from odps import ODPS
 from threading import Timer
-from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
-from config import set_config
+from my_utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from my_config import set_config
 from log import Log
 from alg_recsys_recall_4h_region_trend import records_process_for_list
 import json
 from datetime import datetime, timedelta
 import sys
-from utils import execute_sql_from_odps
+from my_utils import execute_sql_from_odps
 
 
 config_, _ = set_config()
@@ -19,6 +19,7 @@ redis_helper = RedisHelper()
 
 REDIS_PREFIX = "item_rt_fea_1day_"
 
+
 def process_and_store(row):
     video_id, json_str = row
     key = REDIS_PREFIX + str(video_id)
@@ -26,6 +27,7 @@ def process_and_store(row):
     redis_helper.set_data_to_redis(key, json_str, expire_time)
     # log_.info("video写入数据key={},value={}".format(key, json_str))
 
+
 def check_data(project, table, partition) -> int:
     """检查数据是否准备好,输出数据条数"""
     odps = ODPS(
@@ -51,10 +53,12 @@ def check_data(project, table, partition) -> int:
             log_.info("表{}分区{}不存在".format(table, partition))
             data_count = 0
     except Exception as e:
-        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(
+            table, partition, e))
         data_count = 0
     return data_count
 
+
 def get_sql(date, previous_date_str, project):
     sql = '''
     SELECT  videoid
@@ -72,6 +76,8 @@ def get_sql(date, previous_date_str, project):
             ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_pv))) AS share_pv_list_2day
             ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_uv))) AS share_uv_list_3day
             ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_pv))) AS share_pv_list_3day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",1day_sharedepth_max_avg))) AS sharedepth_max_avg_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",1day_sharewidth_max_avg))) AS sharewidth_max_avg_list_1day
     FROM    (
                 SELECT  videoid
                         ,dt
@@ -89,6 +95,8 @@ def get_sql(date, previous_date_str, project):
                         ,SUM(lasttwodays_share_total) AS 2day_share_pv
                         ,SUM(lastthreedays_share) AS 3day_share_uv
                         ,SUM(lastthreedays_share_total) AS 3day_share_pv
+                        ,SUM(sharedepth_max_avg) AS 1day_sharedepth_max_avg
+                        ,SUM(sharewidth_max_avg) AS 1day_sharewidth_max_avg
                 FROM    loghubods.video_data_each_hour_dataset_24h_total_apptype
                 WHERE   dt <= '{}23'
                 AND     dt >= '{}00'
@@ -144,6 +152,14 @@ def get_sql(date, previous_date_str, project):
                 m["p_return_uv_list_1day"] = record['p_return_uv_list_1day']
             except Exception as e:
                 log_.error(e)
+            try:
+                m["sharedepth_max_avg_list_1day"] = record['sharedepth_max_avg_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["sharewidth_max_avg_list_1day"] = record['sharewidth_max_avg_list_1day']
+            except Exception as e:
+                log_.error(e)
             json_str = json.dumps(m)
             video_list.append([video_id, json_str])
     return video_list
@@ -169,15 +185,16 @@ def h_timer_check():
     else:
         log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
         # 2 读取数据表 处理特征
-        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        previous_date_str = (datetime.strptime(
+            date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
         video_list = get_sql(date, previous_date_str, project)
         # 3 写入redis
         log_.info("video的数据量:{}".format(len(video_list)))
-        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
-
-        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
-
+        records_process_for_list(
+            video_list, process_and_store, max_size=50, num_workers=8)
 
+        redis_helper.set_data_to_redis(
+            REDIS_PREFIX + "partition", partition, 24 * 3600)
 
 
 if __name__ == '__main__':
@@ -186,7 +203,5 @@ if __name__ == '__main__':
     log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
 
-
-
 # cd /root/zhangbo/rov-offline
-# python alg_recsys_rank_item_realtime_1day.py 20240117 20
+# python alg_recsys_rank_item_realtime_1day.py 20240117 20