zhangbo 1 年之前
父节点
当前提交
08a07141ea
共有 1 个文件被更改,包括 34 次插入 和 119 次删除
  1. 34 119
      alg_recsys_recall_undertake.py

+ 34 - 119
alg_recsys_recall_undertake.py

@@ -14,95 +14,7 @@ redis_helper = RedisHelper()
 
 REDIS_PREFIX = "alg_recsys_video_tags_"
 
-def get_video_tags():
-    """获取视频的tag"""
-    PROJECT = "videoods"
-    TABLE = "videoods.dim_video"
-    try:
-        sql = "SELECT  videoid \
-,tags \
-FROM    {} \
-LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
-WHERE   tags IS NOT NULL \
-AND     ( \
-exploded_value = '元旦' \
-OR      exploded_value = '腊八节' \
-OR      exploded_value = '小年' \
-OR      exploded_value = '除夕' \
-OR      exploded_value = '春节' \
-OR      exploded_value = '情人节' \
-OR      exploded_value = '元宵节' \
-OR      exploded_value = '龙抬头' \
-OR      exploded_value = '妇女节' \
-OR      exploded_value = '劳动节' \
-OR      exploded_value = '母亲节' \
-OR      exploded_value = '儿童节' \
-OR      exploded_value = '端午节' \
-OR      exploded_value = '父亲节' \
-OR      exploded_value = '建党节' \
-OR      exploded_value = '七七事变' \
-OR      exploded_value = '建军节' \
-OR      exploded_value = '七夕节' \
-OR      exploded_value = '中元节' \
-OR      exploded_value = '中秋节' \
-OR      exploded_value = '毛主席逝世' \
-OR      exploded_value = '国庆节' \
-OR      exploded_value = '重阳节' \
-OR      exploded_value = '感恩节' \
-OR      exploded_value = '公祭日' \
-OR      exploded_value = '平安夜' \
-OR      exploded_value = '圣诞节' \
-OR      exploded_value = '毛主席诞辰' \
-OR      exploded_value = '小寒' \
-OR      exploded_value = '大寒' \
-OR      exploded_value = '立春' \
-OR      exploded_value = '雨水' \
-OR      exploded_value = '惊蛰' \
-OR      exploded_value = '春分' \
-OR      exploded_value = '清明' \
-OR      exploded_value = '谷雨' \
-OR      exploded_value = '立夏' \
-OR      exploded_value = '小满' \
-OR      exploded_value = '芒种' \
-OR      exploded_value = '夏至' \
-OR      exploded_value = '小暑' \
-OR      exploded_value = '大暑' \
-OR      exploded_value = '立秋' \
-OR      exploded_value = '处暑' \
-OR      exploded_value = '白露' \
-OR      exploded_value = '秋分' \
-OR      exploded_value = '寒露' \
-OR      exploded_value = '霜降' \
-OR      exploded_value = '立冬' \
-OR      exploded_value = '小雪' \
-OR      exploded_value = '大雪' \
-OR      exploded_value = '冬至' \
-OR      exploded_value = '早上好' \
-OR      exploded_value = '中午好' \
-OR      exploded_value = '下午好' \
-OR      exploded_value = '晚上好' \
-OR      exploded_value = '晚安' \
-OR      exploded_value = '祝福' \
-OR      exploded_value = 'P1高风险' \
-OR      exploded_value = 'P0高风险' \
-)".format(TABLE)
-        print("sql:"+sql)
-        records = execute_sql_from_odps(project=PROJECT, sql=sql)
-        video_tags_list = []
-        with records.open_reader() as reader:
-            for record in reader:
-                video_id = int(record['videoid'])
-                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
-                d = {}
-                d["video_id"] = video_id
-                d["tags"] = tags
-                video_tags_list.append(d)
-                # log_.info("{}:{}".format(video_id, tags))
-        log_.info("天级表:{}".format(str(len(video_tags_list))))
-        return video_tags_list
-    except Exception as e:
-        log_.error(str(e) + str(traceback.format_exc()))
-    return []
+
 def get_video_tags_v2():
     PROJECT = "loghubods"
     TABLE = "loghubods.automated_updates_category_labels_1"
@@ -152,38 +64,41 @@ def process_and_store(row):
 
 def main():
     log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-    video_tags_list = get_video_tags()
-    video_tags_list2 = get_video_tags_v2()
-
+    #1 读取当前时间及下一小时时间
+    now_date = datetime.today()
+    hour = datetime.strftime(now_date, '%H').lstrip('0')
+    date = datetime.strftime(now_date, '%Y%m%d')
+    if hour == "":
+        hour = "0"
+    previous_date = now_date + timedelta(hours=1)
+    hour_next = datetime.strftime(previous_date, '%H').lstrip('0')
+    date_next = datetime.strftime(previous_date, '%Y%m%d')
+    if hour_next == "":
+        hour_next = "0"
+    #2 循环拼接key,读一个写一个。
+    REGION_CODE = {
+        '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',
+        '辽宁省': '210000', '吉林省': '220000', '黑龙江省': '230000',
+        '上海': '310000', '江苏省': '320000', '浙江省': '330000', '安徽省': '340000', '福建省': '350000',
+        '江西省': '360000',
+        '山东省': '370000',
+        '河南省': '410000', '湖北省': '420000', '湖南省': '430000', '广东省': '440000', '广西': '450000',
+        '海南省': '460000',
+        '重庆': '500000', '四川省': '510000', '贵州省': '520000', '云南省': '530000', '西藏': '540000',
+        '陕西省': '610000', '甘肃省': '620000', '青海省': '630000', '宁夏': '640000', '新疆': '650000',
+        '台湾省': '710000', '香港': '810000', '澳门': '820000',
 
-    m1 = {}
-    m2 = {}
-    for row in video_tags_list:
-        m1[row["video_id"]] = row["tags"]
-    for row in video_tags_list2:
-        m2[row["video_id"]] = row["tags"]
-    merged_map = {}
-    merged_map.update(m1)
-    for key, value in m2.items():
-        if key in merged_map:
-            l1 = merged_map[key].split(",")
-            l2 = value.split(",")
-            l1.extend(l2)
-            l3 = list(set(l1))
-            merged_map[key] = ",".join(l3)
-        else:
-            merged_map[key] = value
-    result = []
-    for key, value in merged_map.items():
-        tmp = {}
-        tmp["video_id"] = key
-        tmp["tags"] = value
-        result.append(tmp)
-        log_.info("{}:{}".format(key, value))
+        '广州': '440100', '深圳': '440300', '成都': '510100', '长沙': '430100',
+    }
+    for _, code in REGION_CODE.items():
+        read_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date, hour)
+        value = redis_helper.get_data_from_set(read_key)
+        print(read_key)
+        print(value)
+        write_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date_next, hour_next)
+        print(write_key)
 
-    log_.info("video的数据量:{}".format(len(result)))
-    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
-    log_.info("video的数据量:{}".format(len(result)))
+    # records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
     log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
 if __name__ == '__main__':