123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- # -*- coding: utf-8 -*-
- import traceback
- from config import set_config
- from log import Log
- from utils import execute_sql_from_odps
- from db_helper import RedisHelper
- from datetime import datetime, timedelta
- from alg_recsys_recall_4h_region_trend import records_process_for_list
# Module-level singletons shared by the loaders and the Redis writer below.
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()
# Every Redis key written by this script is REDIS_PREFIX + str(video_id).
REDIS_PREFIX = "alg_recsys_video_tags_"
# Tags that are kept when filtering a video's comma-separated tag string:
# festivals/holidays, the 24 solar terms, greeting tags and risk levels.
# get_video_tags() and get_video_tags_v2() both filter against this set; it
# must be defined, otherwise they raise NameError on the first record, which
# their broad `except Exception` swallows, silently returning [].
# Keep in sync with the tag list in get_video_tags()'s SQL.
TAG_SET = frozenset([
    # festivals / holidays / commemorations
    '元旦', '腊八节', '小年', '除夕', '春节', '情人节', '元宵节', '龙抬头',
    '妇女节', '劳动节', '母亲节', '儿童节', '端午节', '父亲节', '建党节', '七七事变',
    '建军节', '七夕节', '中元节', '中秋节', '毛主席逝世', '国庆节', '重阳节', '感恩节',
    '公祭日', '平安夜', '圣诞节', '毛主席诞辰',
    # 24 solar terms
    '小寒', '大寒', '立春', '雨水', '惊蛰', '春分', '清明', '谷雨',
    '立夏', '小满', '芒种', '夏至', '小暑', '大暑', '立秋', '处暑',
    '白露', '秋分', '寒露', '霜降', '立冬', '小雪', '大雪', '冬至',
    # greetings
    '早上好', '中午好', '下午好', '晚上好', '晚安', '祝福',
    # risk levels
    'P1高风险', 'P0高风险',
])
def get_video_tags():
    """Load tags for videos from the daily video dimension table.

    Queries ``videoods.dim_video`` for videos whose comma-separated ``tags``
    column contains at least one of the tags listed in the query, then keeps
    only the tags of each video that are members of the module-level
    ``TAG_SET``.

    Returns:
        list[dict]: one ``{"video_id": int, "tags": str}`` dict per result
        row. Because the query uses ``LATERAL VIEW EXPLODE``, a video that
        matches several listed tags appears once per matching tag (all with
        the same ``tags`` value). Returns ``[]`` if anything fails; the error
        and traceback are logged.
    """
    PROJECT = "videoods"
    TABLE = "videoods.dim_video"
    try:
        # Triple-quoted SQL instead of backslash line continuations: the
        # latter baked the source indentation into the query and break with
        # a SyntaxError if any backslash is followed by trailing whitespace.
        # The IN list is equivalent to the former chain of OR comparisons.
        sql = """SELECT videoid
       ,tags
FROM {}
LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value
WHERE tags IS NOT NULL
  AND exploded_value IN (
      '元旦', '腊八节', '小年', '除夕', '春节', '情人节', '元宵节', '龙抬头',
      '妇女节', '劳动节', '母亲节', '儿童节', '端午节', '父亲节', '建党节', '七七事变',
      '建军节', '七夕节', '中元节', '中秋节', '毛主席逝世', '国庆节', '重阳节', '感恩节',
      '公祭日', '平安夜', '圣诞节', '毛主席诞辰',
      '小寒', '大寒', '立春', '雨水', '惊蛰', '春分', '清明', '谷雨',
      '立夏', '小满', '芒种', '夏至', '小暑', '大暑', '立秋', '处暑',
      '白露', '秋分', '寒露', '霜降', '立冬', '小雪', '大雪', '冬至',
      '早上好', '中午好', '下午好', '晚上好', '晚安', '祝福',
      'P1高风险', 'P0高风险'
  )""".format(TABLE)
        print("sql:" + sql)
        records = execute_sql_from_odps(project=PROJECT, sql=sql)
        video_tags_list = []
        with records.open_reader() as reader:
            for record in reader:
                video_id = int(record['videoid'])
                # Keep only the tags we care about (TAG_SET is module-level).
                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
                video_tags_list.append({"video_id": video_id, "tags": tags})
        log_.info("天级表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        # Best-effort: a failed load yields no rows instead of aborting main().
        log_.error(str(e) + str(traceback.format_exc()))
        return []
def get_video_tags_v2():
    """Load tags for recently labelled videos from the incremental label table.

    Reads every row of ``loghubods.automated_updates_category_labels_1`` whose
    ``dt`` starts with the current or the previous month (``YYYYMM``) and
    keeps only the ``secondary_labels`` entries that are members of the
    module-level ``TAG_SET``.

    Returns:
        list[dict]: one ``{"video_id": int, "tags": str}`` dict per row;
        ``tags`` is ``""`` when none of the row's labels is in ``TAG_SET``.
        Returns ``[]`` if anything fails; the error and traceback are logged.
    """
    PROJECT = "loghubods"
    TABLE = "loghubods.automated_updates_category_labels_1"
    now_date = datetime.today()
    current_month = now_date.strftime('%Y%m')
    # Going back day-of-month days lands on the last day of the previous
    # month, which also handles January -> previous December correctly.
    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
    try:
        # Use the TABLE constant instead of repeating the table name.
        sql = '''SELECT videoid
       ,secondary_labels
FROM {}
WHERE (
    dt LIKE '{}%' OR dt LIKE '{}%'
)
'''.format(TABLE, current_month, previous_month)
        print("sql:" + sql)
        records = execute_sql_from_odps(project=PROJECT, sql=sql)
        video_tags_list = []
        with records.open_reader() as reader:
            for record in reader:
                video_id = int(record['videoid'])
                labels = record['secondary_labels']
                # A NULL column would otherwise be stringified to "None".
                raw_labels = "" if labels is None else str(labels)
                tags = ",".join([i for i in raw_labels.split(",") if i in TAG_SET])
                video_tags_list.append({"video_id": video_id, "tags": tags})
        log_.info("增量表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        # Best-effort: a failed load yields no rows instead of aborting main().
        log_.error(str(e) + str(traceback.format_exc()))
        return []
def process_and_store(row):
    """Write one video's tag string to Redis with a two-day expiry.

    Args:
        row: dict with a ``"video_id"`` and a ``"tags"`` entry; the value
            stored is ``row["tags"]`` under key ``REDIS_PREFIX + video_id``.
    """
    video_id, tag_string = row["video_id"], row["tags"]
    two_days_in_seconds = 2 * 24 * 3600
    redis_helper.set_data_to_redis(REDIS_PREFIX + str(video_id), tag_string, two_days_in_seconds)
def _split_tags(tags):
    """Split a comma-separated tag string into a list, dropping empty tokens."""
    # "".split(",") is [""]; without the filter an empty tag string would
    # contribute an empty tag and produce stray commas once re-joined.
    return [tag for tag in tags.split(",") if tag]


def _merge_video_tags(daily_rows, incremental_rows):
    """Merge per-video tag rows from the daily and the incremental source.

    Within each source the last row for a video wins. Across sources the tags
    are unioned: daily tags first, then incremental tags not already present.
    Empty tokens are dropped and duplicates removed while keeping first-seen
    order, so the output is deterministic (a ``set`` would reorder the tags
    from one run to the next).

    Args:
        daily_rows: iterable of ``{"video_id": ..., "tags": str}`` dicts.
        incremental_rows: iterable of dicts of the same shape.

    Returns:
        list[dict]: one ``{"video_id": ..., "tags": str}`` per video; videos
        from the daily source come first, then videos only in the incremental
        source.
    """
    daily = {row["video_id"]: row["tags"] for row in daily_rows}
    incremental = {row["video_id"]: row["tags"] for row in incremental_rows}
    merged = {}
    for source in (daily, incremental):
        for video_id, tags in source.items():
            # A dict doubles as an insertion-ordered set of tags.
            merged.setdefault(video_id, {}).update(dict.fromkeys(_split_tags(tags)))
    return [
        {"video_id": video_id, "tags": ",".join(ordered_tags)}
        for video_id, ordered_tags in merged.items()
    ]


def main():
    """Load video tags from both sources, merge them and write them to Redis."""
    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    video_tags_list = get_video_tags()
    video_tags_list2 = get_video_tags_v2()
    result = _merge_video_tags(video_tags_list, video_tags_list2)
    for row in result:
        log_.info("{}:{}".format(row["video_id"], row["tags"]))
    log_.info("video的数据量:{}".format(len(result)))
    # Write to Redis in batches of 50 rows using 8 workers.
    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
    log_.info("video的数据量:{}".format(len(result)))
    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
- # cd /root/zhangbo/rov-offline
- # python alg_recsys_recall_shield_videos.py
|