# -*- coding: utf-8 -*-
"""Offline job: collect selected tags for videos and cache them in Redis.

Tags are read from two ODPS tables (a video dimension table and an
incrementally updated category-label table), restricted to the tags listed
in ``TAGS``, merged per video id and written to Redis under
``REDIS_PREFIX + <video_id>``.
"""
import traceback
from config import set_config
from log import Log
from utils import execute_sql_from_odps
from db_helper import RedisHelper
from datetime import datetime, timedelta
from alg_recsys_recall_4h_region_trend import records_process_for_list

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

REDIS_PREFIX = "alg_recsys_video_tags_"

# Lifetime of every Redis entry written by this job: two days, in seconds.
REDIS_EXPIRE_SECONDS = 24 * 3600 * 2

# Tags this job cares about, in the order they appeared in the original SQL
# filter: festivals / memorial days, solar terms, greetings and risk labels.
# NOTE(review): TAG_SET was referenced by the loaders but never defined, so
# every run raised NameError (swallowed by the except blocks) and produced no
# data. It is reconstructed here from the SQL filter list -- confirm that this
# is the intended set.
TAGS = (
    '元旦', '腊八节', '小年', '除夕', '春节', '情人节', '元宵节', '龙抬头',
    '妇女节', '劳动节', '母亲节', '儿童节', '端午节', '父亲节', '建党节',
    '七七事变', '建军节', '七夕节', '中元节', '中秋节', '毛主席逝世',
    '国庆节', '重阳节', '感恩节', '公祭日', '平安夜', '圣诞节', '毛主席诞辰',
    '小寒', '大寒', '立春', '雨水', '惊蛰', '春分', '清明', '谷雨',
    '立夏', '小满', '芒种', '夏至', '小暑', '大暑', '立秋', '处暑',
    '白露', '秋分', '寒露', '霜降', '立冬', '小雪', '大雪', '冬至',
    '早上好', '中午好', '下午好', '晚上好', '晚安', '祝福',
    'P1高风险', 'P0高风险',
)
TAG_SET = frozenset(TAGS)


def _filter_tags(raw_value):
    """Return the comma-joined tags of *raw_value* that belong to TAG_SET."""
    return ",".join(tag for tag in str(raw_value).split(",") if tag in TAG_SET)


def _merge_tags(*tag_strings):
    """Merge comma-separated tag strings into one.

    Duplicates and empty fragments are dropped; first-seen order is kept so
    the result is deterministic from run to run.
    """
    seen = set()
    merged = []
    for tag_string in tag_strings:
        for tag in tag_string.split(","):
            if tag and tag not in seen:
                seen.add(tag)
                merged.append(tag)
    return ",".join(merged)


def _read_video_tags(project, sql, tag_column):
    """Run *sql* on ODPS and return ``[{"video_id": int, "tags": str}, ...]``.

    ``tags`` holds only the values of *tag_column* that are in TAG_SET.
    """
    records = execute_sql_from_odps(project=project, sql=sql)
    video_tags_list = []
    with records.open_reader() as reader:
        for record in reader:
            video_id = int(record['videoid'])
            tags = _filter_tags(record[tag_column])
            video_tags_list.append({"video_id": video_id, "tags": tags})
    return video_tags_list


def get_video_tags():
    """Fetch video tags from the video dimension table.

    Only videos carrying at least one tag from ``TAGS`` are selected.

    Returns:
        A list of ``{"video_id": int, "tags": str}`` dicts, or ``[]`` if the
        query or the row processing fails (the error is logged).
    """
    PROJECT = "videoods"
    TABLE = "videoods.dim_video"
    try:
        # Same predicate as before, generated from TAGS so the SQL filter and
        # the Python-side filter cannot drift apart.
        tag_conditions = " OR ".join(
            "exploded_value = '{}'".format(tag) for tag in TAGS
        )
        sql = (
            "SELECT videoid, tags "
            "FROM {} "
            "LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value "
            "WHERE tags IS NOT NULL "
            "AND ( {} )"
        ).format(TABLE, tag_conditions)
        print("sql:"+sql)
        video_tags_list = _read_video_tags(PROJECT, sql, 'tags')
        log_.info("天级表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        log_.error(str(e) + str(traceback.format_exc()))
        return []


def get_video_tags_v2():
    """Fetch video tags from the incrementally updated category-label table.

    Reads the partitions of the current and the previous month.

    Returns:
        A list of ``{"video_id": int, "tags": str}`` dicts, or ``[]`` if the
        query or the row processing fails (the error is logged). ``tags`` may
        be an empty string when none of a video's labels is in ``TAGS``.
    """
    PROJECT = "loghubods"
    TABLE = "loghubods.automated_updates_category_labels_1"
    now_date = datetime.today()
    current_month = now_date.strftime('%Y%m')
    # Stepping back by the day-of-month lands on the last day of the
    # previous month.
    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
    try:
        sql = '''SELECT  videoid
        ,secondary_labels
FROM    {}
WHERE   (
            dt LIKE '{}%' OR dt LIKE '{}%'
)
'''.format(TABLE, current_month, previous_month)
        print("sql:" + sql)
        video_tags_list = _read_video_tags(PROJECT, sql, 'secondary_labels')
        log_.info("增量表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        log_.error(str(e) + str(traceback.format_exc()))
        return []


def process_and_store(row):
    """Write one ``{"video_id", "tags"}`` row to Redis with a two-day expiry."""
    video_id = row["video_id"]
    tags = row["tags"]
    key = REDIS_PREFIX + str(video_id)
    redis_helper.set_data_to_redis(key, tags, REDIS_EXPIRE_SECONDS)


def main():
    """Load tags from both tables, merge them per video and store them."""
    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    video_tags_list = get_video_tags()
    video_tags_list2 = get_video_tags_v2()

    # Collapse each source to one entry per video id (last row wins).
    m1 = {row["video_id"]: row["tags"] for row in video_tags_list}
    m2 = {row["video_id"]: row["tags"] for row in video_tags_list2}

    merged_map = dict(m1)
    for key, value in m2.items():
        if key in merged_map:
            merged_map[key] = _merge_tags(merged_map[key], value)
        else:
            merged_map[key] = value

    result = []
    for key, value in merged_map.items():
        result.append({"video_id": key, "tags": value})
        log_.info("{}:{}".format(key, value))

    log_.info("video的数据量:{}".format(len(result)))
    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
    log_.info("video的数据量:{}".format(len(result)))
    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    main()

# cd /root/zhangbo/rov-offline
# python alg_recsys_recall_shield_videos.py