| 
					
				 | 
			
			
				@@ -4,26 +4,39 @@ from config import set_config 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from log import Log 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from utils import execute_sql_from_odps 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from db_helper import RedisHelper 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import datetime 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from datetime import datetime, timedelta 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from alg_recsys_recall_4h_region_trend import records_process_for_list 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 config_, _ = set_config() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 log_ = Log() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 redis_helper = RedisHelper() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-PROJECT = "videoods" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-TABLE = "videoods.dim_video" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 REDIS_PREFIX = "alg_recsys_video_tags_" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-TAG_SET = ['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    'P1高风险','P0高风险' 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    'P1高风险','P0高风险', 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+"早上好","中午好","下午好","晚上好","晚安", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "祝福", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ,"毛主席诞辰", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "秋分","寒露","霜降","立冬","小雪","大雪","冬至", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+]) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def get_video_tags(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """获取视频的tag""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    PROJECT = "videoods" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    TABLE = "videoods.dim_video" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         sql = "SELECT  videoid \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 ,tags \ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -103,25 +116,83 @@ OR      exploded_value = 'P0高风险' \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 d["video_id"] = video_id 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 d["tags"] = tags 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 video_tags_list.append(d) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                log_.info("{}:{}".format(video_id, tags)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        records_process_for_list(video_tags_list, process_and_store, max_size=50, num_workers=8) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log_.info("video的数据量:{}".format(len(video_tags_list))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                # log_.info("{}:{}".format(video_id, tags)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return video_tags_list 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     except Exception as e: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         log_.error(str(e) + str(traceback.format_exc())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def get_video_tags_v2(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    PROJECT = "loghubods" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    TABLE = "loghubods.automated_updates_category_labels_1" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    now_date = datetime.today() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    date = datetime.strftime(now_date, '%Y%m%d%H') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sql = '''SELECT  videoid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ,secondary_labels 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+FROM    loghubods.automated_updates_category_labels_1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+WHERE   ( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            dt LIKE '{}%' 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+'''.format(date) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        print("sql:" + sql) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        records = execute_sql_from_odps(project=PROJECT, sql=sql) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        video_tags_list = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        with records.open_reader() as reader: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            for record in reader: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                video_id = int(record['videoid']) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET]) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                d = {} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                d["video_id"] = video_id 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                d["tags"] = tags 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                video_tags_list.append(d) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                # log_.info("{}:{}".format(video_id, tags)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return video_tags_list 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception as e: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.error(str(e) + str(traceback.format_exc())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def process_and_store(row): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     video_id = row["video_id"] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     tags = row["tags"] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     key = REDIS_PREFIX + str(video_id) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     expire_time = 24 * 3600 * 2 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     redis_helper.set_data_to_redis(key, tags, expire_time) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    log_.info("video-tags写入数据key={},value={}".format(key, tags)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # log_.info("video-tags写入数据key={},value={}".format(key, tags)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def main(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    log_.info("开始执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    get_video_tags() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    log_.info("完成执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S")) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    video_tags_list = get_video_tags() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    video_tags_list2 = get_video_tags_v2() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    m1 = {} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    m2 = {} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for row in video_tags_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        m1[row["video_id"]] = row["tags"] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for row in video_tags_list2: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        m2[row["video_id"]] = row["tags"] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    merged_map = {} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    merged_map.update(m1) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for key, value in m2.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if key in merged_map: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            l1 = merged_map[key].split(",") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            l2 = value.split(",") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            l1.extend(l2) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            l3 = list(set(l1)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            merged_map[key] = ",".join(l3) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            merged_map[key] = value 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    result = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for key, value in merged_map.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tmp = {} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tmp["video_id"] = key 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tmp["tags"] = value 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        result.append(tmp) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.info("{}:{}".format(key, value)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info("video的数据量:{}".format(len(result))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info("video的数据量:{}".format(len(result))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S")) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 if __name__ == '__main__': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     main() 
			 |