# -*- coding: utf-8 -*-
"""Offline helpers around video tags and hourly per-region recall keys in Redis.

Running this file as a script calls ``main()``, which reads the current-hour
per-region recall entry from Redis and prints it together with the key that
the next hour would use.
"""
import traceback
from datetime import datetime, timedelta

from config import set_config
from log import Log
from utils import execute_sql_from_odps
from db_helper import RedisHelper
# Only referenced by the not-yet-enabled batch step noted in main(); the import
# is kept so that step can be switched on without touching the import block.
from alg_recsys_recall_4h_region_trend import records_process_for_list  # noqa: F401

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Prefix of the Redis keys written by process_and_store().
REDIS_PREFIX = "alg_recsys_video_tags_"

# Region name -> region code used inside the recall keys.
REGION_CODE = {
    '北京': '110000',
    '天津': '120000',
    '河北省': '130000',
    '山西省': '140000',
    '内蒙古': '150000',
    '辽宁省': '210000',
    '吉林省': '220000',
    '黑龙江省': '230000',
    '上海': '310000',
    '江苏省': '320000',
    '浙江省': '330000',
    '安徽省': '340000',
    '福建省': '350000',
    '江西省': '360000',
    '山东省': '370000',
    '河南省': '410000',
    '湖北省': '420000',
    '湖南省': '430000',
    '广东省': '440000',
    '广西': '450000',
    '海南省': '460000',
    '重庆': '500000',
    '四川省': '510000',
    '贵州省': '520000',
    '云南省': '530000',
    '西藏': '540000',
    '陕西省': '610000',
    '甘肃省': '620000',
    '青海省': '630000',
    '宁夏': '640000',
    '新疆': '650000',
    '台湾省': '710000',
    '香港': '810000',
    '澳门': '820000',
    '广州': '440100',
    '深圳': '440300',
    '成都': '510100',
    '长沙': '430100',
}

# Key layout: region code, date (YYYYMMDD), hour (0-23, no zero padding).
REGION_KEY_TEMPLATE = "recall:item:score:region:h:{}:data66:rule66:{}:{}"


def get_video_tags_v2(tag_set=None):
    """Load video -> tags rows for the current and the previous month from ODPS.

    Args:
        tag_set: Container of allowed tag names; labels not in it are dropped.
            When omitted, a module-level ``TAG_SET`` is used if one exists.

    Returns:
        A list of ``{"video_id": int, "tags": str}`` dicts, where ``tags`` is
        the comma-joined list of allowed labels (possibly empty). Returns an
        empty list when no whitelist is available or when the query fails;
        failures are logged, not raised.
    """
    PROJECT = "loghubods"
    TABLE = "loghubods.automated_updates_category_labels_1"

    if tag_set is None:
        # This file does not define TAG_SET itself. Previously that surfaced as
        # a NameError on the first record, after the query had already run.
        tag_set = globals().get("TAG_SET")
    if tag_set is None:
        log_.error("get_video_tags_v2: no tag whitelist available "
                   "(pass tag_set or define TAG_SET); returning no rows")
        return []

    now_date = datetime.today()
    current_month = now_date.strftime('%Y%m')
    # Subtracting "day of month" days lands on the last day of the previous month.
    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')

    try:
        sql = '''SELECT  videoid
        ,secondary_labels
FROM    {table}
WHERE   (
            dt LIKE '{current}%' OR dt LIKE '{previous}%'
        )
'''.format(table=TABLE, current=current_month, previous=previous_month)
        print("sql:" + sql)
        records = execute_sql_from_odps(project=PROJECT, sql=sql)
        video_tags_list = []
        with records.open_reader() as reader:
            for record in reader:
                labels = str(record['secondary_labels']).split(",")
                video_tags_list.append({
                    "video_id": int(record['videoid']),
                    "tags": ",".join([label for label in labels if label in tag_set]),
                })
        log_.info("增量表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        # Best-effort loader: report the failure and hand back an empty result.
        log_.error(str(e) + str(traceback.format_exc()))
        return []


def process_and_store(row):
    """Write the tags of one row to Redis under ``REDIS_PREFIX + video_id``.

    Args:
        row: Mapping with the keys ``"video_id"`` and ``"tags"``.
    """
    video_id = row["video_id"]
    tags = row["tags"]
    key = REDIS_PREFIX + str(video_id)
    # Two days' worth of seconds; presumably the helper takes seconds - confirm.
    expire_time = 24 * 3600 * 2
    redis_helper.set_data_to_redis(key, tags, expire_time)


def main():
    """Print the current-hour and next-hour recall keys for every region.

    For each region code the current-hour key is read from Redis and printed
    with its value, followed by the key the next hour would use.
    """
    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 1. Work out the current hour and the following hour. Hours are rendered
    #    without zero padding ("0".."23"), matching the key layout.
    now_date = datetime.today()
    next_date_time = now_date + timedelta(hours=1)
    date = now_date.strftime('%Y%m%d')
    hour = str(now_date.hour)
    date_next = next_date_time.strftime('%Y%m%d')
    hour_next = str(next_date_time.hour)

    # 2. Build the key pair for every region and read the current-hour entry.
    for code in REGION_CODE.values():
        read_key = REGION_KEY_TEMPLATE.format(code, date, hour)
        value = redis_helper.get_data_from_set(read_key)
        print(read_key)
        print(value)
        write_key = REGION_KEY_TEMPLATE.format(code, date_next, hour_next)
        print(write_key)
        # NOTE(review): the original notes describe this loop as "read one,
        # write one", but nothing is written to write_key yet. The intended
        # write call is not visible in this file, so it is left unimplemented.

    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    main()

# cd /root/zhangbo/rov-offline
# python alg_recsys_recall_shield_videos.py