Explorar el Código

add rank abtest

liqian hace 1 año
padre
commit
7055132980
Se han modificado 7 ficheros con 401 adiciones y 2 borrados
  1. 85 0
      cal_24h_score.py
  2. 129 0
      cal_hour_score.py
  3. 61 0
      compose_score.py
  4. 2 2
      config.py
  5. 6 0
      db_helper.py
  6. 54 0
      export_24h_vid.py
  7. 64 0
      export_hour_vid.py

+ 85 - 0
cal_24h_score.py

@@ -0,0 +1,85 @@
+# coding utf-8
+import sys
+import json
+import math
+import pandas as pd
+
+
# Columns expected from the 24h ODPS export, in order. The first two are
# identifiers; everything after them is a numeric counter that gets summed.
features = [
    'apptype',
    'videoid',
    'preview人数',  # unique users pre-exposed in the past 24h
    'view人数',  # unique users exposed in the past 24h
    'play人数',  # unique users who played in the past 24h
    'share人数',  # unique users who shared in the past 24h
    '回流人数',  # users who returned in the past 24h via past-24h shares
    'preview次数',  # pre-exposure count in the past 24h
    'view次数',  # exposure count in the past 24h
    'play次数',  # play count in the past 24h
    'share次数',  # share count in the past 24h
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]


def data_group(data_path):
    """Aggregate the raw 24h csv rows by videoid, summing every counter.

    The csv must have one header line followed by rows whose columns line
    up with ``features`` (apptype, videoid, then the counters).

    :param data_path: path of the csv produced by export_24h_vid.py
    :return: pd.DataFrame with one row per videoid ('videoid' kept as the
        raw string from the file) plus the summed integer counters
    """
    id_features = ('apptype', 'videoid')  # identifiers, never summed
    data_dict = {}
    # `with` guarantees the file is closed even if a row fails to parse
    # (the original readline loop leaked the handle on exceptions)
    with open(data_path) as f:
        next(f, None)  # skip the header line
        for line in f:
            items = line.strip().split(",")
            # skip malformed / truncated rows
            if len(items) < len(features):
                continue
            video_id = items[1]
            record = data_dict.get(video_id)
            if record is None:
                record = {'videoid': video_id}
                for i, feature in enumerate(features):
                    if feature in id_features:
                        continue
                    # counters may arrive as "3.0"; normalise to int
                    record[feature] = int(float(items[i]))
                data_dict[video_id] = record
            else:
                for i, feature in enumerate(features):
                    if feature in id_features:
                        continue
                    record[feature] += int(float(items[i]))
    return pd.DataFrame(list(data_dict.values()))
+
+
def cal_score(data_df):
    """Compute the 24h ranking score for every video.

    24h_score1 = returns / (views + 10), a smoothed return-per-view rate.

    :param data_df: aggregated 24h counters, one row per videoid
    :return: pd.DataFrame with columns ['videoid', '24h_score1']
    """
    scored = data_df.copy()
    scored['24h_score1'] = scored['回流人数'] / (scored['view人数'] + 10)
    return scored[['videoid', '24h_score1']]
+
+
+if __name__ == "__main__":
+    # 1.load data
+    now_date = sys.argv[1]
+    print(f"now_date: {now_date}")
+    data_path = f"./data/24h_video_data_{now_date}.csv"
+    data_df = data_group(data_path=data_path)
+    print(f"data_df shape: {data_df.shape}")
+    hour_score_path = f"./data/24h_score_{now_date}.csv"
+    score_df = cal_score(data_df=data_df)
+    score_df.to_csv(hour_score_path, index=False)
+    print(f"score_df shape: {score_df.shape}")

+ 129 - 0
cal_hour_score.py

@@ -0,0 +1,129 @@
+# coding utf-8
+import sys
+import json
+import math
+import pandas as pd
+
+
# Columns expected from the hourly ODPS export, in order. The first three
# are identifiers; everything after them is a numeric counter to be summed.
features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # unique users pre-exposed in the past 1h
    'lastonehour_view',  # unique users exposed in the past 1h
    'lastonehour_play',  # unique users who played in the past 1h
    'lastonehour_share',  # unique users who shared in the past 1h
    'lastonehour_return',  # users returning in the past 1h via past-1h shares
    'lastonehour_preview_total',  # pre-exposure count in the past 1h
    'lastonehour_view_total',  # exposure count in the past 1h
    'lastonehour_play_total',  # play count in the past 1h
    'lastonehour_share_total',  # share count in the past 1h
    'platform_return',
    'lastonehour_show',  # shows, not split by region
    'lastonehour_show_region',  # shows, grouped by region
    'lasttwohour_share',  # unique users who shared in hour h-2
    'lasttwohour_return_now',  # returns in the past 1h from h-2 shares
    'lasttwohour_return',  # returns in h-2 from h-2 shares
    'lastthreehour_share',  # unique users who shared in hour h-3
    'lastthreehour_return_now',  # returns in the past 1h from h-3 shares
    'lastthreehour_return',  # returns in h-3 from h-3 shares

    # "_new" variants: returns attributed to shares of the matching region
    # (the share is region-restricted, the return is not)
    'lastonehour_return_new',  # past-1h returns from past-1h shares
    'lasttwohour_return_now_new',  # past-1h returns from h-2 shares
    'lasttwohour_return_new',  # h-2 returns from h-2 shares
    'lastthreehour_return_now_new',  # past-1h returns from h-3 shares
    'lastthreehour_return_new',  # h-3 returns from h-3 shares
    'platform_return_new',  # platform-distributed returns (region-attributed)
]


def data_group(data_path):
    """Aggregate the raw hourly csv rows by videoid, summing every counter.

    The csv must have one header line followed by rows whose columns line
    up with ``features`` (apptype, code, videoid, then the counters).

    :param data_path: path of the csv produced by export_hour_vid.py
    :return: pd.DataFrame with one row per videoid ('videoid' kept as the
        raw string from the file) plus the summed integer counters
    """
    id_features = ('apptype', 'code', 'videoid')  # identifiers, never summed
    data_dict = {}
    # `with` guarantees the file is closed even if a row fails to parse
    # (the original readline loop leaked the handle on exceptions)
    with open(data_path) as f:
        next(f, None)  # skip the header line
        for line in f:
            items = line.strip().split(",")
            # skip malformed / truncated rows
            if len(items) < len(features):
                continue
            video_id = items[2]
            record = data_dict.get(video_id)
            if record is None:
                record = {'videoid': video_id}
                for i, feature in enumerate(features):
                    if feature in id_features:
                        continue
                    # counters may arrive as "3.0"; normalise to int
                    record[feature] = int(float(items[i]))
                data_dict[video_id] = record
            else:
                for i, feature in enumerate(features):
                    if feature in id_features:
                        continue
                    record[feature] += int(float(items[i]))
    return pd.DataFrame(list(data_dict.values()))
+
+
def cal_score(data_df):
    """Compute five candidate hour-level ranking scores per video.

    All rates are smoothed with additive constants so low-traffic videos
    do not dominate.

    :param data_df: aggregated hourly counters, one row per videoid
    :return: pd.DataFrame with columns
        ['videoid', 'hour_score1'..'hour_score5']
    """
    df = data_df.copy()

    # smoothed share-per-view rate: (share + 1) / (view + 1000)
    share_rate_view = (df['lastonehour_share'] + 1) / (df['lastonehour_view'] + 1000)

    # smoothed return-per-share rates for h-1 / h-2 / h-3 shares
    back_rate = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
    back_rate_2h = (df['lasttwohour_return_now'] + 1) / (df['lasttwohour_share'] + 10)
    back_rate_3h = (df['lastthreehour_return_now'] + 1) / (df['lastthreehour_share'] + 10)

    # log-damped return volume
    log_back = (df['lastonehour_return'] + 1).apply(math.log)

    # retention of h-2 returns into the current hour, capped at 1
    retention_2h = ((df['lasttwohour_return_now'] + 1) / (df['lasttwohour_return'] + 5)) \
        .apply(lambda v: 1 if v > 1 else v)
    # retention of h-3 returns into the current hour, capped at 0.8
    retention_3h = ((df['lastthreehour_return_now'] + 1) / (df['lastthreehour_return'] + 10)) \
        .apply(lambda v: 0.8 if v > 0.8 else v)

    # score1: returns per view (small smoothing)
    df['hour_score1'] = df['lastonehour_return'] / (df['lastonehour_view'] + 5)
    # score2: returns boosted by retention of older cohorts, per view
    df['hour_score2'] = (df['lastonehour_return'] * (1 + retention_2h + retention_3h)) / \
        (df['lastonehour_view'] + 1000)
    # score3: all returns landing this hour (from h-1/h-2/h-3 shares), per view
    df['hour_score3'] = (df['lastthreehour_return_now'] + df['lasttwohour_return_now'] +
                         df['lastonehour_return']) / (df['lastonehour_view'] + 1000)
    # score4: share rate x back rate x log-damped volume
    df['hour_score4'] = share_rate_view * back_rate * log_back
    # score5: like score4, but summing the three cohorts' back rates
    df['hour_score5'] = share_rate_view * (back_rate + back_rate_2h + back_rate_3h) * log_back

    return df[['videoid', 'hour_score1', 'hour_score2', 'hour_score3', 'hour_score4', 'hour_score5']]
+
+
+if __name__ == "__main__":
+    # 1.load data
+    now_date = sys.argv[1]
+    print(f"now_date: {now_date}")
+    data_path = f"./data/hour_video_data_{now_date}.csv"
+    data_df = data_group(data_path=data_path)
+    print(f"data_df shape: {data_df.shape}")
+    hour_score_path = f"./data/hour_score_{now_date}.csv"
+    score_df = cal_score(data_df=data_df)
+    score_df.to_csv(hour_score_path, index=False)
+    print(f"score_df shape: {score_df.shape}")

+ 61 - 0
compose_score.py

@@ -0,0 +1,61 @@
import sys
import pandas as pd
from db_helper import RedisHelper

# shared redis client used by score_to_redis; connection settings
# presumably come from db_helper / config — confirm against RedisHelper
redis_helper = RedisHelper()
+
+
def cal_compose_score(score_hour_path, score_24h_path):
    """Merge the hourly and 24h score files into five composed scores.

    Joins on videoid with an outer merge (missing side filled with 0)
    and builds score{i} = 24h_score1 + hour_score{i} for i in 1..5.

    :param score_hour_path: csv written by cal_hour_score.py
    :param score_24h_path: csv written by cal_24h_score.py
    :return: pd.DataFrame with columns ['videoid', 'score1'..'score5']
    """
    hour_df = pd.read_csv(score_hour_path)
    day_df = pd.read_csv(score_24h_path)
    print(hour_df)
    print(day_df)
    hour_df['videoid'] = hour_df['videoid'].astype(int)
    day_df['videoid'] = day_df['videoid'].astype(int)
    # outer join keeps videos that only appear on one side
    merged = pd.merge(hour_df, day_df, on='videoid', how='outer')
    merged.fillna(0, inplace=True)
    print(merged)
    print(hour_df.shape)
    print(day_df.shape)
    print(merged.shape)
    for i in range(1, 6):
        merged[f'score{i}'] = merged['24h_score1'] + merged[f'hour_score{i}']
    print(merged)
    print(merged.shape)
    return merged[['videoid', 'score1', 'score2', 'score3', 'score4', 'score5']]
+
+
def score_to_redis(score_df):
    """Write every composed score to redis with a 24h TTL.

    Keys look like ``rank:<score_name>:<videoid>``. Values are buffered
    and flushed in batches of roughly 1000 rows to limit round trips.

    :param score_df: DataFrame whose first column is videoid and whose
        remaining columns are score columns (see cal_compose_score)
    """
    redis_data = dict()
    rank_score_key_prefix = 'rank:'
    score_name_list = score_df.columns.to_list()[1:]
    for ind, row in score_df.iterrows():
        video_id = int(row['videoid'])
        for score_name in score_name_list:
            score = row[score_name]
            rank_score_key = f"{rank_score_key_prefix}{score_name}:{video_id}"
            redis_data[rank_score_key] = score
            print(rank_score_key, score)
        # flush at most once per 1000 rows; the original ran this check
        # inside the per-score loop, triggering several redundant flushes
        # for every qualifying row (including row 0)
        if ind % 1000 == 0 and len(redis_data) > 0:
            print(ind, len(redis_data))
            redis_helper.update_batch_set_key(data=redis_data, expire_time=24 * 60 * 60)
            redis_data = {}
    # flush whatever is left after the last full batch
    if len(redis_data) > 0:
        redis_helper.update_batch_set_key(data=redis_data, expire_time=24 * 60 * 60)
        redis_data = {}
    # 0 here means everything was flushed (the original printed the stale
    # pre-flush size)
    print(len(redis_data))
+
+
if __name__ == '__main__':
    # usage: python compose_score.py <date>
    now_date = sys.argv[1]
    print("now date:", now_date)
    composed = cal_compose_score(
        score_hour_path=f"./data/hour_score_{now_date}.csv",
        score_24h_path=f"./data/24h_score_{now_date}.csv",
    )
    composed and None  # no-op guard removed: direct call below
    score_to_redis(score_df=composed)

+ 2 - 2
config.py

@@ -2553,8 +2553,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # 获取环境变量 ROV_OFFLINE_ENV
-    env = os.environ.get('ROV_OFFLINE_ENV')
-    # env = 'dev'
+    # env = os.environ.get('ROV_OFFLINE_ENV')
+    env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 6 - 0
db_helper.py

@@ -304,6 +304,12 @@ class RedisHelper(object):
         conn = self.connect()
         conn.expire(name=key_name, time=int(expire_time))
 
    def update_batch_set_key(self, data, expire_time=5*60):
        """Set multiple key/value pairs via MSET, then apply a TTL to each key.

        :param data: dict of {key_name: value} pairs to write
        :param expire_time: TTL in seconds applied to every key
            (default 5 minutes)
        """
        conn = self.connect()
        # one MSET call writes all pairs in a single round trip
        conn.mset(data)
        # MSET cannot attach TTLs, so expire each key individually.
        # NOTE(review): this is one round trip per key — a pipeline would
        # batch these; confirm the client (presumably redis-py) supports it.
        for key_name in data:
            conn.expire(name=key_name, time=int(expire_time))
+
 
 class HologresHelper(object):
     def __init__(self):

+ 54 - 0
export_24h_vid.py

@@ -0,0 +1,54 @@
import sys
import pandas as pd
from utils import get_data_from_odps
from config import set_config
from log import Log
# project-level config object and logger, created once at import time
config_, _ = set_config()
log_ = Log()
+
# Columns read from the 24h ODPS table, in order; the first two are
# identifiers, the rest are counters.
features = [
    'apptype',
    'videoid',
    'preview人数',  # unique users pre-exposed in the past 24h
    'view人数',  # unique users exposed in the past 24h
    'play人数',  # unique users who played in the past 24h
    'share人数',  # unique users who shared in the past 24h
    '回流人数',  # users who returned in the past 24h via past-24h shares
    'preview次数',  # pre-exposure count in the past 24h
    'view次数',  # exposure count in the past 24h
    'play次数',  # play count in the past 24h
    'share次数',  # share count in the past 24h
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]
+
+
def get_feature_data(project, table, now_date):
    """Pull the 24h feature rows from ODPS into a DataFrame.

    :param project: ODPS project name
    :param table: ODPS table name
    :param now_date: partition value passed through to get_data_from_odps
    :return: pd.DataFrame with one column per entry in ``features``
    """
    records = get_data_from_odps(date=now_date, project=project, table=table)
    rows = [{name: record[name] for name in features} for record in records]
    return pd.DataFrame(rows)
+
+
+if __name__ == "__main__":
+    project = config_.PROJECT_24H_APP_TYPE
+    table = config_.TABLE_24H_APP_TYPE
+    now_date = sys.argv[1]
+    print("now date:", now_date)
+    data = get_feature_data(project=project, table=table, now_date=now_date)
+    data = data.fillna(0)
+    data.to_csv(f"./data/24h_video_data_{now_date}.csv", index=False)
+    print(f"data shape: {data.shape}")

+ 64 - 0
export_hour_vid.py

@@ -0,0 +1,64 @@
import sys
import pandas as pd
from utils import get_data_from_odps
from config import set_config
from log import Log
# project-level config object and logger, created once at import time
config_, _ = set_config()
log_ = Log()
+
# Columns read from the hourly (region) ODPS table, in order; the first
# three are identifiers, the rest are counters.
features = [
    'apptype',
    'code',
    'videoid',
    'lastonehour_preview',  # unique users pre-exposed in the past 1h
    'lastonehour_view',  # unique users exposed in the past 1h
    'lastonehour_play',  # unique users who played in the past 1h
    'lastonehour_share',  # unique users who shared in the past 1h
    'lastonehour_return',  # users returning in the past 1h via past-1h shares
    'lastonehour_preview_total',  # pre-exposure count in the past 1h
    'lastonehour_view_total',  # exposure count in the past 1h
    'lastonehour_play_total',  # play count in the past 1h
    'lastonehour_share_total',  # share count in the past 1h
    'platform_return',
    'lastonehour_show',  # shows, not split by region
    'lastonehour_show_region',  # shows, grouped by region
    'lasttwohour_share',  # unique users who shared in hour h-2
    'lasttwohour_return_now',  # returns in the past 1h from h-2 shares
    'lasttwohour_return',  # returns in h-2 from h-2 shares
    'lastthreehour_share',  # unique users who shared in hour h-3
    'lastthreehour_return_now',  # returns in the past 1h from h-3 shares
    'lastthreehour_return',  # returns in h-3 from h-3 shares

    # "_new" variants: returns attributed to shares of the matching region
    # (the share is region-restricted, the return is not)
    'lastonehour_return_new',  # past-1h returns from past-1h shares
    'lasttwohour_return_now_new',  # past-1h returns from h-2 shares
    'lasttwohour_return_new',  # h-2 returns from h-2 shares
    'lastthreehour_return_now_new',  # past-1h returns from h-3 shares
    'lastthreehour_return_new',  # h-3 returns from h-3 shares
    'platform_return_new',  # platform-distributed returns (region-attributed)
]
+
+
def get_feature_data(project, table, now_date):
    """Pull the hourly feature rows from ODPS into a DataFrame.

    :param project: ODPS project name
    :param table: ODPS table name
    :param now_date: partition value passed through to get_data_from_odps
    :return: pd.DataFrame with one column per entry in ``features``
    """
    records = get_data_from_odps(date=now_date, project=project, table=table)
    rows = [{name: record[name] for name in features} for record in records]
    return pd.DataFrame(rows)
+
+
+if __name__ == "__main__":
+    project = config_.PROJECT_REGION_APP_TYPE
+    table = config_.TABLE_REGION_APP_TYPE
+    now_date = sys.argv[1]
+    print("now date:", now_date)
+    data = get_feature_data(project=project, table=table, now_date=now_date)
+    data = data.fillna(0)
+    data.to_csv(f"./data/hour_video_data_{now_date}.csv", index=False)
+    print(f"data shape: {data.shape}")