zhangbo 1 年之前
父节点
当前提交
a6af5b38f5
共有 2 个文件被更改,包括 201 次插入1 次删除
  1. 5 1
      alg_recsys_recall_tags_videos.py
  2. 196 0
      alg_recsys_recall_undertake.py

+ 5 - 1
alg_recsys_recall_tags_videos.py

@@ -30,7 +30,11 @@ TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人
         "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
 
         "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
-        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世"
+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世",
+
+               "初一","初二","初三","初四","初五"
+
+
 ])
 
 def get_video_tags():

+ 196 - 0
alg_recsys_recall_undertake.py

@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+import traceback
+from config import set_config
+from log import Log
+from utils import execute_sql_from_odps
+from db_helper import RedisHelper
+from datetime import datetime, timedelta
+
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+# Shared module-level helpers: project config, logger and Redis client.
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+
+# Prefix of the Redis key under which each video's tag string is stored.
+REDIS_PREFIX = "alg_recsys_video_tags_"
+
+# Tags kept when filtering a video's tag list. get_video_tags() and
+# get_video_tags_v2() both read this name; without it they raise NameError,
+# which their broad except turns into an empty result.
+# NOTE(review): the membership mirrors the tag list in the WHERE clause of
+# get_video_tags(); confirm this is the intended set for this job.
+TAG_SET = set([
+    # festivals and memorial days
+    '元旦', '腊八节', '小年', '除夕', '春节', '情人节', '元宵节', '龙抬头',
+    '妇女节', '劳动节', '母亲节', '儿童节', '端午节', '父亲节', '建党节',
+    '七七事变', '建军节', '七夕节', '中元节', '中秋节', '毛主席逝世', '国庆节',
+    '重阳节', '感恩节', '公祭日', '平安夜', '圣诞节', '毛主席诞辰',
+    # solar terms
+    '小寒', '大寒', '立春', '雨水', '惊蛰', '春分', '清明', '谷雨',
+    '立夏', '小满', '芒种', '夏至', '小暑', '大暑', '立秋', '处暑',
+    '白露', '秋分', '寒露', '霜降', '立冬', '小雪', '大雪', '冬至',
+    # greetings and blessings
+    '早上好', '中午好', '下午好', '晚上好', '晚安', '祝福',
+    # risk labels
+    'P1高风险', 'P0高风险',
+])
+
+def get_video_tags():
+    """Load video tags from the videoods.dim_video table.
+
+    Runs an ODPS query that splits each video's comma-separated ``tags``
+    column and keeps rows where at least one tag equals one of the values
+    listed in the WHERE clause.
+
+    Returns:
+        A list of dicts ``{"video_id": int, "tags": str}`` where ``tags`` is
+        the comma-joined subset of the row's tags that are in ``TAG_SET``.
+        On any exception the error is logged and an empty list is returned.
+
+    NOTE(review): ``TAG_SET`` is read as a module-level name and is not
+    defined inside this function; if it is missing, the NameError raised in
+    the loop is swallowed by the ``except`` below and ``[]`` is returned.
+    """
+    PROJECT = "videoods"
+    TABLE = "videoods.dim_video"
+    try:
+        # The query is one backslash-continued string literal; only the table
+        # name is substituted via format().
+        sql = "SELECT  videoid \
+,tags \
+FROM    {} \
+LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
+WHERE   tags IS NOT NULL \
+AND     ( \
+exploded_value = '元旦' \
+OR      exploded_value = '腊八节' \
+OR      exploded_value = '小年' \
+OR      exploded_value = '除夕' \
+OR      exploded_value = '春节' \
+OR      exploded_value = '情人节' \
+OR      exploded_value = '元宵节' \
+OR      exploded_value = '龙抬头' \
+OR      exploded_value = '妇女节' \
+OR      exploded_value = '劳动节' \
+OR      exploded_value = '母亲节' \
+OR      exploded_value = '儿童节' \
+OR      exploded_value = '端午节' \
+OR      exploded_value = '父亲节' \
+OR      exploded_value = '建党节' \
+OR      exploded_value = '七七事变' \
+OR      exploded_value = '建军节' \
+OR      exploded_value = '七夕节' \
+OR      exploded_value = '中元节' \
+OR      exploded_value = '中秋节' \
+OR      exploded_value = '毛主席逝世' \
+OR      exploded_value = '国庆节' \
+OR      exploded_value = '重阳节' \
+OR      exploded_value = '感恩节' \
+OR      exploded_value = '公祭日' \
+OR      exploded_value = '平安夜' \
+OR      exploded_value = '圣诞节' \
+OR      exploded_value = '毛主席诞辰' \
+OR      exploded_value = '小寒' \
+OR      exploded_value = '大寒' \
+OR      exploded_value = '立春' \
+OR      exploded_value = '雨水' \
+OR      exploded_value = '惊蛰' \
+OR      exploded_value = '春分' \
+OR      exploded_value = '清明' \
+OR      exploded_value = '谷雨' \
+OR      exploded_value = '立夏' \
+OR      exploded_value = '小满' \
+OR      exploded_value = '芒种' \
+OR      exploded_value = '夏至' \
+OR      exploded_value = '小暑' \
+OR      exploded_value = '大暑' \
+OR      exploded_value = '立秋' \
+OR      exploded_value = '处暑' \
+OR      exploded_value = '白露' \
+OR      exploded_value = '秋分' \
+OR      exploded_value = '寒露' \
+OR      exploded_value = '霜降' \
+OR      exploded_value = '立冬' \
+OR      exploded_value = '小雪' \
+OR      exploded_value = '大雪' \
+OR      exploded_value = '冬至' \
+OR      exploded_value = '早上好' \
+OR      exploded_value = '中午好' \
+OR      exploded_value = '下午好' \
+OR      exploded_value = '晚上好' \
+OR      exploded_value = '晚安' \
+OR      exploded_value = '祝福' \
+OR      exploded_value = 'P1高风险' \
+OR      exploded_value = 'P0高风险' \
+)".format(TABLE)
+        print("sql:"+sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        # NOTE(review): a video matching several listed tags presumably comes
+        # back once per matching tag; every such row is appended here.
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                # Keep only the tags that are members of TAG_SET, comma-joined.
+                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        # Log how many rows were read from this table.
+        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        # Best effort: log the error with its traceback and fall through to
+        # the empty result instead of aborting the job.
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def get_video_tags_v2():
+    """Load video tags from loghubods.automated_updates_category_labels_1.
+
+    Selects ``videoid`` and ``secondary_labels`` for every row whose ``dt``
+    partition value starts with the current or the previous month (``YYYYMM``).
+
+    Returns:
+        A list of dicts ``{"video_id": int, "tags": str}`` where ``tags`` is
+        the comma-joined subset of ``secondary_labels`` that are in
+        ``TAG_SET`` (an empty string when none match). On any exception the
+        error is logged and an empty list is returned.
+
+    NOTE(review): ``TAG_SET`` is read as a module-level name and is not
+    defined inside this function; if it is missing, the NameError raised in
+    the loop is swallowed by the ``except`` below and ``[]`` is returned.
+    """
+    PROJECT = "loghubods"
+    TABLE = "loghubods.automated_updates_category_labels_1"
+
+    now_date = datetime.today()
+    current_month = now_date.strftime('%Y%m')
+    # Stepping back by the day-of-month lands on the last day of the previous
+    # month, which gives that month's YYYYMM prefix.
+    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
+    try:
+        # The table name comes from TABLE so it is declared in one place only;
+        # the rendered SQL is the same as with the name written inline.
+        sql = '''SELECT  videoid
+        ,secondary_labels
+FROM    {}
+WHERE   (
+            dt LIKE '{}%' OR dt LIKE '{}%'
+)
+'''.format(TABLE, current_month, previous_month)
+        print("sql:" + sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                labels = str(record['secondary_labels']).split(",")
+                video_tags_list.append({
+                    "video_id": int(record['videoid']),
+                    # Keep only the labels that are members of TAG_SET.
+                    "tags": ",".join([label for label in labels if label in TAG_SET]),
+                })
+        # Log how many rows were read from this table.
+        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        # Best effort: log the error with its traceback and fall through to
+        # the empty result instead of aborting the job.
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def process_and_store(row):
+    """Store one video's tag string in Redis.
+
+    Args:
+        row: dict with a ``"video_id"`` and a ``"tags"`` entry.
+
+    The key is ``REDIS_PREFIX`` followed by the video id. The expiry value
+    passed to the helper is 2 * 24 * 3600 (presumably seconds, i.e. two days).
+    """
+    ttl = 2 * 24 * 3600
+    redis_key = "{}{}".format(REDIS_PREFIX, row["video_id"])
+    redis_helper.set_data_to_redis(redis_key, row["tags"], ttl)
+
+def _merge_tag_strings(first, second):
+    """Union two comma-separated tag strings.
+
+    Empty entries are dropped (``"".split(",")`` yields ``[""]``) and
+    duplicates are removed; tags keep the order in which they first appear,
+    ``first`` before ``second``.
+    """
+    merged = []
+    seen = set()
+    for tag in first.split(",") + second.split(","):
+        if tag and tag not in seen:
+            seen.add(tag)
+            merged.append(tag)
+    return ",".join(merged)
+
+
+def main():
+    """Merge the tags from both sources per video and write them to Redis.
+
+    Rows from get_video_tags() and get_video_tags_v2() are each reduced to one
+    tag string per video id (a later row for the same id replaces an earlier
+    one). When a video appears in both sources the two tag strings are
+    unioned; otherwise the single available string is used as is. Every
+    merged entry is logged and then handed to records_process_for_list, which
+    is called with process_and_store as the per-row callback.
+    """
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    video_tags_list = get_video_tags()
+    video_tags_list2 = get_video_tags_v2()
+
+    merged_map = {}
+    for row in video_tags_list:
+        merged_map[row["video_id"]] = row["tags"]
+    incremental_map = {}
+    for row in video_tags_list2:
+        incremental_map[row["video_id"]] = row["tags"]
+
+    for video_id, tags in incremental_map.items():
+        if video_id in merged_map:
+            # Union without empty entries, so an empty tag string on either
+            # side cannot leave a stray comma in the stored value.
+            merged_map[video_id] = _merge_tag_strings(merged_map[video_id], tags)
+        else:
+            merged_map[video_id] = tags
+
+    result = []
+    for video_id, tags in merged_map.items():
+        result.append({"video_id": video_id, "tags": tags})
+        log_.info("{}:{}".format(video_id, tags))
+
+    log_.info("video的数据量:{}".format(len(result)))
+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("video的数据量:{}".format(len(result)))
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+# Run the job only when this file is executed as a script.
+if "__main__" == __name__:
+    main()
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_recall_undertake.py