Browse source

Merge branch 'feature/zhangbo_flow_recall' of algorithm/rov-offline into master

zhangbo 1 year ago
Parent commit 3a0e409c13

+ 167 - 0
alg_recsys_rank_item_realtime_1hroot.py

@@ -0,0 +1,167 @@
+# -*- coding: utf-8 -*-
+import traceback
+import datetime
+from odps import ODPS
+from threading import Timer
+from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+import json
+from datetime import datetime, timedelta
+import sys
+from utils import execute_sql_from_odps
+
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+REDIS_PREFIX = "item_rt_fea_1hroot_"
+
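+# Write one [video_id, json_str] row to Redis under REDIS_PREFIX with a 6-hour TTL.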
+def process_and_store(row):
+    video_id, json_str = row
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 6 * 3600
+    redis_helper.set_data_to_redis(key, json_str, expire_time)
+    # log_.info("video写入数据key={},value={}".format(key, json_str))
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        log_.info(f"检查分区是否存在-【 dt={partition} 】")
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f"select * from {project}.{table} where dt = '{partition}'"
+            log_.info(sql)
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            log_.info("表{}分区{}不存在".format(table, partition))
+            data_count = 0
+    except Exception as e:
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        data_count = 0
+    return data_count
+
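+# Build and run the hourly aggregation SQL over the window [previous_date 00, date 23]:
+# per-hour return/view/share counters are collected into "dt:value" CSV strings,
+# and each video is returned as a [video_id, json_str] pair ready for Redis.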
+def get_sql(date, previous_date_str, project):
+    sql = '''
+    SELECT  videoid
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return))) AS return
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",`view`))) AS `view`
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share))) AS share
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return1))) AS return1
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return2))) AS return2
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return3))) AS return3
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return4))) AS return4
+    FROM    (
+    SELECT  videoid
+    ,dt
+    ,SUM(return_all) as return
+    ,SUM(lastonehour_recommend_view) as `view`
+    ,SUM(lastonehour_recommend_share) as share
+    ,SUM(lastonehour_recommend_return1) as return1
+    ,SUM(lastonehour_recommend_return2n) as return2
+    ,SUM(last2nhour_return1) as return3
+    ,SUM(last2nhour_return2n) as return4
+    FROM    loghubods.video_return_composition_1hour
+    WHERE   dt <= '{}23'
+    AND     dt >= '{}00'
+    GROUP BY videoid
+    ,dt
+    )
+    GROUP BY videoid
+    '''.format(date, previous_date_str)
+    print("sql:" + sql)
+    records = execute_sql_from_odps(project=project, sql=sql)
+    video_list = []
+    with records.open_reader() as reader:
+        for record in reader:
+            video_id = record['videoid']
+            m = dict()
+            # Copy each counter field, tolerating missing columns per record.
+            for field in ["return", "view", "share", "return1", "return2", "return3", "return4"]:
+                try:
+                    m[field] = record[field]
+                except Exception as e:
+                    log_.error(e)
+
+            json_str = json.dumps(m)
+            video_list.append([video_id, json_str])
+    return video_list
+
+
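+# Poll until the upstream hourly partition is ready (re-checking every 60s via Timer),
+# then read the table, build per-video features, and write them to Redis.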
+def h_timer_check():
+    try:
+        date = sys.argv[1]
+        hour = sys.argv[2]
+    except Exception as e:
+        now_date = datetime.today()
+        date = datetime.strftime(now_date, '%Y%m%d')
+        hour = now_date.strftime('%H')  # zero-padded so the partition spec stays 10 digits
+        log_.info("No arguments provided; falling back to system time: {}".format(e))
+    # 1. Check whether the upstream table has finished producing this partition
+    project = "loghubods"
+    table = "video_return_composition_1hour"
+    partition = str(date) + str(hour)
+    table_data_cnt = check_data(project, table, partition)
+    if table_data_cnt == 0:
+        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
+        Timer(60, h_timer_check).start()
+    else:
+        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
+        # 2 读取数据表 处理特征
+        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        video_list = get_sql(date, previous_date_str, project)
+        # 3. Write the results to Redis
+        log_.info("Number of videos: {}".format(len(video_list)))
+        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
+
+        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
+
+
+
+
+if __name__ == '__main__':
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    h_timer_check()
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_rank_item_realtime_1hroot.py 20240408 14

+ 20 - 0
alg_recsys_rank_item_realtime_1hroot_task.sh

@@ -0,0 +1,20 @@
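+# Hourly wrapper script: invokes alg_recsys_rank_item_realtime_1hroot.py with the current date and hour.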
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_feature" ]; then
+    mkdir my_logs_feature
+fi
+cur_time="$(date +%Y%m%d)"
+cur_h="$(date +%H)"
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1hroot.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1hroot.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+fi
+
+#sh alg_recsys_rank_item_realtime_1hroot_task.sh

+ 1 - 0
alg_recsys_recall_shield_videos.py

@@ -31,6 +31,7 @@ def get_special_area_limit_videos():
         log_.info("videos = {}".format(",".join([str(i) for i in video_id_list])))
         # store in Redis
         if len(video_id_list) > 0:
+            video_id_list = [1]
             value = ",".join([str(i) for i in video_id_list])
             redis_helper.set_data_to_redis(key_name=RISK_SHIELD_FILTER_VIDEO_V1_STR, value=value,
                                            expire_time=3600*24 * 7)

+ 5 - 6
alg_recsys_recall_tags_videos_v2.py

@@ -126,8 +126,7 @@ OR      exploded_value = '初五' \
                 d["video_id"] = video_id
                 d["tags"] = tags
                 video_tags_list.append(d)
-                # log_.info("{}:{}".format(video_id, tags))
-        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        log_.info("第一张表天级表:{}".format(str(len(video_tags_list))))
         return video_tags_list
     except Exception as e:
         log_.error(str(e) + str(traceback.format_exc()))
@@ -220,7 +219,7 @@ WHERE   (
                 d["video_id"] = video_id
                 d["tags"] = tags
                 video_tags_list.append(d)
-        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        log_.info("第二张表增量表:{}".format(str(len(video_tags_list))))
         return video_tags_list
     except Exception as e:
         log_.error(str(e) + ":" + str(traceback.format_exc()))
@@ -229,7 +228,7 @@ def process_and_store(row):
     video_id = row["video_id"]
     tags = row["tags"]
     key = REDIS_PREFIX + str(video_id)
-    expire_time = 24 * 3600 * 2
+    expire_time = int(3600 * 1.5)  # Redis expects an integer number of seconds
     redis_helper.set_data_to_redis(key, tags, expire_time)
     # log_.info("video-tags写入数据key={},value={}".format(key, tags))
 
@@ -245,7 +244,8 @@ def main():
         Timer(60*3, main).start()
     else:
 
-        video_tags_list = get_video_tags()
+        # video_tags_list = get_video_tags()
+        video_tags_list = []
         # video_tags_list2 = get_video_tags_v2()
         log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
         video_tags_list2 = get_video_tags_v3(project, table, partition)
@@ -275,7 +275,6 @@ def main():
             result.append(tmp)
             log_.info("{}:{}".format(key, value))
 
-        log_.info("video的数据量:{}".format(len(result)))
         records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
         log_.info("video的数据量:{}".format(len(result)))
         log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

+ 0 - 215
alg_recsys_recall_tags_videosv2.py

@@ -1,215 +0,0 @@
-# -*- coding: utf-8 -*-
-import traceback
-from config import set_config
-from log import Log
-from utils import execute_sql_from_odps
-from db_helper import RedisHelper
-from datetime import datetime, timedelta
-
-from alg_recsys_recall_4h_region_trend import records_process_for_list
-config_, _ = set_config()
-log_ = Log()
-redis_helper = RedisHelper()
-
-
-REDIS_PREFIX = "alg_recsys_video_tags_"
-
-TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
-    '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
-    '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
-    '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
-    '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
-    'P1高风险','P0高风险',
-"早上好","中午好","下午好","晚上好","晚安",
-        "祝福",
-"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
-        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
-        ,"毛主席诞辰",
-
-        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
-        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
-
-        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
-        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世"
-])
-
-def get_video_tags():
-    """获取视频的tag"""
-    PROJECT = "videoods"
-    TABLE = "videoods.dim_video"
-    try:
-        sql = "SELECT  videoid \
-,tags \
-FROM    {} \
-LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
-WHERE   tags IS NOT NULL \
-AND     ( \
-exploded_value = '元旦' \
-OR      exploded_value = '腊八节' \
-OR      exploded_value = '小年' \
-OR      exploded_value = '除夕' \
-OR      exploded_value = '春节' \
-OR      exploded_value = '情人节' \
-OR      exploded_value = '元宵节' \
-OR      exploded_value = '龙抬头' \
-OR      exploded_value = '妇女节' \
-OR      exploded_value = '劳动节' \
-OR      exploded_value = '母亲节' \
-OR      exploded_value = '儿童节' \
-OR      exploded_value = '端午节' \
-OR      exploded_value = '父亲节' \
-OR      exploded_value = '建党节' \
-OR      exploded_value = '七七事变' \
-OR      exploded_value = '建军节' \
-OR      exploded_value = '七夕节' \
-OR      exploded_value = '中元节' \
-OR      exploded_value = '中秋节' \
-OR      exploded_value = '毛主席逝世' \
-OR      exploded_value = '国庆节' \
-OR      exploded_value = '重阳节' \
-OR      exploded_value = '感恩节' \
-OR      exploded_value = '公祭日' \
-OR      exploded_value = '平安夜' \
-OR      exploded_value = '圣诞节' \
-OR      exploded_value = '毛主席诞辰' \
-OR      exploded_value = '小寒' \
-OR      exploded_value = '大寒' \
-OR      exploded_value = '立春' \
-OR      exploded_value = '雨水' \
-OR      exploded_value = '惊蛰' \
-OR      exploded_value = '春分' \
-OR      exploded_value = '清明' \
-OR      exploded_value = '谷雨' \
-OR      exploded_value = '立夏' \
-OR      exploded_value = '小满' \
-OR      exploded_value = '芒种' \
-OR      exploded_value = '夏至' \
-OR      exploded_value = '小暑' \
-OR      exploded_value = '大暑' \
-OR      exploded_value = '立秋' \
-OR      exploded_value = '处暑' \
-OR      exploded_value = '白露' \
-OR      exploded_value = '秋分' \
-OR      exploded_value = '寒露' \
-OR      exploded_value = '霜降' \
-OR      exploded_value = '立冬' \
-OR      exploded_value = '小雪' \
-OR      exploded_value = '大雪' \
-OR      exploded_value = '冬至' \
-OR      exploded_value = '早上好' \
-OR      exploded_value = '中午好' \
-OR      exploded_value = '下午好' \
-OR      exploded_value = '晚上好' \
-OR      exploded_value = '晚安' \
-OR      exploded_value = '祝福' \
-OR      exploded_value = 'P1高风险' \
-OR      exploded_value = 'P0高风险' \
-)".format(TABLE)
-        print("sql:"+sql)
-        records = execute_sql_from_odps(project=PROJECT, sql=sql)
-        video_tags_list = []
-        with records.open_reader() as reader:
-            for record in reader:
-                video_id = int(record['videoid'])
-                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
-                d = {}
-                d["video_id"] = video_id
-                d["tags"] = tags
-                video_tags_list.append(d)
-                # log_.info("{}:{}".format(video_id, tags))
-        log_.info("天级表:{}".format(str(len(video_tags_list))))
-        return video_tags_list
-    except Exception as e:
-        log_.error(str(e) + str(traceback.format_exc()))
-    return []
-def get_video_tags_v2():
-    PROJECT = "loghubods"
-    TABLE = "loghubods.automated_updates_category_labels_1"
-    # now_date = datetime.today()
-    # date = datetime.strftime(now_date, '%Y%m%d')
-    # previous_date = now_date - timedelta(days=1)
-    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
-
-    # Current date
-    now_date = datetime.today()
-    # Current month (YYYYMM)
-    current_month = now_date.strftime('%Y%m')
-    # Previous month (YYYYMM)
-    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
-    try:
-        sql = '''SELECT  videoid
-        ,secondary_labels
-FROM    loghubods.automated_updates_category_labels_1
-WHERE   (
-            dt LIKE '{}%' OR dt LIKE '{}%'
-)
-'''.format(current_month, previous_month)
-        print("sql:" + sql)
-        records = execute_sql_from_odps(project=PROJECT, sql=sql)
-        video_tags_list = []
-        with records.open_reader() as reader:
-            for record in reader:
-                video_id = int(record['videoid'])
-                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
-                d = {}
-                d["video_id"] = video_id
-                d["tags"] = tags
-                video_tags_list.append(d)
-                # log_.info("{}:{}".format(video_id, tags))
-        log_.info("增量表:{}".format(str(len(video_tags_list))))
-        return video_tags_list
-    except Exception as e:
-        log_.error(str(e) + str(traceback.format_exc()))
-    return []
-def process_and_store(row):
-    video_id = row["video_id"]
-    tags = row["tags"]
-    key = REDIS_PREFIX + str(video_id)
-    expire_time = 24 * 3600 * 2
-    redis_helper.set_data_to_redis(key, tags, expire_time)
-    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
-
-def main():
-    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-    video_tags_list = get_video_tags()
-    video_tags_list2 = get_video_tags_v2()
-
-
-    m1 = {}
-    m2 = {}
-    for row in video_tags_list:
-        m1[row["video_id"]] = row["tags"]
-    for row in video_tags_list2:
-        m2[row["video_id"]] = row["tags"]
-    merged_map = {}
-    merged_map.update(m1)
-    for key, value in m2.items():
-        if key in merged_map:
-            l1 = merged_map[key].split(",")
-            l2 = value.split(",")
-            l1.extend(l2)
-            l3 = list(set(l1))
-            merged_map[key] = ",".join(l3)
-        else:
-            merged_map[key] = value
-    result = []
-    for key, value in merged_map.items():
-        tmp = {}
-        tmp["video_id"] = key
-        tmp["tags"] = value
-        result.append(tmp)
-        log_.info("{}:{}".format(key, value))
-
-    log_.info("video的数据量:{}".format(len(result)))
-    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
-    log_.info("video的数据量:{}".format(len(result)))
-    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-
-if __name__ == '__main__':
-    main()
-
-
-
-
-# cd /root/zhangbo/rov-offline
-# python alg_recsys_recall_shield_videos.py

+ 2 - 2
config.py

@@ -206,14 +206,14 @@ class BaseConfig(object):
         'videos5': {APP_TYPE['LONG_VIDEO']: 1},  # [featured content]
         'data66': {
             APP_TYPE['VLOG']: 0.3,
-            APP_TYPE['LOVE_LIVE']: 0.2,
+            # APP_TYPE['LOVE_LIVE']: 0.2,
             APP_TYPE['LONG_VIDEO']: 0.2,
             APP_TYPE['SHORT_VIDEO']: 0.05,
             APP_TYPE['WAN_NENG_VIDEO']: 0.05,
             # APP_TYPE['LAO_HAO_KAN_VIDEO']: 1,
             # APP_TYPE['ZUI_JING_QI']: 1,
             APP_TYPE['APP']: 0.05,
-            APP_TYPE['PIAO_QUAN_VIDEO_PLUS']: 0.05,
+            # APP_TYPE['PIAO_QUAN_VIDEO_PLUS']: 0.05,
             APP_TYPE['JOURNEY']: 0.05,
             APP_TYPE['BLESSING_YEAR']: 0.04,
             APP_TYPE['PIAO_QUAN_BLESSING']: 0.01