
Merge branch 'feature/zhangbo_flow_recall' of algorithm/rov-offline into master

zhangbo committed 1 year ago · commit a4137087b5

+ 8 - 0
alg_recsys_delete_file.sh

@@ -0,0 +1,8 @@
+
+day="$(date -d '4 days ago' +%Y-%m-%d)"
+rm -rf /root/zhangbo/rov-offline/my_logs_tags/tags_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs/task_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_feature/rt_1day_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_feature/rt_1h_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_shield/shield_videos_${day}*
+rm -rf /root/zhangbo/rov-offline/logs/${day}*

+ 192 - 0
alg_recsys_rank_item_realtime_1day.py

@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+import traceback
+import datetime
+from odps import ODPS
+from threading import Timer
+from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+import json
+from datetime import datetime, timedelta
+import sys
+from utils import execute_sql_from_odps
+
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+REDIS_PREFIX = "item_rt_fea_1day_"
+
+def process_and_store(row):
+    video_id, json_str = row
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600
+    redis_helper.set_data_to_redis(key, json_str, expire_time)
+    # log_.info("video写入数据key={},value={}".format(key, json_str))
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        log_.info(f"检查分区是否存在-【 dt={partition} 】")
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            log_.info(sql)
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            log_.info("表{}分区{}不存在".format(table, partition))
+            data_count = 0
+    except Exception as e:
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        data_count = 0
+    return data_count
+
+def get_sql(date, previous_date_str, project):
+    sql = '''
+    SELECT  videoid
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_pv))) AS view_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_uv))) AS view_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_pv))) AS play_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_uv))) AS play_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_pv))) AS share_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_uv))) AS share_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return_uv))) AS return_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_view_uv))) AS p_view_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_view_pv))) AS p_view_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_return_uv))) AS p_return_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_uv))) AS share_uv_list_2day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_pv))) AS share_pv_list_2day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_uv))) AS share_uv_list_3day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_pv))) AS share_pv_list_3day
+    FROM    (
+                SELECT  videoid
+                        ,dt
+                        ,SUM(view次数) AS view_pv
+                        ,SUM(view人数) AS view_uv
+                        ,SUM(play次数) AS play_pv
+                        ,SUM(play人数) AS play_uv
+                        ,SUM(share次数) AS share_pv
+                        ,SUM(share人数) AS share_uv
+                        ,SUM(回流人数) AS return_uv
+                        ,SUM(platform_view) AS p_view_uv
+                        ,SUM(platform_view_total) AS p_view_pv
+                        ,SUM(platform_return) AS p_return_uv
+                        ,SUM(lasttwodays_share) AS 2day_share_uv
+                        ,SUM(lasttwodays_share_total) AS 2day_share_pv
+                        ,SUM(lastthreedays_share) AS 3day_share_uv
+                        ,SUM(lastthreedays_share_total) AS 3day_share_pv
+                FROM    loghubods.video_data_each_hour_dataset_24h_total_apptype
+                WHERE   dt <= '{}23'
+                AND     dt >= '{}00'
+                GROUP BY videoid
+                         ,dt
+            ) 
+    GROUP BY videoid
+    '''.format(date, previous_date_str)
+    print("sql:" + sql)
+    records = execute_sql_from_odps(project=project, sql=sql)
+    video_list = []
+    with records.open_reader() as reader:
+        for record in reader:
+            video_id = record['videoid']
+            m = dict()
+            try:
+                m["view_pv_list_1day"] = record['view_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["view_uv_list_1day"] = record['view_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_pv_list_1day"] = record['play_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_uv_list_1day"] = record['play_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_pv_list_1day"] = record['share_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_uv_list_1day"] = record['share_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["return_uv_list_1day"] = record['return_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_view_pv_list_1day"] = record['p_view_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_view_uv_list_1day"] = record['p_view_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_return_uv_list_1day"] = record['p_return_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            json_str = json.dumps(m)
+            video_list.append([video_id, json_str])
+    return video_list
+
+
+def h_timer_check():
+    try:
+        date = sys.argv[1]
+        hour = sys.argv[2]
+    except Exception as e:
+        now_date = datetime.today()
+        date = datetime.strftime(now_date, '%Y%m%d')
+        hour = datetime.now().strftime('%H')  # zero-padded hour, matching the shell's `date +%H` and the dt partition format
+        log_.info("没有读取到参数,采用系统时间,报错info:{}".format(e))
+    # 1 判断上游数据表是否生产完成
+    project = "loghubods"
+    table = "video_data_each_hour_dataset_24h_total_apptype"
+    partition = str(date) + str(hour)
+    table_data_cnt = check_data(project, table, partition)
+    if table_data_cnt == 0:
+        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
+        Timer(60, h_timer_check).start()
+    else:
+        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
+        # 2 读取数据表 处理特征
+        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        video_list = get_sql(date, previous_date_str, project)
+        # 3 写入redis
+        log_.info("video的数据量:{}".format(len(video_list)))
+        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
+
+        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
+
+
+
+
+if __name__ == '__main__':
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    h_timer_check()
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_rank_item_realtime_1day.py 20240117 20
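For reference, the value written to item_rt_fea_1day_&lt;videoid&gt; is the JSON built in get_sql: each field is a comma-separated list of "dt:value" pairs. A minimal sketch (not part of this commit; the helper name is illustrative) of how a consumer might parse such a value back into per-partition numbers:

    import json

    def parse_rt_1day_value(json_str):
        # hypothetical helper: turn {"view_pv_list_1day": "2024011700:3,2024011701:5", ...}
        # into {"view_pv_list_1day": {"2024011700": 3.0, "2024011701": 5.0}, ...}
        parsed = {}
        for feature, dt_list in json.loads(json_str).items():
            per_dt = {}
            for pair in dt_list.split(","):
                if not pair:
                    continue
                dt, _, value = pair.partition(":")
                per_dt[dt] = float(value)
            parsed[feature] = per_dt
        return parsed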

+ 20 - 0
alg_recsys_rank_item_realtime_1day_task.sh

@@ -0,0 +1,20 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_feature" ]; then
+    mkdir my_logs_feature
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1day.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1day.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+fi
+
+#sh alg_recsys_rank_item_realtime_1day_task.sh

+ 172 - 0
alg_recsys_rank_item_realtime_1h.py

@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+import traceback
+import datetime
+from odps import ODPS
+from threading import Timer
+from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+import json
+from datetime import datetime, timedelta
+import sys
+from utils import execute_sql_from_odps
+
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+REDIS_PREFIX = "item_rt_fea_1h_"
+
+def process_and_store(row):
+    video_id, json_str = row
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600
+    redis_helper.set_data_to_redis(key, json_str, expire_time)
+    # log_.info("video写入数据key={},value={}".format(key, json_str))
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        log_.info(f"检查分区是否存在-【 dt={partition} 】")
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            log_.info(sql)
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            log_.info("表{}分区{}不存在".format(table, partition))
+            data_count = 0
+    except Exception as e:
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        data_count = 0
+    return data_count
+
+def get_sql(date, previous_date_str, project):
+    sql = '''
+    SELECT  videoid
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_uv))) AS view_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_pv))) AS view_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_uv))) AS play_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_pv))) AS play_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_uv))) AS share_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_pv))) AS share_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return_uv))) AS return_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_return_uv))) AS p_return_uv_list_1h
+    FROM    (
+    SELECT  videoid
+    ,dt
+    ,SUM(lastonehour_view) AS view_uv
+    ,SUM(lastonehour_view_total) AS view_pv
+    ,SUM(lastonehour_play) AS play_uv
+    ,SUM(lastonehour_play_total) AS play_pv
+    ,SUM(lastonehour_share) AS share_uv
+    ,SUM(lastonehour_share_total) AS share_pv
+    ,SUM(lastonehour_return) AS return_uv
+    ,SUM(platform_return) AS p_return_uv
+    FROM    loghubods.video_each_hour_update_no_province_apptype
+    WHERE   dt <= '{}23'
+    AND     dt >= '{}00'
+    GROUP BY videoid
+    ,dt
+    )
+    GROUP BY videoid
+    '''.format(date, previous_date_str)
+    print("sql:" + sql)
+    records = execute_sql_from_odps(project=project, sql=sql)
+    video_list = []
+    with records.open_reader() as reader:
+        for record in reader:
+            video_id = record['videoid']
+            m = dict()
+            try:
+                m["view_uv_list_1h"] = record['view_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["view_pv_list_1h"] = record['view_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_uv_list_1h"] = record['play_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_pv_list_1h"] = record['play_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_uv_list_1h"] = record['share_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_pv_list_1h"] = record['share_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["return_uv_list_1h"] = record['return_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_return_uv_list_1h"] = record['p_return_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            json_str = json.dumps(m)
+            video_list.append([video_id, json_str])
+    return video_list
+
+
+def h_timer_check():
+    try:
+        date = sys.argv[1]
+        hour = sys.argv[2]
+    except Exception as e:
+        now_date = datetime.today()
+        date = datetime.strftime(now_date, '%Y%m%d')
+        hour = datetime.now().strftime('%H')  # zero-padded hour, matching the shell's `date +%H` and the dt partition format
+        log_.info("没有读取到参数,采用系统时间,报错info:{}".format(e))
+    # 1 判断上游数据表是否生产完成
+    project = "loghubods"
+    table = "video_each_hour_update_no_province_apptype"
+    partition = str(date) + str(hour)
+    table_data_cnt = check_data(project, table, partition)
+    if table_data_cnt == 0:
+        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
+        Timer(60, h_timer_check).start()
+    else:
+        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
+        # 2 读取数据表 处理特征
+        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        video_list = get_sql(date, previous_date_str, project)
+        # 3 写入redis
+        log_.info("video的数据量:{}".format(len(video_list)))
+        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
+
+        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
+
+
+
+
+if __name__ == '__main__':
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    h_timer_check()
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_rank_item_realtime_1h.py 20240117 20
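Both realtime jobs also record the partition they last loaded under REDIS_PREFIX + "partition". A sketch of a freshness check, assuming a plain redis-py client (the read side of RedisHelper is not shown in this diff, and the connection details are placeholders):

    import redis  # assumption: plain redis-py, since RedisHelper's read API is not part of this diff

    def feature_partition_is_fresh(client, now_partition):
        # the 1h job stores the partition it last loaded under item_rt_fea_1h_partition
        stored = client.get("item_rt_fea_1h_partition")
        return stored is not None and stored.decode() == now_partition

    # feature_partition_is_fresh(redis.Redis(host="127.0.0.1"), "2024011720")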

+ 20 - 0
alg_recsys_rank_item_realtime_1h_task.sh

@@ -0,0 +1,20 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_feature" ]; then
+    mkdir my_logs_feature
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1h.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1h.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+fi
+
+#sh alg_recsys_rank_item_realtime_1h_task.sh

+ 1 - 1
alg_recsys_recall_1h_region.py

@@ -41,7 +41,7 @@ RULE_PARAMS = {
     'params_list': [
         # 532
         # {'data': 'data66', 'rule': 'rule66'},  # 523-> 523 & 518
-        {'data': 'data66', 'rule': 'rule67'},  # 523->510
+        # {'data': 'data66', 'rule': 'rule67'},  # 523->510
         # {'data': 'data66', 'rule': 'rule68'},  # 523->514
         # {'data': 'data66', 'rule': 'rule69'},  # 523->518
     ],

+ 105 - 15
alg_recsys_recall_tags_videos.py

@@ -4,26 +4,43 @@ from config import set_config
 from log import Log
 from utils import execute_sql_from_odps
 from db_helper import RedisHelper
-import datetime
+from datetime import datetime, timedelta
+
 from alg_recsys_recall_4h_region_trend import records_process_for_list
 config_, _ = set_config()
 log_ = Log()
 redis_helper = RedisHelper()
 
 
-PROJECT = "videoods"
-TABLE = "videoods.dim_video"
+
 REDIS_PREFIX = "alg_recsys_video_tags_"
 
 
-TAG_SET = ['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
+TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
     '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
     '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
     '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
     '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
-    'P1高风险','P0高风险'
-]
+    'P1高风险','P0高风险',
+"早上好","中午好","下午好","晚上好","晚安",
+        "祝福",
+"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
+        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
+        ,"毛主席诞辰",
+
+        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
+        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
+
+        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世",
+
+               "初一","初二","初三","初四","初五"
+
+
+])
 
 
 def get_video_tags():
     """获取视频的tag"""
+    PROJECT = "videoods"
+    TABLE = "videoods.dim_video"
     try:
         sql = "SELECT  videoid \
 ,tags \
@@ -91,6 +108,11 @@ OR      exploded_value = '晚安' \
 OR      exploded_value = '祝福' \
 OR      exploded_value = 'P1高风险' \
 OR      exploded_value = 'P0高风险' \
+OR      exploded_value = '初一' \
+OR      exploded_value = '初二' \
+OR      exploded_value = '初三' \
+OR      exploded_value = '初四' \
+OR      exploded_value = '初五' \
 )".format(TABLE)
 )".format(TABLE)
         print("sql:"+sql)
         print("sql:"+sql)
         records = execute_sql_from_odps(project=PROJECT, sql=sql)
         records = execute_sql_from_odps(project=PROJECT, sql=sql)
@@ -103,26 +125,94 @@ OR      exploded_value = 'P0高风险' \
                 d["video_id"] = video_id
                 d["video_id"] = video_id
                 d["tags"] = tags
                 d["tags"] = tags
                 video_tags_list.append(d)
                 video_tags_list.append(d)
-                log_.info("{}:{}".format(video_id, tags))
-        log_.info("video的数据量:{}".format(len(video_tags_list)))
-        records_process_for_list(video_tags_list, process_and_store, max_size=50, num_workers=8)
-        log_.info("video的数据量:{}".format(len(video_tags_list)))
-
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
     except Exception as e:
         log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def get_video_tags_v2():
+    PROJECT = "loghubods"
+    TABLE = "loghubods.automated_updates_category_labels_1"
+    # now_date = datetime.today()
+    # date = datetime.strftime(now_date, '%Y%m%d')
+    # previous_date = now_date - timedelta(days=1)
+    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
 
 
+    # 获取当前日期
+    now_date = datetime.today()
+    # 获取当前月份
+    current_month = now_date.strftime('%Y%m')
+    # 获取上个月份
+    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
+    try:
+        sql = '''SELECT  videoid
+        ,secondary_labels
+FROM    loghubods.automated_updates_category_labels_1
+WHERE   (
+            dt LIKE '{}%' OR dt LIKE '{}%'
+)
+'''.format(current_month, previous_month)
+        print("sql:" + sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
 def process_and_store(row):
     video_id = row["video_id"]
     tags = row["tags"]
     key = REDIS_PREFIX + str(video_id)
     expire_time = 24 * 3600 * 2
     redis_helper.set_data_to_redis(key, tags, expire_time)
-    log_.info("video-tags写入数据key={},value={}".format(key, tags))
+    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
 
 
 def main():
-    log_.info("开始执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-    get_video_tags()
-    log_.info("完成执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    video_tags_list = get_video_tags()
+    video_tags_list2 = get_video_tags_v2()
+
+
+    m1 = {}
+    m2 = {}
+    for row in video_tags_list:
+        m1[row["video_id"]] = row["tags"]
+    for row in video_tags_list2:
+        m2[row["video_id"]] = row["tags"]
+    merged_map = {}
+    merged_map.update(m1)
+    for key, value in m2.items():
+        if key in merged_map:
+            l1 = merged_map[key].split(",")
+            l2 = value.split(",")
+            l1.extend(l2)
+            l3 = list(set(l1))
+            merged_map[key] = ",".join(l3)
+        else:
+            merged_map[key] = value
+    result = []
+    for key, value in merged_map.items():
+        tmp = {}
+        tmp["video_id"] = key
+        tmp["tags"] = value
+        result.append(tmp)
+        log_.info("{}:{}".format(key, value))
+
+    log_.info("video的数据量:{}".format(len(result)))
+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("video的数据量:{}".format(len(result)))
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
 
 if __name__ == '__main__':
     main()

+ 215 - 0
alg_recsys_recall_tags_videosv2.py

@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+import traceback
+from config import set_config
+from log import Log
+from utils import execute_sql_from_odps
+from db_helper import RedisHelper
+from datetime import datetime, timedelta
+
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+
+REDIS_PREFIX = "alg_recsys_video_tags_"
+
+TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
+    '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
+    '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
+    '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
+    '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
+    'P1高风险','P0高风险',
+"早上好","中午好","下午好","晚上好","晚安",
+        "祝福",
+"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
+        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
+        ,"毛主席诞辰",
+
+        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
+        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
+
+        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世"
+])
+
+def get_video_tags():
+    """获取视频的tag"""
+    PROJECT = "videoods"
+    TABLE = "videoods.dim_video"
+    try:
+        sql = "SELECT  videoid \
+,tags \
+FROM    {} \
+LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
+WHERE   tags IS NOT NULL \
+AND     ( \
+exploded_value = '元旦' \
+OR      exploded_value = '腊八节' \
+OR      exploded_value = '小年' \
+OR      exploded_value = '除夕' \
+OR      exploded_value = '春节' \
+OR      exploded_value = '情人节' \
+OR      exploded_value = '元宵节' \
+OR      exploded_value = '龙抬头' \
+OR      exploded_value = '妇女节' \
+OR      exploded_value = '劳动节' \
+OR      exploded_value = '母亲节' \
+OR      exploded_value = '儿童节' \
+OR      exploded_value = '端午节' \
+OR      exploded_value = '父亲节' \
+OR      exploded_value = '建党节' \
+OR      exploded_value = '七七事变' \
+OR      exploded_value = '建军节' \
+OR      exploded_value = '七夕节' \
+OR      exploded_value = '中元节' \
+OR      exploded_value = '中秋节' \
+OR      exploded_value = '毛主席逝世' \
+OR      exploded_value = '国庆节' \
+OR      exploded_value = '重阳节' \
+OR      exploded_value = '感恩节' \
+OR      exploded_value = '公祭日' \
+OR      exploded_value = '平安夜' \
+OR      exploded_value = '圣诞节' \
+OR      exploded_value = '毛主席诞辰' \
+OR      exploded_value = '小寒' \
+OR      exploded_value = '大寒' \
+OR      exploded_value = '立春' \
+OR      exploded_value = '雨水' \
+OR      exploded_value = '惊蛰' \
+OR      exploded_value = '春分' \
+OR      exploded_value = '清明' \
+OR      exploded_value = '谷雨' \
+OR      exploded_value = '立夏' \
+OR      exploded_value = '小满' \
+OR      exploded_value = '芒种' \
+OR      exploded_value = '夏至' \
+OR      exploded_value = '小暑' \
+OR      exploded_value = '大暑' \
+OR      exploded_value = '立秋' \
+OR      exploded_value = '处暑' \
+OR      exploded_value = '白露' \
+OR      exploded_value = '秋分' \
+OR      exploded_value = '寒露' \
+OR      exploded_value = '霜降' \
+OR      exploded_value = '立冬' \
+OR      exploded_value = '小雪' \
+OR      exploded_value = '大雪' \
+OR      exploded_value = '冬至' \
+OR      exploded_value = '早上好' \
+OR      exploded_value = '中午好' \
+OR      exploded_value = '下午好' \
+OR      exploded_value = '晚上好' \
+OR      exploded_value = '晚安' \
+OR      exploded_value = '祝福' \
+OR      exploded_value = 'P1高风险' \
+OR      exploded_value = 'P0高风险' \
+)".format(TABLE)
+        print("sql:"+sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def get_video_tags_v2():
+    PROJECT = "loghubods"
+    TABLE = "loghubods.automated_updates_category_labels_1"
+    # now_date = datetime.today()
+    # date = datetime.strftime(now_date, '%Y%m%d')
+    # previous_date = now_date - timedelta(days=1)
+    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
+
+    # 获取当前日期
+    now_date = datetime.today()
+    # 获取当前月份
+    current_month = now_date.strftime('%Y%m')
+    # 获取上个月份
+    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
+    try:
+        sql = '''SELECT  videoid
+        ,secondary_labels
+FROM    loghubods.automated_updates_category_labels_1
+WHERE   (
+            dt LIKE '{}%' OR dt LIKE '{}%'
+)
+'''.format(current_month, previous_month)
+        print("sql:" + sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def process_and_store(row):
+    video_id = row["video_id"]
+    tags = row["tags"]
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600 * 2
+    redis_helper.set_data_to_redis(key, tags, expire_time)
+    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
+
+def main():
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    video_tags_list = get_video_tags()
+    video_tags_list2 = get_video_tags_v2()
+
+
+    m1 = {}
+    m2 = {}
+    for row in video_tags_list:
+        m1[row["video_id"]] = row["tags"]
+    for row in video_tags_list2:
+        m2[row["video_id"]] = row["tags"]
+    merged_map = {}
+    merged_map.update(m1)
+    for key, value in m2.items():
+        if key in merged_map:
+            l1 = merged_map[key].split(",")
+            l2 = value.split(",")
+            l1.extend(l2)
+            l3 = list(set(l1))
+            merged_map[key] = ",".join(l3)
+        else:
+            merged_map[key] = value
+    result = []
+    for key, value in merged_map.items():
+        tmp = {}
+        tmp["video_id"] = key
+        tmp["tags"] = value
+        result.append(tmp)
+        log_.info("{}:{}".format(key, value))
+
+    log_.info("video的数据量:{}".format(len(result)))
+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("video的数据量:{}".format(len(result)))
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+if __name__ == '__main__':
+    main()
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_recall_tags_videosv2.py
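The merge step in main() above unions the comma-separated tag strings of the daily table and the incremental table per video. A small standalone sketch of the same idea (function name and sample values are illustrative, not part of the commit):

    def merge_tag_maps(m1, m2):
        # union comma-separated tags keyed by video_id; keys present in only one map are kept as-is
        merged = dict(m1)
        for video_id, tags in m2.items():
            if video_id in merged and merged[video_id]:
                merged[video_id] = ",".join(set(merged[video_id].split(",")) | set(tags.split(",")))
            else:
                merged[video_id] = tags
        return merged

    # merge_tag_maps({101: "春节"}, {101: "春节,元宵节", 102: "冬至"})
    # -> {101: "春节,元宵节" (order not guaranteed), 102: "冬至"}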

+ 70 - 0
alg_recsys_recall_undertake.py

@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+import traceback
+from config import set_config
+from log import Log
+from utils import execute_sql_from_odps
+from db_helper import RedisHelper
+from datetime import datetime, timedelta
+
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+
+
+def main():
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    #1 读取当前时间及下一小时时间
+    now_date = datetime.today() # - timedelta(hours=1)
+    hour = datetime.strftime(now_date, '%H').lstrip('0')
+    date = datetime.strftime(now_date, '%Y%m%d')
+    if hour == "":
+        hour = "0"
+    next_date = now_date + timedelta(hours=1)
+    hour_next = datetime.strftime(next_date, '%H').lstrip('0')
+    date_next = datetime.strftime(next_date, '%Y%m%d')
+    if hour_next == "":
+        hour_next = "0"
+    #2 循环拼接key,读一个写一个。
+    REGION_CODE = {
+        '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',
+        '辽宁省': '210000', '吉林省': '220000', '黑龙江省': '230000',
+        '上海': '310000', '江苏省': '320000', '浙江省': '330000', '安徽省': '340000', '福建省': '350000',
+        '江西省': '360000',
+        '山东省': '370000',
+        '河南省': '410000', '湖北省': '420000', '湖南省': '430000', '广东省': '440000', '广西': '450000',
+        '海南省': '460000',
+        '重庆': '500000', '四川省': '510000', '贵州省': '520000', '云南省': '530000', '西藏': '540000',
+        '陕西省': '610000', '甘肃省': '620000', '青海省': '630000', '宁夏': '640000', '新疆': '650000',
+        '台湾省': '710000', '香港': '810000', '澳门': '820000',
+
+        '广州': '440100', '深圳': '440300', '成都': '510100', '长沙': '430100',
+    }
+    keys = []
+    for _, code in REGION_CODE.items():
+        read_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date, hour)
+        keys.append(read_key)
+        values = redis_helper.get_data_zset_with_index(read_key, 0, -1, True, True)
+        print(str(read_key))
+        print(str(values))
+        if values and len(values) > 0:
+            new_kvs = {}
+            for k, v in values:
+                new_kvs[k] = v
+            write_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date_next, hour_next)
+            redis_helper.add_data_with_zset(write_key, new_kvs, expire_time = 1*24*3600)
+            print(str(write_key))
+
+
+    # records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+if __name__ == '__main__':
+    main()
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_recall_undertake.py
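The loop above copies each region's hourly zset to the key of the following hour; because date_next and hour_next come from now + 1 hour, the 23:00 run rolls over to the next day's hour 0. A small sketch of the key construction (the key pattern is copied from the code above; the helper name is illustrative):

    from datetime import datetime, timedelta

    def undertake_keys(code, now):
        nxt = now + timedelta(hours=1)
        fmt = "recall:item:score:region:h:{}:data66:rule66:{}:{}"
        # hours are written without a leading zero, matching the lstrip('0') logic above
        read_key = fmt.format(code, now.strftime('%Y%m%d'), str(now.hour))
        write_key = fmt.format(code, nxt.strftime('%Y%m%d'), str(nxt.hour))
        return read_key, write_key

    # undertake_keys('110000', datetime(2024, 1, 17, 23)) ->
    # ('recall:item:score:region:h:110000:data66:rule66:20240117:23',
    #  'recall:item:score:region:h:110000:data66:rule66:20240118:0')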

+ 22 - 0
alg_recsys_recall_undertake_task.sh

@@ -0,0 +1,22 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_undertask" ]; then
+    # 如果文件夹不存在,则创建文件夹
+    mkdir my_logs_undertask
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$cur_time}-{$cur_h}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  cd /root/zhangbo/rov-offline
+  /root/anaconda3/bin/python alg_recsys_recall_undertake.py
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  cd /root/zhangbo/rov-offline
+  /root/anaconda3/bin/python alg_recsys_recall_undertake.py
+  echo "all done"
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "结束执行时间:{$cur_time}-{$cur_h}"

+ 84 - 0
alg_recsys_utils.py

@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+from odps import ODPS
+import argparse
+
+ODPS_CONFIG = {
+        'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=ODPS_CONFIG['ACCESSID'],
+        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        print("error:" + str(e))
+        data_count = 0
+    return data_count
+
+
+def check_origin_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_view_sample_v2"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+def check_item_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_video_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+def check_user_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_user_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='脚本utils')
+    parser.add_argument('--excute_program', type=str, help='执行程序')
+    parser.add_argument('--partition', type=str, help='表分区')
+    args = parser.parse_args()
+    if args.excute_program == "check_origin_hive":
+        check_origin_hive(args)
+    if args.excute_program == "check_item_hive":
+        check_item_hive(args)
+    if args.excute_program == "check_user_hive":
+        check_user_hive(args)
+    else:
+        print("无合法参数,验证失败。")
+        exit(999)
+
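A sketch of how the exit-code contract above might be consumed from Python (the --excute_program/--partition flags come from the argparse definition; the interpreter path and partition value are assumptions based on the other scripts in this commit):

    import subprocess

    # returncode 0: partition exists and has rows; non-zero: not ready or unrecognized program name
    proc = subprocess.run([
        "/root/anaconda3/bin/python", "alg_recsys_utils.py",
        "--excute_program", "check_origin_hive",
        "--partition", "2024011720",
    ])
    print("ready" if proc.returncode == 0 else "not ready, returncode={}".format(proc.returncode))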

+ 2 - 1
config.py

@@ -419,7 +419,8 @@ class BaseConfig(object):
                        'region_24h_rule_key': 'rule4', '24h_rule_key': 'rule4', 'merge_func': 2,
                        'score_func': 'back_rate_rank_weighting'},
             'rule66': {
-                'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+                # 'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+                'view_type': 'video-show-region', "return_countv2": 2, 'platform_return_ratev2': 0.001,
                 'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
             },
             'rule68': {
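A hypothetical illustration, not this project's actual filter code, of how threshold pairs like return_countv2 / platform_return_ratev2 in rule66 are typically applied when deciding whether a video stays in the regional recall pool (the field semantics here are assumptions):

    rule66 = {'view_type': 'video-show-region', 'return_countv2': 2, 'platform_return_ratev2': 0.001}

    def keep_video(return_count, platform_return_rate, rule=rule66):
        # assumed semantics: keep only videos whose return count and platform return rate
        # both reach the rule's thresholds
        return (return_count >= rule.get('return_countv2', 0)
                and platform_return_rate >= rule.get('platform_return_ratev2', 0.0))

    # keep_video(3, 0.002) -> True; keep_video(1, 0.002) -> False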