
Merge branch 'master' of https://git.yishihui.com/algorithm/rov-offline

gufengshou1, 1 year ago
Parent commit: 24d3c1ea1f

+ 8 - 0
alg_recsys_delete_file.sh

@@ -0,0 +1,8 @@
+
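+# Delete log and feature files whose names contain the date from 4 days ago.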
+day="$(date -d '4 days ago' +%Y-%m-%d)"
+rm -rf /root/zhangbo/rov-offline/my_logs_tags/tags_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs/task_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_feature/rt_1day_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_feature/rt_1h_${day}*
+rm -rf /root/zhangbo/rov-offline/my_logs_shield/shield_videos_${day}*
+rm -rf /root/zhangbo/rov-offline/logs/${day}*

+ 192 - 0
alg_recsys_rank_item_realtime_1day.py

@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+import traceback
+import datetime
+from odps import ODPS
+from threading import Timer
+from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+import json
+from datetime import datetime, timedelta
+import sys
+from utils import execute_sql_from_odps
+
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
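+# Redis key prefix: one key per video, value is a JSON dict of 1-day feature lists, TTL 24h.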
+REDIS_PREFIX = "item_rt_fea_1day_"
+
+def process_and_store(row):
+    video_id, json_str = row
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600
+    redis_helper.set_data_to_redis(key, json_str, expire_time)
+    # log_.info("video写入数据key={},value={}".format(key, json_str))
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        log_.info(f"检查分区是否存在-【 dt={partition} 】")
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            log_.info(sql)
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            log_.info("表{}分区{}不存在".format(table, partition))
+            data_count = 0
+    except Exception as e:
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        data_count = 0
+    return data_count
+
+def get_sql(date, previous_date_str, project):
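+    # Aggregate per-video hourly counters between {previous_date_str}00 and {date}23,
+    # then pack each metric into a "dt:value,dt:value,..." list column per videoid.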
+    sql = '''
+    SELECT  videoid
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_pv))) AS view_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_uv))) AS view_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_pv))) AS play_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_uv))) AS play_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_pv))) AS share_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_uv))) AS share_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return_uv))) AS return_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_view_uv))) AS p_view_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_view_pv))) AS p_view_pv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_return_uv))) AS p_return_uv_list_1day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_uv))) AS share_uv_list_2day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",2day_share_pv))) AS share_pv_list_2day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_uv))) AS share_uv_list_3day
+            ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",3day_share_pv))) AS share_pv_list_3day
+    FROM    (
+                SELECT  videoid
+                        ,dt
+                        ,SUM(view次数) AS view_pv
+                        ,SUM(view人数) AS view_uv
+                        ,SUM(play次数) AS play_pv
+                        ,SUM(play人数) AS play_uv
+                        ,SUM(share次数) AS share_pv
+                        ,SUM(share人数) AS share_uv
+                        ,SUM(回流人数) AS return_uv
+                        ,SUM(platform_view) AS p_view_uv
+                        ,SUM(platform_view_total) AS p_view_pv
+                        ,SUM(platform_return) AS p_return_uv
+                        ,SUM(lasttwodays_share) AS 2day_share_uv
+                        ,SUM(lasttwodays_share_total) AS 2day_share_pv
+                        ,SUM(lastthreedays_share) AS 3day_share_uv
+                        ,SUM(lastthreedays_share_total) AS 3day_share_pv
+                FROM    loghubods.video_data_each_hour_dataset_24h_total_apptype
+                WHERE   dt <= '{}23'
+                AND     dt >= '{}00'
+                GROUP BY videoid
+                         ,dt
+            ) 
+    GROUP BY videoid
+    '''.format(date, previous_date_str)
+    print("sql:" + sql)
+    records = execute_sql_from_odps(project=project, sql=sql)
+    video_list = []
+    with records.open_reader() as reader:
+        for record in reader:
+            video_id = record['videoid']
+            m = dict()
+            try:
+                m["view_pv_list_1day"] = record['view_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["view_uv_list_1day"] = record['view_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_pv_list_1day"] = record['play_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_uv_list_1day"] = record['play_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_pv_list_1day"] = record['share_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_uv_list_1day"] = record['share_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["return_uv_list_1day"] = record['return_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_view_pv_list_1day"] = record['p_view_pv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_view_uv_list_1day"] = record['p_view_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_return_uv_list_1day"] = record['p_return_uv_list_1day']
+            except Exception as e:
+                log_.error(e)
+            json_str = json.dumps(m)
+            video_list.append([video_id, json_str])
+    return video_list
+
+
+def h_timer_check():
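+    # Flow: read date/hour from argv (or fall back to system time), wait until the upstream
+    # ODPS partition has data (retry every 60s), then build per-video feature lists and write them to Redis.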
+    try:
+        date = sys.argv[1]
+        hour = sys.argv[2]
+    except Exception as e:
+        now_date = datetime.today()
+        date = datetime.strftime(now_date, '%Y%m%d')
+        hour = datetime.now().strftime('%H')  # keep zero-padded so the partition string matches the dt format YYYYMMDDHH
+        log_.info("没有读取到参数,采用系统时间,报错info:{}".format(e))
+    # 1 判断上游数据表是否生产完成
+    project = "loghubods"
+    table = "video_data_each_hour_dataset_24h_total_apptype"
+    partition = str(date) + str(hour)
+    table_data_cnt = check_data(project, table, partition)
+    if table_data_cnt == 0:
+        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
+        Timer(60, h_timer_check).start()
+    else:
+        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
+        # 2 读取数据表 处理特征
+        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        video_list = get_sql(date, previous_date_str, project)
+        # 3 写入redis
+        log_.info("video的数据量:{}".format(len(video_list)))
+        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
+
+        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
+
+
+
+
+if __name__ == '__main__':
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    h_timer_check()
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_rank_item_realtime_1day.py 20240117 20

+ 20 - 0
alg_recsys_rank_item_realtime_1day_task.sh

@@ -0,0 +1,20 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_feature" ]; then
+    mkdir my_logs_feature
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1day.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1day.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+fi
+
+#sh alg_recsys_rank_item_realtime_1day_task.sh

+ 172 - 0
alg_recsys_rank_item_realtime_1h.py

@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+import traceback
+import datetime
+from odps import ODPS
+from threading import Timer
+from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+import json
+from datetime import datetime, timedelta
+import sys
+from utils import execute_sql_from_odps
+
+
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
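+# Redis key prefix: one key per video, value is a JSON dict of 1-hour feature lists, TTL 24h.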
+REDIS_PREFIX = "item_rt_fea_1h_"
+
+def process_and_store(row):
+    video_id, json_str = row
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600
+    redis_helper.set_data_to_redis(key, json_str, expire_time)
+    # log_.info("video写入数据key={},value={}".format(key, json_str))
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        log_.info(f"检查分区是否存在-【 dt={partition} 】")
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            log_.info(sql)
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            log_.info("表{}分区{}不存在".format(table, partition))
+            data_count = 0
+    except Exception as e:
+        log_.error("table:{},partition:{} no data. return data_count=0:{}".format(table, partition, e))
+        data_count = 0
+    return data_count
+
+def get_sql(date, previous_date_str, project):
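+    # Aggregate per-video hourly counters between {previous_date_str}00 and {date}23,
+    # then pack each metric into a "dt:value,dt:value,..." list column per videoid.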
+    sql = '''
+    SELECT  videoid
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_uv))) AS view_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",view_pv))) AS view_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_uv))) AS play_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",play_pv))) AS play_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_uv))) AS share_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",share_pv))) AS share_pv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",return_uv))) AS return_uv_list_1h
+    ,CONCAT_WS(',',COLLECT_LIST(CONCAT(dt,":",p_return_uv))) AS p_return_uv_list_1h
+    FROM    (
+    SELECT  videoid
+    ,dt
+    ,SUM(lastonehour_view) AS view_uv
+    ,SUM(lastonehour_view_total) AS view_pv
+    ,SUM(lastonehour_play) AS play_uv
+    ,SUM(lastonehour_play_total) AS play_pv
+    ,SUM(lastonehour_share) AS share_uv
+    ,SUM(lastonehour_share_total) AS share_pv
+    ,SUM(lastonehour_return) AS return_uv
+    ,SUM(platform_return) AS p_return_uv
+    FROM    loghubods.video_each_hour_update_no_province_apptype
+    WHERE   dt <= '{}23'
+    AND     dt >= '{}00'
+    GROUP BY videoid
+    ,dt
+    )
+    GROUP BY videoid
+    '''.format(date, previous_date_str)
+    print("sql:" + sql)
+    records = execute_sql_from_odps(project=project, sql=sql)
+    video_list = []
+    with records.open_reader() as reader:
+        for record in reader:
+            video_id = record['videoid']
+            m = dict()
+            try:
+                m["view_uv_list_1h"] = record['view_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["view_pv_list_1h"] = record['view_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_uv_list_1h"] = record['play_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["play_pv_list_1h"] = record['play_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_uv_list_1h"] = record['share_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["share_pv_list_1h"] = record['share_pv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["return_uv_list_1h"] = record['return_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            try:
+                m["p_return_uv_list_1h"] = record['p_return_uv_list_1h']
+            except Exception as e:
+                log_.error(e)
+            json_str = json.dumps(m)
+            video_list.append([video_id, json_str])
+    return video_list
+
+
+def h_timer_check():
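+    # Flow: read date/hour from argv (or fall back to system time), wait until the upstream
+    # ODPS partition has data (retry every 60s), then build per-video feature lists and write them to Redis.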
+    try:
+        date = sys.argv[1]
+        hour = sys.argv[2]
+    except Exception as e:
+        now_date = datetime.today()
+        date = datetime.strftime(now_date, '%Y%m%d')
+        hour = datetime.now().strftime('%H')  # keep zero-padded so the partition string matches the dt format YYYYMMDDHH
+        log_.info("没有读取到参数,采用系统时间,报错info:{}".format(e))
+    # 1 判断上游数据表是否生产完成
+    project = "loghubods"
+    table = "video_each_hour_update_no_province_apptype"
+    partition = str(date) + str(hour)
+    table_data_cnt = check_data(project, table, partition)
+    if table_data_cnt == 0:
+        log_.info("上游数据{}未就绪{},等待...".format(table, partition))
+        Timer(60, h_timer_check).start()
+    else:
+        log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
+        # 2 读取数据表 处理特征
+        previous_date_str = (datetime.strptime(date, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
+        video_list = get_sql(date, previous_date_str, project)
+        # 3 写入redis
+        log_.info("video的数据量:{}".format(len(video_list)))
+        records_process_for_list(video_list, process_and_store, max_size=50, num_workers=8)
+
+        redis_helper.set_data_to_redis(REDIS_PREFIX + "partition", partition, 24 * 3600)
+
+
+
+
+if __name__ == '__main__':
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    h_timer_check()
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_rank_item_realtime_1h.py 20240117 20

+ 20 - 0
alg_recsys_rank_item_realtime_1h_task.sh

@@ -0,0 +1,20 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_feature" ]; then
+    mkdir my_logs_feature
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1h.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  /root/anaconda3/bin/python alg_recsys_rank_item_realtime_1h.py $cur_time $cur_h
+  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
+  echo "all done"
+fi
+
+#sh alg_recsys_rank_item_realtime_1h_task.sh

+ 68 - 7
alg_recsys_recall_1h_region.py

@@ -23,7 +23,9 @@ region_code = config_.REGION_CODE
 RULE_PARAMS = {
     'rule_params': {
         'rule66': {
-            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+            'view_type': 'video-show-region',
+            # 'score_func': '20240223',
+            # 'lastonehour_allreturn': "1",
             'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
         },
         'rule67': {
@@ -40,8 +42,8 @@ RULE_PARAMS = {
     'data_params': config_.DATA_PARAMS,
     'params_list': [
         # 532
-        # {'data': 'data66', 'rule': 'rule66'},  # 523-> 523 & 518
-        {'data': 'data66', 'rule': 'rule67'},  # 523->510
+        {'data': 'data66', 'rule': 'rule66'},  # 523-> 523 & 518
+        # {'data': 'data66', 'rule': 'rule67'},  # 523->510
         # {'data': 'data66', 'rule': 'rule68'},  # 523->514
         # {'data': 'data66', 'rule': 'rule69'},  # 523->518
     ],
@@ -76,6 +78,9 @@ features = [
     'lastthreehour_return_now_new',  # h-3分享,过去1小时回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
     'lastthreehour_return_new',  # h-3分享,h-3回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
     'platform_return_new',  # 平台分发回流(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
+
+    'lastonehour_allreturn',
+    'lastonehour_allreturn_sharecnt'
 ]
 
 
@@ -102,9 +107,12 @@ def h_data_check(project, table, now_date):
 
     try:
         dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+        # 测试 张博
         check_res = check_table_partition_exits(date=dt, project=project, table=table)
         if check_res:
-            sql = f'select * from {project}.{table} where dt = {dt}'
+            sql = f'select * from {project}.{table} where dt = "{dt}"'
+            print("zhangbo-sql-是否有数据")
+            print(sql)
             with odps.execute_sql(sql=sql).open_reader() as reader:
                 data_count = reader.count
         else:
@@ -144,7 +152,7 @@ def get_day_30day_videos(now_date, data_key, rule_key):
 def get_feature_data(project, table, now_date):
     """获取特征数据"""
     dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
-    # dt = '2022041310'
+    # 张博 测试
     records = get_data_from_odps(date=dt, project=project, table=table)
     feature_data = []
     for record in records:
@@ -156,6 +164,37 @@ def get_feature_data(project, table, now_date):
     return feature_df
 
 
+def cal_score_initial_20240223(df, param):
+    """
+    计算score
+    :param df: 特征数据
+    :param param: 规则参数
+    :return:
+    """
+    log_.info("进入了cal_score_initial_20240223")
+    df = df.fillna(0)
+    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
+    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
+    df['back_rate_new'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
+    df['back_rate_all'] = df['lastonehour_allreturn'] / (df['lastonehour_allreturn_sharecnt'] + 10)
+    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
+    df['log_back_all'] = (df['lastonehour_allreturn'] + 1).apply(math.log)
+    if param.get('view_type', None) == 'video-show':
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
+    elif param.get('view_type', None) == 'video-show-region':
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
+    else:
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
+    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
+    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
+    df['score'] = df['share_rate'] * (
+        df['back_rate_new'] + 0.01 * df['back_rate_all']
+    ) * (
+            df['log_back'] + 0.01 * df['log_back_all']
+    ) * df['K2']
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
+
 def cal_score_initial(df, param):
     """
     计算score
@@ -527,6 +566,8 @@ def cal_score(df, param):
             df = cal_score_with_back_rate_exponential_weighting2(df=df, param=param)
         elif param.get('score_func', None) == 'back_rate_rank_weighting':
             df = cal_score_with_back_rate_by_rank_weighting(df=df, param=param)
+        elif param.get('score_func', None) == '20240223':
+            df = cal_score_initial_20240223(df=df, param=param)
         else:
             df = cal_score_initial(df=df, param=param)
     return df
@@ -618,8 +659,28 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     return_count = param.get('return_count', 1)
     score_value = param.get('score_rule', 0)
     platform_return_rate = param.get('platform_return_rate', 0)
-    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
-                     & (df['platform_return_rate'] >= platform_return_rate)]
+    # h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
+    #                  & (df['platform_return_rate'] >= platform_return_rate)]
+    h_recall_df = df[
+        (df['lastonehour_return'] >= return_count) &
+        (df['score'] >= score_value) &
+        (df['platform_return_rate'] >= platform_return_rate)
+        ]
+    if "lastonehour_allreturn" in param.keys():
+        log_.info("采用 lastonehour_allreturn 过滤")
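+        # Note: this filter is applied to the original df, so it replaces the
+        # return/score/platform_return_rate thresholds above rather than narrowing them.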
+        h_recall_df = df[
+            (df['lastonehour_allreturn'] > 0)
+        ]
+    # try:
+    #     if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
+    #         return_countv2 = param["return_countv2"]
+    #         platform_return_ratev2 = param["platform_return_ratev2"]
+    #         h_recall_df = h_recall_df[
+    #             df['platform_return_rate'] >= platform_return_ratev2 |
+    #             (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
+    #             ]
+    # except Exception as e:
+    #     log_.error("return_countv2 is wrong with{}".format(e))
 
     # videoid重复时,保留分值高
     h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)

+ 8 - 8
alg_recsys_recall_hour_region_task.sh

@@ -10,24 +10,24 @@ echo "开始执行时间:{$cur_time}-{$cur_h}"
 
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
   cd /root/zhangbo/rov-offline
-  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py &
   /root/anaconda3/bin/python alg_recsys_recall_1h_noregion.py
   wait
   echo "并行执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
-  /root/anaconda3/bin/python alg_recsys_recall_aftermerge.py
+#  /root/anaconda3/bin/python alg_recsys_recall_aftermerge.py
   echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
   echo "all done"
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
   cd /root/zhangbo/rov-offline
-  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py &
+#  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py &
   /root/anaconda3/bin/python alg_recsys_recall_1h_noregion.py
   wait
   echo "并行执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
-  /root/anaconda3/bin/python alg_recsys_recall_aftermerge.py
+#  /root/anaconda3/bin/python alg_recsys_recall_aftermerge.py
   echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
   echo "all done"
 fi

+ 105 - 15
alg_recsys_recall_tags_videos.py

@@ -4,26 +4,43 @@ from config import set_config
 from log import Log
 from utils import execute_sql_from_odps
 from db_helper import RedisHelper
-import datetime
+from datetime import datetime, timedelta
+
 from alg_recsys_recall_4h_region_trend import records_process_for_list
 config_, _ = set_config()
 log_ = Log()
 redis_helper = RedisHelper()
 
-PROJECT = "videoods"
-TABLE = "videoods.dim_video"
+
 REDIS_PREFIX = "alg_recsys_video_tags_"
 
-TAG_SET = ['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
+TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
     '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
     '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
     '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
     '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
-    'P1高风险','P0高风险'
-]
+    'P1高风险','P0高风险',
+"早上好","中午好","下午好","晚上好","晚安",
+        "祝福",
+"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
+        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
+        ,"毛主席诞辰",
+
+        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
+        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
+
+        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世",
+
+               "初一","初二","初三","初四","初五"
+
+
+])
 
 def get_video_tags():
     """获取视频的tag"""
+    PROJECT = "videoods"
+    TABLE = "videoods.dim_video"
     try:
         sql = "SELECT  videoid \
 ,tags \
@@ -91,6 +108,11 @@ OR      exploded_value = '晚安' \
 OR      exploded_value = '祝福' \
 OR      exploded_value = 'P1高风险' \
 OR      exploded_value = 'P0高风险' \
+OR      exploded_value = '初一' \
+OR      exploded_value = '初二' \
+OR      exploded_value = '初三' \
+OR      exploded_value = '初四' \
+OR      exploded_value = '初五' \
 )".format(TABLE)
         print("sql:"+sql)
         records = execute_sql_from_odps(project=PROJECT, sql=sql)
@@ -103,26 +125,94 @@ OR      exploded_value = 'P0高风险' \
                 d["video_id"] = video_id
                 d["tags"] = tags
                 video_tags_list.append(d)
-                log_.info("{}:{}".format(video_id, tags))
-        log_.info("video的数据量:{}".format(len(video_tags_list)))
-        records_process_for_list(video_tags_list, process_and_store, max_size=50, num_workers=8)
-        log_.info("video的数据量:{}".format(len(video_tags_list)))
-
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
     except Exception as e:
         log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def get_video_tags_v2():
+    PROJECT = "loghubods"
+    TABLE = "loghubods.automated_updates_category_labels_1"
+    # now_date = datetime.today()
+    # date = datetime.strftime(now_date, '%Y%m%d')
+    # previous_date = now_date - timedelta(days=1)
+    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
 
+    # 获取当前日期
+    now_date = datetime.today()
+    # 获取当前月份
+    current_month = now_date.strftime('%Y%m')
+    # 获取上个月份
+    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
+    try:
+        sql = '''SELECT  videoid
+        ,secondary_labels
+FROM    loghubods.automated_updates_category_labels_1
+WHERE   (
+            dt LIKE '{}%' OR dt LIKE '{}%'
+)
+'''.format(current_month, previous_month)
+        print("sql:" + sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
 def process_and_store(row):
     video_id = row["video_id"]
     tags = row["tags"]
     key = REDIS_PREFIX + str(video_id)
     expire_time = 24 * 3600 * 2
     redis_helper.set_data_to_redis(key, tags, expire_time)
-    log_.info("video-tags写入数据key={},value={}".format(key, tags))
+    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
 
 def main():
-    log_.info("开始执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-    get_video_tags()
-    log_.info("完成执行:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    video_tags_list = get_video_tags()
+    video_tags_list2 = get_video_tags_v2()
+
+
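+    # Merge the two tag sources by video_id: when a video appears in both, union the comma-separated tag sets.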
+    m1 = {}
+    m2 = {}
+    for row in video_tags_list:
+        m1[row["video_id"]] = row["tags"]
+    for row in video_tags_list2:
+        m2[row["video_id"]] = row["tags"]
+    merged_map = {}
+    merged_map.update(m1)
+    for key, value in m2.items():
+        if key in merged_map:
+            l1 = merged_map[key].split(",")
+            l2 = value.split(",")
+            l1.extend(l2)
+            l3 = list(set(l1))
+            merged_map[key] = ",".join(l3)
+        else:
+            merged_map[key] = value
+    result = []
+    for key, value in merged_map.items():
+        tmp = {}
+        tmp["video_id"] = key
+        tmp["tags"] = value
+        result.append(tmp)
+        log_.info("{}:{}".format(key, value))
+
+    log_.info("video的数据量:{}".format(len(result)))
+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("video的数据量:{}".format(len(result)))
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
 if __name__ == '__main__':
     main()

+ 215 - 0
alg_recsys_recall_tags_videosv2.py

@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+import traceback
+from config import set_config
+from log import Log
+from utils import execute_sql_from_odps
+from db_helper import RedisHelper
+from datetime import datetime, timedelta
+
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+
+REDIS_PREFIX = "alg_recsys_video_tags_"
+
+TAG_SET = set(['元旦','腊八节','小年','小年','除夕','春节','情人节','元宵节','龙抬头','妇女节','劳动节','母亲节',
+    '儿童节','端午节','父亲节','建党节','七七事变','建军节','七夕节','中元节','中秋节','毛主席逝世',
+    '国庆节','重阳节','感恩节','公祭日','平安夜','圣诞节','毛主席诞辰','小寒','大寒','立春','雨水',
+    '惊蛰','春分','清明','谷雨','立夏','小满','芒种','夏至','小暑','大暑','立秋','处暑','白露','秋分',
+    '寒露','霜降','立冬','小雪','大雪','冬至','早上好','中午好','下午好','晚上好','晚安','祝福',
+    'P1高风险','P0高风险',
+"早上好","中午好","下午好","晚上好","晚安",
+        "祝福",
+"元旦","腊八节","小年","除夕","春节","情人节","元宵节","龙抬头","妇女节","劳动节","母亲节","儿童节","端午节","父亲节",
+        "建党节","七七事变","建军节","七夕节","中元节","中秋节","毛主席逝世","国庆节","重阳节","感恩节","公祭日","平安夜","圣诞节"
+        ,"毛主席诞辰",
+
+        "小寒","大寒","立春","雨水","惊蛰","春分","清明","谷雨","立夏","小满","芒种","夏至","小暑","大暑","立秋","处暑","白露",
+        "秋分","寒露","霜降","立冬","小雪","大雪","冬至",
+
+        "孙中山诞辰","孙中山逝世","周恩来诞辰","周恩来逝世","邓小平诞辰","邓小平逝世","李克强诞辰","李克强逝世","袁隆平诞辰",
+        "袁隆平逝世","彭德怀诞辰","彭德怀逝世","朱德诞辰","朱德逝世","吴尊友逝世"
+])
+
+def get_video_tags():
+    """获取视频的tag"""
+    PROJECT = "videoods"
+    TABLE = "videoods.dim_video"
+    try:
+        sql = "SELECT  videoid \
+,tags \
+FROM    {} \
+LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
+WHERE   tags IS NOT NULL \
+AND     ( \
+exploded_value = '元旦' \
+OR      exploded_value = '腊八节' \
+OR      exploded_value = '小年' \
+OR      exploded_value = '除夕' \
+OR      exploded_value = '春节' \
+OR      exploded_value = '情人节' \
+OR      exploded_value = '元宵节' \
+OR      exploded_value = '龙抬头' \
+OR      exploded_value = '妇女节' \
+OR      exploded_value = '劳动节' \
+OR      exploded_value = '母亲节' \
+OR      exploded_value = '儿童节' \
+OR      exploded_value = '端午节' \
+OR      exploded_value = '父亲节' \
+OR      exploded_value = '建党节' \
+OR      exploded_value = '七七事变' \
+OR      exploded_value = '建军节' \
+OR      exploded_value = '七夕节' \
+OR      exploded_value = '中元节' \
+OR      exploded_value = '中秋节' \
+OR      exploded_value = '毛主席逝世' \
+OR      exploded_value = '国庆节' \
+OR      exploded_value = '重阳节' \
+OR      exploded_value = '感恩节' \
+OR      exploded_value = '公祭日' \
+OR      exploded_value = '平安夜' \
+OR      exploded_value = '圣诞节' \
+OR      exploded_value = '毛主席诞辰' \
+OR      exploded_value = '小寒' \
+OR      exploded_value = '大寒' \
+OR      exploded_value = '立春' \
+OR      exploded_value = '雨水' \
+OR      exploded_value = '惊蛰' \
+OR      exploded_value = '春分' \
+OR      exploded_value = '清明' \
+OR      exploded_value = '谷雨' \
+OR      exploded_value = '立夏' \
+OR      exploded_value = '小满' \
+OR      exploded_value = '芒种' \
+OR      exploded_value = '夏至' \
+OR      exploded_value = '小暑' \
+OR      exploded_value = '大暑' \
+OR      exploded_value = '立秋' \
+OR      exploded_value = '处暑' \
+OR      exploded_value = '白露' \
+OR      exploded_value = '秋分' \
+OR      exploded_value = '寒露' \
+OR      exploded_value = '霜降' \
+OR      exploded_value = '立冬' \
+OR      exploded_value = '小雪' \
+OR      exploded_value = '大雪' \
+OR      exploded_value = '冬至' \
+OR      exploded_value = '早上好' \
+OR      exploded_value = '中午好' \
+OR      exploded_value = '下午好' \
+OR      exploded_value = '晚上好' \
+OR      exploded_value = '晚安' \
+OR      exploded_value = '祝福' \
+OR      exploded_value = 'P1高风险' \
+OR      exploded_value = 'P0高风险' \
+)".format(TABLE)
+        print("sql:"+sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("天级表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def get_video_tags_v2():
+    PROJECT = "loghubods"
+    TABLE = "loghubods.automated_updates_category_labels_1"
+    # now_date = datetime.today()
+    # date = datetime.strftime(now_date, '%Y%m%d')
+    # previous_date = now_date - timedelta(days=1)
+    # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
+
+    # 获取当前日期
+    now_date = datetime.today()
+    # 获取当前月份
+    current_month = now_date.strftime('%Y%m')
+    # 获取上个月份
+    previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
+    try:
+        sql = '''SELECT  videoid
+        ,secondary_labels
+FROM    loghubods.automated_updates_category_labels_1
+WHERE   (
+            dt LIKE '{}%' OR dt LIKE '{}%'
+)
+'''.format(current_month, previous_month)
+        print("sql:" + sql)
+        records = execute_sql_from_odps(project=PROJECT, sql=sql)
+        video_tags_list = []
+        with records.open_reader() as reader:
+            for record in reader:
+                video_id = int(record['videoid'])
+                tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
+                d = {}
+                d["video_id"] = video_id
+                d["tags"] = tags
+                video_tags_list.append(d)
+                # log_.info("{}:{}".format(video_id, tags))
+        log_.info("增量表:{}".format(str(len(video_tags_list))))
+        return video_tags_list
+    except Exception as e:
+        log_.error(str(e) + str(traceback.format_exc()))
+    return []
+def process_and_store(row):
+    video_id = row["video_id"]
+    tags = row["tags"]
+    key = REDIS_PREFIX + str(video_id)
+    expire_time = 24 * 3600 * 2
+    redis_helper.set_data_to_redis(key, tags, expire_time)
+    # log_.info("video-tags写入数据key={},value={}".format(key, tags))
+
+def main():
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    video_tags_list = get_video_tags()
+    video_tags_list2 = get_video_tags_v2()
+
+
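+    # Merge the two tag sources by video_id: when a video appears in both, union the comma-separated tag sets.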
+    m1 = {}
+    m2 = {}
+    for row in video_tags_list:
+        m1[row["video_id"]] = row["tags"]
+    for row in video_tags_list2:
+        m2[row["video_id"]] = row["tags"]
+    merged_map = {}
+    merged_map.update(m1)
+    for key, value in m2.items():
+        if key in merged_map:
+            l1 = merged_map[key].split(",")
+            l2 = value.split(",")
+            l1.extend(l2)
+            l3 = list(set(l1))
+            merged_map[key] = ",".join(l3)
+        else:
+            merged_map[key] = value
+    result = []
+    for key, value in merged_map.items():
+        tmp = {}
+        tmp["video_id"] = key
+        tmp["tags"] = value
+        result.append(tmp)
+        log_.info("{}:{}".format(key, value))
+
+    log_.info("video的数据量:{}".format(len(result)))
+    records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("video的数据量:{}".format(len(result)))
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+if __name__ == '__main__':
+    main()
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_recall_shield_videos.py

+ 70 - 0
alg_recsys_recall_undertake.py

@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+import traceback
+from config import set_config
+from log import Log
+from utils import execute_sql_from_odps
+from db_helper import RedisHelper
+from datetime import datetime, timedelta
+
+from alg_recsys_recall_4h_region_trend import records_process_for_list
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+
+
+def main():
+    log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+    #1 读取当前时间及上一时间
+    now_date = datetime.today() # - timedelta(hours=1)
+    hour = datetime.strftime(now_date, '%H').lstrip('0')
+    date = datetime.strftime(now_date, '%Y%m%d')
+    if hour == "":
+        hour = "0"
+    next_date = now_date + timedelta(hours=1)
+    hour_next = datetime.strftime(next_date, '%H').lstrip('0')
+    date_next = datetime.strftime(next_date, '%Y%m%d')
+    if hour_next == "":
+        hour_next = "0"
+    #2 循环拼接key,读一个写一个。
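+    # For every region code, read the current hour's recall zset and re-write it under the next hour's key with a 1-day TTL.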
+    REGION_CODE = {
+        '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',
+        '辽宁省': '210000', '吉林省': '220000', '黑龙江省': '230000',
+        '上海': '310000', '江苏省': '320000', '浙江省': '330000', '安徽省': '340000', '福建省': '350000',
+        '江西省': '360000',
+        '山东省': '370000',
+        '河南省': '410000', '湖北省': '420000', '湖南省': '430000', '广东省': '440000', '广西': '450000',
+        '海南省': '460000',
+        '重庆': '500000', '四川省': '510000', '贵州省': '520000', '云南省': '530000', '西藏': '540000',
+        '陕西省': '610000', '甘肃省': '620000', '青海省': '630000', '宁夏': '640000', '新疆': '650000',
+        '台湾省': '710000', '香港': '810000', '澳门': '820000',
+
+        '广州': '440100', '深圳': '440300', '成都': '510100', '长沙': '430100',
+    }
+    keys = []
+    for _, code in REGION_CODE.items():
+        read_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date, hour)
+        keys.append(read_key)
+        values = redis_helper.get_data_zset_with_index(read_key, 0, -1, True, True)
+        print(str(read_key))
+        print(str(values))
+        if values and len(values) > 0:
+            new_kvs = {}
+            for k, v in values:
+                new_kvs[k] = v
+            write_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date_next, hour_next)
+            redis_helper.add_data_with_zset(write_key, new_kvs, expire_time = 1*24*3600)
+            print(str(write_key))
+
+
+    # records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
+    log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+
+if __name__ == '__main__':
+    main()
+
+
+
+
+# cd /root/zhangbo/rov-offline
+# python alg_recsys_recall_undertake.py

+ 22 - 0
alg_recsys_recall_undertake_task.sh

@@ -0,0 +1,22 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [ ! -d "my_logs_undertask" ]; then
+    # 如果文件夹不存在,则创建文件夹
+    mkdir my_logs_undertask
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "开始执行时间:{$cur_time}-{$cur_h}"
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+  cd /root/zhangbo/rov-offline
+  /root/anaconda3/bin/python alg_recsys_recall_undertake.py
+  echo "all done"
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+  cd /root/zhangbo/rov-offline
+  /root/anaconda3/bin/python alg_recsys_recall_undertake.py
+  echo "all done"
+fi
+cur_time="`date +%Y%m%d`"
+cur_h="`date +%H`"
+echo "结束执行时间:{$cur_time}-{$cur_h}"

+ 84 - 0
alg_recsys_utils.py

@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+from odps import ODPS
+import argparse
+
+ODPS_CONFIG = {
+        'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=ODPS_CONFIG['ACCESSID'],
+        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        print("error:" + str(e))
+        data_count = 0
+    return data_count
+
+
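+# Each check_* helper prints "0" when the partition has data; otherwise it prints "1" and exits
+# with code 1, so a calling shell script can branch on the output / exit code.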
+def check_origin_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_view_sample_v2"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+def check_item_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_video_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+def check_user_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_user_info"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("1")
+        exit(1)
+    else:
+        print("0")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='脚本utils')
+    parser.add_argument('--excute_program', type=str, help='执行程序')
+    parser.add_argument('--partition', type=str, help='表分区')
+    args = parser.parse_args()
+    if args.excute_program == "check_origin_hive":
+        check_origin_hive(args)
+    elif args.excute_program == "check_item_hive":
+        check_item_hive(args)
+    elif args.excute_program == "check_user_hive":
+        check_user_hive(args)
+    else:
+        print("无合法参数,验证失败。")
+        exit(999)
+

+ 20 - 3
config.py

@@ -419,7 +419,11 @@ class BaseConfig(object):
                        'region_24h_rule_key': 'rule4', '24h_rule_key': 'rule4', 'merge_func': 2,
                        'score_func': 'back_rate_rank_weighting'},
             'rule66': {
-                'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+                # 'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+                # 'view_type': 'video-show-region', "return_countv2": 1, 'platform_return_ratev2': 0.001,
+                'view_type': 'video-show-region',
+                # 'score_func': '20240223',
+                # 'lastonehour_allreturn': "1",
                 'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
             },
             'rule68': {
@@ -2390,7 +2394,13 @@ class DevelopmentConfig(BaseConfig):
 
     # 测试环境mysql地址
     MYSQL_INFO = {
-        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        # 'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
+        # 'port': 3306,
+        # 'user': 'wx2016_longvideo',
+        # 'password': 'wx2016_longvideoP@assword1234',
+        # 'db': 'longvideo',
+        # 'charset': 'utf8'
+        'host': 'rr-bp1x9785e8h5452bi157.mysql.rds.aliyuncs.com',
         'port': 3306,
         'user': 'wx2016_longvideo',
         'password': 'wx2016_longvideoP@assword1234',
@@ -2400,7 +2410,14 @@ class DevelopmentConfig(BaseConfig):
 
     # 测试环境 过滤用mysql地址
     FILTER_MYSQL_INFO = {
-        'host': 'am-bp1g3ys9u00u483uc131930.ads.aliyuncs.com',
+        # 'host': 'am-bp1g3ys9u00u483uc131930.ads.aliyuncs.com',
+        # 'port': 3306,
+        # 'user': 'lv_manager',
+        # 'password': 'lv_manager@2020',
+        # 'db': 'longvideo',
+        # 'charset': 'utf8'
+
+        'host': 'am-bp15tqt957i3b3sgi131950.ads.aliyuncs.com',
         'port': 3306,
         'user': 'lv_manager',
         'password': 'lv_manager@2020',

+ 77 - 33
region_rule_rank_h_v2.py

@@ -59,6 +59,9 @@ features = [
     'lastthreehour_return_now_new',  # h-3分享,过去1小时回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
     'lastthreehour_return_new',  # h-3分享,h-3回流人数(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
     'platform_return_new',  # 平台分发回流(回流统计为对应地域分享带回的回流,分享限制地域,回流不限制地域)
+
+    'lastonehour_allreturn',
+    'lastonehour_allreturn_sharecnt'
 ]
 
 
@@ -85,9 +88,12 @@ def h_data_check(project, table, now_date):
 
     try:
         dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+        # 测试 张博
         check_res = check_table_partition_exits(date=dt, project=project, table=table)
         if check_res:
-            sql = f'select * from {project}.{table} where dt = {dt}'
+            sql = f'select * from {project}.{table} where dt = "{dt}"'
+            print("zhangbo-sql-是否有数据")
+            print(sql)
             with odps.execute_sql(sql=sql).open_reader() as reader:
                 data_count = reader.count
         else:
@@ -127,7 +133,7 @@ def get_day_30day_videos(now_date, data_key, rule_key):
 def get_feature_data(project, table, now_date):
     """获取特征数据"""
     dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
-    # dt = '2022041310'
+    # 张博 测试
     records = get_data_from_odps(date=dt, project=project, table=table)
     feature_data = []
     for record in records:
@@ -139,6 +145,37 @@ def get_feature_data(project, table, now_date):
     return feature_df
 
 
+def cal_score_initial_20240223(df, param):
+    """
+    计算score
+    :param df: 特征数据
+    :param param: 规则参数
+    :return:
+    """
+    log_.info("进入了cal_score_initial_20240223")
+    df = df.fillna(0)
+    df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
+    df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
+    df['back_rate_new'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
+    df['back_rate_all'] = df['lastonehour_allreturn'] / (df['lastonehour_allreturn_sharecnt'] + 10)
+    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
+    df['log_back_all'] = (df['lastonehour_allreturn'] + 1).apply(math.log)
+    if param.get('view_type', None) == 'video-show':
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show'] + 1000)
+    elif param.get('view_type', None) == 'video-show-region':
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_show_region'] + 1000)
+    else:
+        df['ctr'] = df['lastonehour_play'] / (df['lastonehour_preview'] + 1000)
+    df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
+    df['platform_return_rate'] = df['platform_return'] / df['lastonehour_return']
+    df['score'] = df['share_rate'] * (
+        df['back_rate_new'] + 0.01 * df['back_rate_all']
+    ) * (
+            df['log_back'] + 0.01 * df['log_back_all']
+    ) * df['K2']
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
+
 def cal_score_initial(df, param):
     """
     计算score
@@ -510,6 +547,8 @@ def cal_score(df, param):
             df = cal_score_with_back_rate_exponential_weighting2(df=df, param=param)
         elif param.get('score_func', None) == 'back_rate_rank_weighting':
             df = cal_score_with_back_rate_by_rank_weighting(df=df, param=param)
+        elif param.get('score_func', None) == '20240223':
+            df = cal_score_initial_20240223(df=df, param=param)
         else:
             df = cal_score_initial(df=df, param=param)
     return df
@@ -601,57 +640,60 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     return_count = param.get('return_count', 1)
     score_value = param.get('score_rule', 0)
     platform_return_rate = param.get('platform_return_rate', 0)
+    # h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)
+    #                  & (df['platform_return_rate'] >= platform_return_rate)]
     h_recall_df = df[
-                        (df['lastonehour_return'] >= return_count) &
-                        (df['score'] >= score_value) &
-                        (df['platform_return_rate'] >= platform_return_rate)
-                     ]
-    try:
-        if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
-            return_countv2 = param["return_countv2"]
-            platform_return_ratev2 = param["platform_return_ratev2"]
-            h_recall_df = h_recall_df[
-                df['platform_return_rate'] >= platform_return_ratev2 |
-                (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
+        (df['lastonehour_return'] >= return_count) &
+        (df['score'] >= score_value) &
+        (df['platform_return_rate'] >= platform_return_rate)
+        ]
+    if "lastonehour_allreturn" in param.keys():
+        log_.info("采用 lastonehour_allreturn 过滤")
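+        # Note: this filter is applied to the original df, so it replaces the
+        # return/score/platform_return_rate thresholds above rather than narrowing them.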
+        h_recall_df = df[
+            (df['lastonehour_allreturn'] > 0)
             ]
-    except Exception as e:
-        log_.error("return_countv2 is wrong with{}".format(e))
-
+    # try:
+    #     if "return_countv2" in param.keys() and "platform_return_ratev2" in param.keys():
+    #         return_countv2 = param["return_countv2"]
+    #         platform_return_ratev2 = param["platform_return_ratev2"]
+    #         h_recall_df = h_recall_df[
+    #             df['platform_return_rate'] >= platform_return_ratev2 |
+    #             (df['platform_return_rate'] < platform_return_ratev2 & df['lastonehour_return'] > return_countv2)
+    #             ]
+    # except Exception as e:
+    #     log_.error("return_countv2 is wrong with{}".format(e))
 
     # videoid重复时,保留分值高
     h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
     h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
     h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
 
+    log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_df)}")
     # 增加打捞的优质视频
     if add_videos_with_pre_h is True:
         add_func = param.get('add_func', None)
         h_recall_df = add_videos(initial_df=h_recall_df, now_date=now_date, rule_key=rule_key,
                                  region=region, data_key=data_key, hour_count=hour_count, top=10, add_func=add_func)
-
+        log_.info(f"打捞优质视频完成")
     h_recall_videos = h_recall_df['videoid'].to_list()
-    # log_.info(f'h_recall videos count = {len(h_recall_videos)}')
-
+    log_.info(f"各种规则增加后,一共有多少个视频 = {len(h_recall_videos)}")
     # 视频状态过滤
     if data_key in ['data7', ]:
         filtered_videos = filter_video_status_app(h_recall_videos)
     else:
         filtered_videos = filter_video_status(h_recall_videos)
-    # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
 
     # 屏蔽视频过滤
     shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
     shield_key_name_list = shield_config.get(region, None)
     if shield_key_name_list is not None:
         filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
-        # log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
 
     # 涉政视频过滤
     political_filter = param.get('political_filter', None)
     if political_filter is True:
-        # log_.info(f"political filter videos count = {len(filtered_videos)}")
         filtered_videos = filter_political_videos(video_ids=filtered_videos)
-        # log_.info(f"political filtered videos count = {len(filtered_videos)}")
+    log_.info(f"视频状态-涉政等-过滤后,一共有多少个视频 = {len(filtered_videos)}")
 
     # 写入对应的redis
     h_video_ids = []
@@ -673,8 +715,9 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     h_recall_key_name = \
         f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
     if len(h_recall_result) > 0:
-        # log_.info(f"h_recall_result count = {len(h_recall_result)}")
+        log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
         # 限流视频score调整
         update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
@@ -717,15 +760,14 @@ def merge_df_with_score(df_left, df_right):
 
 def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                         rule_rank_h_flag, add_videos_with_pre_h, hour_count):
-    log_.info(f"region = {region} start...")
-    # 计算score
+    log_.info(f"多协程的region = {region} 开始执行")
     region_df = df_merged[df_merged['code'] == region]
-    log_.info(f'region = {region}, region_df count = {len(region_df)}')
+    log_.info(f'该区域region = {region}, 下有多少数据量 = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
                region=region, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag,
                add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
-    log_.info(f"region = {region} end!")
+    log_.info(f"多协程的region = {region} 完成执行")
 
 
 def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
@@ -805,11 +847,11 @@ def process_with_param(param, data_params_item, rule_params_item, region_code_li
 
     data_key = param.get('data')
     data_param = data_params_item.get(data_key)
-    log_.info(f"data_key = {data_key}, data_param = {data_param}")
     rule_key = param.get('rule')
     rule_param = rule_params_item.get(rule_key)
-    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
     merge_func = rule_param.get('merge_func', None)
+    log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
+    log_.info("具体的规则是:{}.".format(rule_param))
     # 是否在地域小时级数据中增加打捞的优质视频
     add_videos_with_pre_h = rule_param.get('add_videos_with_pre_h', False)
     hour_count = rule_param.get('hour_count', 0)
@@ -1077,11 +1119,12 @@ def h_timer_check():
         table = config_.TABLE_REGION_APP_TYPE
         region_code_list = [code for region, code in region_code.items()]
         now_date = datetime.datetime.today()
-        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
+        log_.info(f"开始执行: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
         now_h = datetime.datetime.now().hour
         now_min = datetime.datetime.now().minute
         redis_helper = RedisHelper()
         if now_h == 0:
+            log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                           rule_rank_h_flag=rule_rank_h_flag)
             log_.info(f"region_h_data end!")
@@ -1094,7 +1137,7 @@ def h_timer_check():
         # 查看当前小时更新的数据是否已准备好
         h_data_count = h_data_check(project=project, table=table, now_date=now_date)
         if h_data_count > 0:
-            log_.info(f'region_h_data_count = {h_data_count}')
+            log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
             # 数据准备好,进行更新
             rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                       project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
@@ -1116,6 +1159,7 @@ def h_timer_check():
             log_.info(f"region_h_data status update to '1' finished!")
         else:
             # 数据没准备好,1分钟后重新检查
+            log_.info("上游数据未就绪,等待...")
             Timer(60, h_timer_check).start()
 
     except Exception as e:
@@ -1130,5 +1174,5 @@ def h_timer_check():
 
 
 if __name__ == '__main__':
-    log_.info(f"region_h_data start...")
+    log_.info(f"region-rule-rank-h-v2 start...")
     h_timer_check()