# -*- coding: utf-8 -*-
"""Carry the current hour's per-region recall zsets forward to the next hour.

For every region code, the sorted set stored under the key for the current
date/hour is read from Redis and, when it is non-empty, its entries are
written under the key for the following hour with a one-day expiry.

Usage:
    cd /root/zhangbo/rov-offline
    python alg_recsys_recall_undertake.py
"""
import traceback
from my_config import set_config
from log import Log
from my_utils import execute_sql_from_odps
from db_helper import RedisHelper
from datetime import datetime, timedelta
from alg_recsys_recall_4h_region_trend import records_process_for_list

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Key layout shared by the read side and the write side:
# region code, date (YYYYMMDD), hour (no zero padding).
KEY_TEMPLATE = "recall:item:score:region:h:{}:data66:rule66:{}:{}"

# Lifetime of the copied keys, in seconds (one day).
EXPIRE_SECONDS = 1 * 24 * 3600

# Region name -> region code. Only the codes are used to build Redis keys.
REGION_CODE = {
    '北京': '110000',
    '天津': '120000',
    '河北省': '130000',
    '山西省': '140000',
    '内蒙古': '150000',
    '辽宁省': '210000',
    '吉林省': '220000',
    '黑龙江省': '230000',
    '上海': '310000',
    '江苏省': '320000',
    '浙江省': '330000',
    '安徽省': '340000',
    '福建省': '350000',
    '江西省': '360000',
    '山东省': '370000',
    '河南省': '410000',
    '湖北省': '420000',
    '湖南省': '430000',
    '广东省': '440000',
    '广西': '450000',
    '海南省': '460000',
    '重庆': '500000',
    '四川省': '510000',
    '贵州省': '520000',
    '云南省': '530000',
    '西藏': '540000',
    '陕西省': '610000',
    '甘肃省': '620000',
    '青海省': '630000',
    '宁夏': '640000',
    '新疆': '650000',
    '台湾省': '710000',
    '香港': '810000',
    '澳门': '820000',
    '广州': '440100',
    '深圳': '440300',
    '成都': '510100',
    '长沙': '430100',
}


def _date_and_hour(moment):
    """Return ``(YYYYMMDD, hour)`` for *moment*; the hour has no zero padding.

    ``str(moment.hour)`` yields "0".."23", which is what stripping the
    leading zeros from ``%H`` (and mapping the empty result to "0") produces.
    """
    return moment.strftime('%Y%m%d'), str(moment.hour)


def main():
    """Copy each region's current-hour zset to the next hour's key."""
    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Step 1: resolve the current hour and the hour that follows it.
    now_date = datetime.today()
    date, hour = _date_and_hour(now_date)
    # Adding one hour may roll over midnight, so the date is recomputed too.
    date_next, hour_next = _date_and_hour(now_date + timedelta(hours=1))

    # Step 2: build the keys region by region; read one, write one.
    for code in REGION_CODE.values():
        read_key = KEY_TEMPLATE.format(code, date, hour)
        # NOTE(review): the two trailing True flags are presumably
        # "descending" and "with scores" -- confirm against RedisHelper.
        values = redis_helper.get_data_zset_with_index(
            read_key, 0, -1, True, True)
        print(read_key)
        print(values)

        # Nothing stored for this region/hour: leave the next hour untouched.
        if not values:
            continue

        # Entries arrive as pairs; the helper's write side takes a mapping.
        new_kvs = {member: score for member, score in values}
        write_key = KEY_TEMPLATE.format(code, date_next, hour_next)
        redis_helper.add_data_with_zset(
            write_key, new_kvs, expire_time=EXPIRE_SECONDS)
        print(write_key)

    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    main()