|  | @@ -0,0 +1,291 @@
 | 
	
		
			
				|  |  | +# -*- coding: utf-8 -*-
 | 
	
		
			
				|  |  | +import traceback
 | 
	
		
			
				|  |  | +from config import set_config
 | 
	
		
			
				|  |  | +from log import Log
 | 
	
		
			
				|  |  | +from utils import execute_sql_from_odps
 | 
	
		
			
				|  |  | +from db_helper import RedisHelper
 | 
	
		
			
				|  |  | +from datetime import datetime, timedelta
 | 
	
		
			
				|  |  | +from odps import ODPS
 | 
	
		
			
				|  |  | +from threading import Timer
 | 
	
		
			
				|  |  | +from alg_recsys_recall_4h_region_trend import records_process_for_list
 | 
	
		
			
				|  |  | +config_, _ = set_config()
 | 
	
		
			
				|  |  | +log_ = Log()
 | 
	
		
			
				|  |  | +redis_helper = RedisHelper()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +REDIS_PREFIX = "alg_recsys_video_tags_"
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
 | 
	
		
			
				|  |  | +    '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
 | 
	
		
			
				|  |  | +    '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
 | 
	
		
			
				|  |  | +    '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
 | 
	
		
			
				|  |  | +    '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
 | 
	
		
			
				|  |  | +    'P1高风险','P0高风险',
 | 
	
		
			
				|  |  | +"早上好","中午好","下午好","晚上好","晚安",
 | 
	
		
			
				|  |  | +        "祝福",
 | 
	
		
			
				|  |  | +"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
 | 
	
		
			
				|  |  | +        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
 | 
	
		
			
				|  |  | +        ,"毛主席诞辰",
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
 | 
	
		
			
				|  |  | +        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
 | 
	
		
			
				|  |  | +        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世",
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +               "初一","初二","初三","初四","初五"
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +])
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def get_video_tags():
 | 
	
		
			
				|  |  | +    """获取视频的tag"""
 | 
	
		
			
				|  |  | +    PROJECT = "videoods"
 | 
	
		
			
				|  |  | +    TABLE = "videoods.dim_video"
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        sql = "SELECT  videoid \
 | 
	
		
			
				|  |  | +,tags \
 | 
	
		
			
				|  |  | +FROM    {} \
 | 
	
		
			
				|  |  | +LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
 | 
	
		
			
				|  |  | +WHERE   tags IS NOT NULL \
 | 
	
		
			
				|  |  | +AND     ( \
 | 
	
		
			
				|  |  | +exploded_value = '元旦' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '腊八节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '小年' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '除夕' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '春节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '情人节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '元宵节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '龙抬头' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '妇女节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '劳动节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '母亲节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '儿童节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '端午节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '父亲节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '建党节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '七七事变' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '建军节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '七夕节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '中元节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '中秋节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '毛主席逝世' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '国庆节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '重阳节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '感恩节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '公祭日' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '平安夜' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '圣诞节' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '毛主席诞辰' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '小寒' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '大寒' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '立春' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '雨水' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '惊蛰' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '春分' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '清明' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '谷雨' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '立夏' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '小满' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '芒种' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '夏至' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '小暑' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '大暑' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '立秋' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '处暑' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '白露' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '秋分' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '寒露' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '霜降' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '立冬' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '小雪' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '大雪' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '冬至' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '早上好' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '中午好' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '下午好' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '晚上好' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '晚安' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '祝福' \
 | 
	
		
			
				|  |  | +OR      exploded_value = 'P1高风险' \
 | 
	
		
			
				|  |  | +OR      exploded_value = 'P0高风险' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '初一' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '初二' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '初三' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '初四' \
 | 
	
		
			
				|  |  | +OR      exploded_value = '初五' \
 | 
	
		
			
				|  |  | +)".format(TABLE)
 | 
	
		
			
				|  |  | +        print("sql:"+sql)
 | 
	
		
			
				|  |  | +        records = execute_sql_from_odps(project=PROJECT, sql=sql)
 | 
	
		
			
				|  |  | +        video_tags_list = []
 | 
	
		
			
				|  |  | +        with records.open_reader() as reader:
 | 
	
		
			
				|  |  | +            for record in reader:
 | 
	
		
			
				|  |  | +                video_id = int(record['videoid'])
 | 
	
		
			
				|  |  | +                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
 | 
	
		
			
				|  |  | +                d = {}
 | 
	
		
			
				|  |  | +                d["video_id"] = video_id
 | 
	
		
			
				|  |  | +                d["tags"] = tags
 | 
	
		
			
				|  |  | +                video_tags_list.append(d)
 | 
	
		
			
				|  |  | +                # log_.info("{}:{}".format(video_id, tags))
 | 
	
		
			
				|  |  | +        log_.info("天级表:{}".format(str(len(video_tags_list))))
 | 
	
		
			
				|  |  | +        return video_tags_list
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        log_.error(str(e) + str(traceback.format_exc()))
 | 
	
		
			
				|  |  | +    return []
 | 
	
		
			
				|  |  | +def get_video_tags_v2():
 | 
	
		
			
				|  |  | +    PROJECT = "loghubods"
 | 
	
		
			
				|  |  | +    TABLE = "loghubods.automated_updates_category_labels_1"
 | 
	
		
			
				|  |  | +    # now_date = datetime.today()
 | 
	
		
			
				|  |  | +    # date = datetime.strftime(now_date, '%Y%m%d')
 | 
	
		
			
				|  |  | +    # previous_date = now_date - timedelta(days=1)
 | 
	
		
			
				|  |  | +    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # 获取当前日期
 | 
	
		
			
				|  |  | +    now_date = datetime.today()
 | 
	
		
			
				|  |  | +    # 获取当前月份
 | 
	
		
			
				|  |  | +    current_month = now_date.strftime('%Y%m')
 | 
	
		
			
				|  |  | +    # 获取上个月份
 | 
	
		
			
				|  |  | +    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        sql = '''SELECT  videoid
 | 
	
		
			
				|  |  | +        ,secondary_labels
 | 
	
		
			
				|  |  | +FROM    loghubods.automated_updates_category_labels_1
 | 
	
		
			
				|  |  | +WHERE   (
 | 
	
		
			
				|  |  | +            dt LIKE '{}%' OR dt LIKE '{}%'
 | 
	
		
			
				|  |  | +)
 | 
	
		
			
				|  |  | +'''.format(current_month, previous_month)
 | 
	
		
			
				|  |  | +        print("sql:" + sql)
 | 
	
		
			
				|  |  | +        records = execute_sql_from_odps(project=PROJECT, sql=sql)
 | 
	
		
			
				|  |  | +        video_tags_list = []
 | 
	
		
			
				|  |  | +        with records.open_reader() as reader:
 | 
	
		
			
				|  |  | +            for record in reader:
 | 
	
		
			
				|  |  | +                video_id = int(record['videoid'])
 | 
	
		
			
				|  |  | +                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
 | 
	
		
			
				|  |  | +                d = {}
 | 
	
		
			
				|  |  | +                d["video_id"] = video_id
 | 
	
		
			
				|  |  | +                d["tags"] = tags
 | 
	
		
			
				|  |  | +                video_tags_list.append(d)
 | 
	
		
			
				|  |  | +                # log_.info("{}:{}".format(video_id, tags))
 | 
	
		
			
				|  |  | +        log_.info("增量表:{}".format(str(len(video_tags_list))))
 | 
	
		
			
				|  |  | +        return video_tags_list
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        log_.error(str(e) + str(traceback.format_exc()))
 | 
	
		
			
				|  |  | +    return []
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def check_data(project, table, partition) -> int:
 | 
	
		
			
				|  |  | +    """检查数据是否准备好,输出数据条数"""
 | 
	
		
			
				|  |  | +    odps = ODPS(
 | 
	
		
			
				|  |  | +        access_id=config_.ODPS_CONFIG['ACCESSID'],
 | 
	
		
			
				|  |  | +        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 | 
	
		
			
				|  |  | +        project=project,
 | 
	
		
			
				|  |  | +        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 | 
	
		
			
				|  |  | +        connect_timeout=3000,
 | 
	
		
			
				|  |  | +        read_timeout=500000,
 | 
	
		
			
				|  |  | +        pool_maxsize=1000,
 | 
	
		
			
				|  |  | +        pool_connections=1000
 | 
	
		
			
				|  |  | +    )
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        t = odps.get_table(name=table)
 | 
	
		
			
				|  |  | +        log_.info(f"检查分区是否存在-【 dt={partition} 】")
 | 
	
		
			
				|  |  | +        check_res = t.exist_partition(partition_spec=f'dt={partition}')
 | 
	
		
			
				|  |  | +        if check_res:
 | 
	
		
			
				|  |  | +            sql = f'select * from {project}.{table} where dt = {partition}'
 | 
	
		
			
				|  |  | +            log_.info(sql)
 | 
	
		
			
				|  |  | +            with odps.execute_sql(sql=sql).open_reader() as reader:
 | 
	
		
			
				|  |  | +                data_count = reader.count
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            log_.info("表{}分区{}不存在".format(table, partition))
 | 
	
		
			
				|  |  | +            data_count = 0
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
 | 
	
		
			
				|  |  | +        data_count = 0
 | 
	
		
			
				|  |  | +    return data_count
 | 
	
		
			
				|  |  | +def get_video_tags_v3(project, table, partition):
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        sql = '''SELECT  videoid
 | 
	
		
			
				|  |  | +    ,secondary_labels
 | 
	
		
			
				|  |  | +FROM    {}.{}
 | 
	
		
			
				|  |  | +WHERE   (
 | 
	
		
			
				|  |  | +        dt = '{}'
 | 
	
		
			
				|  |  | +)
 | 
	
		
			
				|  |  | +'''.format(project, table, partition)
 | 
	
		
			
				|  |  | +        print("sql:" + sql)
 | 
	
		
			
				|  |  | +        records = execute_sql_from_odps(project=project, sql=sql)
 | 
	
		
			
				|  |  | +        video_tags_list = []
 | 
	
		
			
				|  |  | +        with records.open_reader() as reader:
 | 
	
		
			
				|  |  | +            for record in reader:
 | 
	
		
			
				|  |  | +                video_id = int(record['videoid'])
 | 
	
		
			
				|  |  | +                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
 | 
	
		
			
				|  |  | +                d = {}
 | 
	
		
			
				|  |  | +                d["video_id"] = video_id
 | 
	
		
			
				|  |  | +                d["tags"] = tags
 | 
	
		
			
				|  |  | +                video_tags_list.append(d)
 | 
	
		
			
				|  |  | +        log_.info("增量表:{}".format(str(len(video_tags_list))))
 | 
	
		
			
				|  |  | +        return video_tags_list
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        log_.error(str(e) + ":" + str(traceback.format_exc()))
 | 
	
		
			
				|  |  | +    return []
 | 
	
		
			
				|  |  | +def process_and_store(row):
 | 
	
		
			
				|  |  | +    video_id = row["video_id"]
 | 
	
		
			
				|  |  | +    tags = row["tags"]
 | 
	
		
			
				|  |  | +    key = REDIS_PREFIX + str(video_id)
 | 
	
		
			
				|  |  | +    expire_time = 24 * 3600 * 2
 | 
	
		
			
				|  |  | +    redis_helper.set_data_to_redis(key, tags, expire_time)
 | 
	
		
			
				|  |  | +    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def main():
 | 
	
		
			
				|  |  | +    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 | 
	
		
			
				|  |  | +    now_date = datetime.today()
 | 
	
		
			
				|  |  | +    project = "loghubods"
 | 
	
		
			
				|  |  | +    table = "automated_updates_category_labels_new"
 | 
	
		
			
				|  |  | +    partition = datetime.strftime(now_date, '%Y%m%d%H')
 | 
	
		
			
				|  |  | +    table_data_cnt = check_data(project, table, partition)
 | 
	
		
			
				|  |  | +    if table_data_cnt == 0:
 | 
	
		
			
				|  |  | +        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
 | 
	
		
			
				|  |  | +        Timer(60*3, main).start()
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        video_tags_list = get_video_tags()
 | 
	
		
			
				|  |  | +        # video_tags_list2 = get_video_tags_v2()
 | 
	
		
			
				|  |  | +        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
 | 
	
		
			
				|  |  | +        video_tags_list2 = get_video_tags_v3(project, table, partition)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        m1 = {}
 | 
	
		
			
				|  |  | +        m2 = {}
 | 
	
		
			
				|  |  | +        for row in video_tags_list:
 | 
	
		
			
				|  |  | +            m1[row["video_id"]] = row["tags"]
 | 
	
		
			
				|  |  | +        for row in video_tags_list2:
 | 
	
		
			
				|  |  | +            m2[row["video_id"]] = row["tags"]
 | 
	
		
			
				|  |  | +        merged_map = {}
 | 
	
		
			
				|  |  | +        merged_map.update(m1)
 | 
	
		
			
				|  |  | +        for key, value in m2.items():
 | 
	
		
			
				|  |  | +            if key in merged_map:
 | 
	
		
			
				|  |  | +                l1 = merged_map[key].split(",")
 | 
	
		
			
				|  |  | +                l2 = value.split(",")
 | 
	
		
			
				|  |  | +                l1.extend(l2)
 | 
	
		
			
				|  |  | +                l3 = list(set(l1))
 | 
	
		
			
				|  |  | +                merged_map[key] = ",".join(l3)
 | 
	
		
			
				|  |  | +            else:
 | 
	
		
			
				|  |  | +                merged_map[key] = value
 | 
	
		
			
				|  |  | +        result = []
 | 
	
		
			
				|  |  | +        for key, value in merged_map.items():
 | 
	
		
			
				|  |  | +            tmp = {}
 | 
	
		
			
				|  |  | +            tmp["video_id"] = key
 | 
	
		
			
				|  |  | +            tmp["tags"] = value
 | 
	
		
			
				|  |  | +            result.append(tmp)
 | 
	
		
			
				|  |  | +            log_.info("{}:{}".format(key, value))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        log_.info("video的数据量:{}".format(len(result)))
 | 
	
		
			
				|  |  | +        records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
 | 
	
		
			
				|  |  | +        log_.info("video的数据量:{}".format(len(result)))
 | 
	
		
			
				|  |  | +        log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +if __name__ == '__main__':
 | 
	
		
			
				|  |  | +    main()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +# cd /root/zhangbo/rov-offline
 | 
	
		
			
				|  |  | +# python alg_recsys_recall_tags_videos_v2.py
 | 
	
		
			
				|  |  | +# nohup python alg_recsys_recall_tags_videos_v2.py > p.log 2>&1 &
 |