123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # -*- coding: utf-8 -*-
- import traceback
- from config import set_config
- from log import Log
- from utils import execute_sql_from_odps
- from db_helper import RedisHelper
- from datetime import datetime, timedelta
- from alg_recsys_recall_4h_region_trend import records_process_for_list
# Module-level singletons shared by every function below.  The order is kept
# as-is because each constructor may perform its own initialisation.
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Prefix of the Redis keys written by process_and_store(); the video id is
# appended to it.
REDIS_PREFIX = "alg_recsys_video_tags_"
def get_video_tags_v2():
    """Load video ids and their filtered secondary labels from ODPS.

    Selects every row of the category-label table whose ``dt`` value starts
    with the current month or the previous month (formatted ``YYYYMM``) and,
    for each row, keeps only the comma-separated labels that are members of
    ``TAG_SET``.

    Returns:
        list[dict]: one ``{"video_id": int, "tags": str}`` entry per row,
        where ``tags`` is the comma-joined list of retained labels (it may
        be an empty string).  If anything raises, the error and traceback
        are logged and an empty list is returned.
    """
    PROJECT = "loghubods"
    TABLE = "loghubods.automated_updates_category_labels_1"

    now_date = datetime.today()
    current_month = now_date.strftime('%Y%m')
    # Stepping back by the day-of-month lands on the last day of the previous
    # month, whatever the month length.
    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')

    try:
        # The table name comes from TABLE so it is declared in one place only.
        sql = '''SELECT videoid
                ,secondary_labels
                FROM {}
                WHERE (
                dt LIKE '{}%' OR dt LIKE '{}%'
                )
                '''.format(TABLE, current_month, previous_month)
        print("sql:" + sql)
        records = execute_sql_from_odps(project=PROJECT, sql=sql)

        video_tags_list = []
        with records.open_reader() as reader:
            for record in reader:
                video_id = int(record['videoid'])
                # NOTE(review): TAG_SET is not defined in the visible part of
                # this file.  If it is missing at runtime the NameError is
                # caught by the handler below and the function returns [].
                labels = str(record['secondary_labels']).split(",")
                tags = ",".join([label for label in labels if label in TAG_SET])
                video_tags_list.append({"video_id": video_id, "tags": tags})

        log_.info("增量表:{}".format(str(len(video_tags_list))))
        return video_tags_list
    except Exception as e:
        # Best effort: report the failure and let the caller continue with
        # an empty result.
        log_.error(str(e) + str(traceback.format_exc()))
        return []
def process_and_store(row):
    """Store the tag string of a single video in Redis.

    Args:
        row: mapping that provides the ``"video_id"`` and ``"tags"`` entries.

    The value is written under ``REDIS_PREFIX + str(video_id)`` and the
    expiry argument passed to the Redis helper is 172800 (2 * 24 * 3600).
    """
    redis_key = REDIS_PREFIX + str(row["video_id"])
    tag_value = row["tags"]
    two_days = 2 * 24 * 3600  # presumably seconds -- confirm against RedisHelper
    redis_helper.set_data_to_redis(redis_key, tag_value, two_days)
def main():
    """Print each region's current-hour recall key, its value, and the next-hour key.

    For every region code the function builds the Redis key of the current
    date/hour, reads it with ``redis_helper.get_data_from_set`` and prints the
    key and the value, then prints the key for the date/hour one hour later.
    Start and end timestamps are written to the log.

    NOTE(review): the value that is read is only printed; nothing is written
    to ``write_key`` in this function.
    """
    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 1. Date and hour of "now" and of one hour later.  The hour is used
    #    without zero padding ("0".."23"), which is exactly str(dt.hour).
    now_date = datetime.today()
    date = now_date.strftime('%Y%m%d')
    hour = str(now_date.hour)

    next_time = now_date + timedelta(hours=1)
    date_next = next_time.strftime('%Y%m%d')
    hour_next = str(next_time.hour)

    # 2. Build the keys region by region, reading one and printing the target.
    REGION_CODE = {
        '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',
        '辽宁省': '210000', '吉林省': '220000', '黑龙江省': '230000',
        '上海': '310000', '江苏省': '320000', '浙江省': '330000', '安徽省': '340000', '福建省': '350000',
        '江西省': '360000',
        '山东省': '370000',
        '河南省': '410000', '湖北省': '420000', '湖南省': '430000', '广东省': '440000', '广西': '450000',
        '海南省': '460000',
        '重庆': '500000', '四川省': '510000', '贵州省': '520000', '云南省': '530000', '西藏': '540000',
        '陕西省': '610000', '甘肃省': '620000', '青海省': '630000', '宁夏': '640000', '新疆': '650000',
        '台湾省': '710000', '香港': '810000', '澳门': '820000',
        '广州': '440100', '深圳': '440300', '成都': '510100', '长沙': '430100',
    }
    key_template = "recall:item:score:region:h:{}:data66:rule66:{}:{}"

    # Only the codes are needed; the region names are not used here.
    for code in REGION_CODE.values():
        read_key = key_template.format(code, date, hour)
        value = redis_helper.get_data_from_set(read_key)
        print(read_key)
        print(value)
        write_key = key_template.format(code, date_next, hour_next)
        print(write_key)

    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# Script entry point: run the job only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
- # cd /root/zhangbo/rov-offline
- # python alg_recsys_recall_shield_videos.py
|