Bläddra i källkod

update flowpool data

liqian 1 år sedan
förälder
incheckning
7836869106

+ 2 - 0
config.py

@@ -630,6 +630,8 @@ class BaseConfig(object):
     # appType = 6, ROV召回池redis key前缀,完整格式:com.weiqu.video.recall.hot.apptype.h.item.score.{appType}.{h}
     # RECALL_KEY_NAME_PREFIX_APP_TYPE = 'com.weiqu.video.recall.hot.apptype.h.item.score.'
 
+    # 流量池分发实验配置(对照组与实验组划分)存放 redis key
+    FLOWPOOL_ABTEST_KEY_NAME = 'flow:pool:abtest:config'
     # 流量池离线模型结果存放 redis key前缀,完整格式 flow:pool:item:score:{appType}
     FLOWPOOL_KEY_NAME_PREFIX = 'flow:pool:item:score:'
     # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:score:{appType}:{flowPool_id}

+ 4 - 4
flow_pool_task.sh

@@ -1,11 +1,11 @@
 source /etc/profile
 echo $ROV_OFFLINE_ENV
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
-#    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py &
 #    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/flowpool_data_update.py
-    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/flowpool_data_update_with_level.py
+#    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/flowpool_data_update_with_level.py
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
-#    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
 #    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/flowpool_data_update.py
-    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/flowpool_data_update_with_level.py
+#    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/flowpool_data_update_with_level.py
 fi

+ 23 - 0
flowpool_abtest_config_update.py

@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+# @ModuleName: flowpool_abtest_config_update
+# @Author: Liqian
+# @Time: 2023/9/23 13:31
+# @Software: PyCharm
+import json
+from config import set_config
+from log import Log
+from db_helper import RedisHelper
+
+config_, _ = set_config()
+log_ = Log()
+
+flow_pool_abtest_config = {
+    'control_group': [8, 10, 12, 14, 16],
+    'experimental_flow_set_level': [7, 9, 11, 13, 15]
+}
+log_.info(f"flow_pool_abtest_config: {flow_pool_abtest_config}")
+redis_helper = RedisHelper()
+redis_helper.set_data_to_redis(
+    key_name=config_.FLOWPOOL_ABTEST_KEY_NAME, value=json.dumps(flow_pool_abtest_config), expire_time=365 * 24 * 3600
+)
+log_.info(f"flow_pool_abtest_config update finished!")

+ 17 - 2
flowpool_data_update_with_level.py

@@ -4,6 +4,7 @@ import time
 import os
 import traceback
 import random
+import json
 
 from config import set_config
 from utils import request_post, filter_video_status, send_msg_to_feishu, filter_video_status_app, \
@@ -142,7 +143,7 @@ def online_flow_pool_data_to_redis(app_type, video_ids_set, video_info_data):
                                                        expire_time=3600)
 
 
-def get_flow_pool_data(app_type, video_info_list):
+def get_flow_pool_data(app_type, video_info_list, flow_pool_id_list):
     """
     获取流量池可分发视频,并将结果上传Redis
     :param app_type: 产品标识 type-int
@@ -161,6 +162,10 @@ def get_flow_pool_data(app_type, video_info_list):
         log_.info('流量池中视频数:{}'.format(len(videos)))
         mapping = {}
         for video in videos:
+            flow_pool_id = video['flowPoolId']
+            if int(flow_pool_id) not in flow_pool_id_list:
+                continue
+            # print(f"flow_pool_id: {flow_pool_id}")
             video_id = video['videoId']
             video_ids.add(video_id)
             item_info = {'flowPool': video['flowPool'], 'level': video['level']}
@@ -168,6 +173,7 @@ def get_flow_pool_data(app_type, video_info_list):
                 mapping[video_id].append(item_info)
             else:
                 mapping[video_id] = [item_info]
+        log_.info(f"需更新流量池视频数: {len(video_ids)}")
 
         # 对视频状态进行过滤
         if app_type == config_.APP_TYPE['APP']:
@@ -278,6 +284,14 @@ if __name__ == '__main__':
     time.sleep(60)
     app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
     log_.info('flow pool predict start...')
+    # 获取对应流量池id列表
+    redis_helper = RedisHelper()
+    flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
+    if flow_pool_abtest_config is not None:
+        flow_pool_abtest_config = json.loads(flow_pool_abtest_config)
+    else:
+        flow_pool_abtest_config = {}
+    flow_pool_id_list = flow_pool_abtest_config.get('experimental_flow_set_level', [])
     video_info_list = []
     for app_name, app_type in config_.APP_TYPE.items():
         log_.info('{} predict start...'.format(app_name))
@@ -286,7 +300,8 @@ if __name__ == '__main__':
         elif app_type == config_.APP_TYPE['ZUI_JING_QI']:
             continue
         else:
-            video_info_list = get_flow_pool_data(app_type=app_type, video_info_list=video_info_list)
+            video_info_list = get_flow_pool_data(app_type=app_type, video_info_list=video_info_list,
+                                                 flow_pool_id_list=flow_pool_id_list)
         log_.info('{} predict end...'.format(app_name))
 
     # 更新剩余分发数

+ 11 - 0
flowpool_data_update_with_level_task.sh

@@ -0,0 +1,11 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+#    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/pool_predict.py &
+#    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/flowpool_data_update.py
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/flowpool_data_update_with_level.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+#    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/pool_predict.py
+#    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/flowpool_data_update.py
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/flowpool_data_update_with_level.py
+fi

+ 19 - 2
pool_predict.py

@@ -1,4 +1,5 @@
 import datetime
+import json
 import random
 import time
 import os
@@ -137,7 +138,7 @@ def online_flow_pool_data_to_redis(app_type, video_ids_set, video_info_data):
                                                        expire_time=3600)
 
 
-def predict(app_type, video_info_list):
+def predict(app_type, video_info_list, flow_pool_id_list):
     """
     对流量池视频排序,并将结果上传Redis
     :param app_type: 产品标识 type-int
@@ -154,12 +155,17 @@ def predict(app_type, video_info_list):
         log_.info('流量池中视频数:{}'.format(len(videos)))
         mapping = {}
         for video in videos:
+            flow_pool_id = video['flowPoolId']
+            if int(flow_pool_id) not in flow_pool_id_list:
+                continue
+            # print(f"flow_pool_id: {flow_pool_id}")
             video_id = video['videoId']
             video_ids.add(video_id)
             if video_id in mapping:
                 mapping[video_id].append(video['flowPool'])
             else:
                 mapping[video_id] = [video['flowPool']]
+        log_.info(f"需更新流量池视频数: {len(video_ids)}")
 
         # 对视频状态进行过滤
         if app_type == config_.APP_TYPE['APP']:
@@ -375,8 +381,18 @@ def predict_19(app_type):
 
 
 if __name__ == '__main__':
+    # 为避免第一个app_type获取数据不全,等待1min
+    time.sleep(60)
     app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
     log_.info('flow pool predict start...')
+    # 获取对应流量池id列表
+    redis_helper = RedisHelper()
+    flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
+    if flow_pool_abtest_config is not None:
+        flow_pool_abtest_config = json.loads(flow_pool_abtest_config)
+    else:
+        flow_pool_abtest_config = {}
+    flow_pool_id_list = flow_pool_abtest_config.get('control_group', [])
     video_info_list = []
     for app_name, app_type in config_.APP_TYPE.items():
         log_.info('{} predict start...'.format(app_name))
@@ -387,7 +403,8 @@ if __name__ == '__main__':
             # predict_19(app_type=app_type)
             continue
         else:
-            video_info_list = predict(app_type=app_type, video_info_list=video_info_list)
+            video_info_list = predict(app_type=app_type, video_info_list=video_info_list,
+                                      flow_pool_id_list=flow_pool_id_list)
         log_.info('{} predict end...'.format(app_name))
 
     # 更新剩余分发数

+ 1 - 1
videos_filter.py

@@ -1081,7 +1081,7 @@ def main():
         # appType = 19, ROV召回池视频过滤
         # filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI'])
         # 流量池视频过滤
-        # filter_flow_pool()
+        filter_flow_pool()
         # filter_flow_pool_new()
         filter_flow_pool_new_with_level()
         # 兜底视频过滤