Browse Source

Merge branch 'feature_2023101713_liqian_add_rank_abtest' into test

liqian 1 year ago
parent
commit
f17d738c62
8 changed files with 561 additions and 0 deletions
  1. cal_24h_score.py (+99, -0)
  2. cal_hour_score.py (+143, -0)
  3. clean.sh (+25, -0)
  4. compose_score.py (+84, -0)
  5. db_helper.py (+6, -0)
  6. export_24h_vid.py (+65, -0)
  7. export_hour_vid.py (+76, -0)
  8. rank_score_update_task.sh (+63, -0)

+ 99 - 0
cal_24h_score.py

@@ -0,0 +1,99 @@
+# coding: utf-8
+import sys
+import traceback
+import pandas as pd
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+
+features = [
+    'apptype',
+    'videoid',
+    'preview人数',  # pre-exposure (preview) unique users in past 24h
+    'view人数',  # exposure (view) unique users in past 24h
+    'play人数',  # play unique users in past 24h
+    'share人数',  # share unique users in past 24h
+    '回流人数',  # shared in past 24h, returned in past 24h
+    'preview次数',  # pre-exposure count in past 24h
+    'view次数',  # exposure count in past 24h
+    'play次数',  # play count in past 24h
+    'share次数',  # share count in past 24h
+    'platform_return',
+    'platform_preview',
+    'platform_preview_total',
+    'platform_show',
+    'platform_show_total',
+    'platform_view',
+    'platform_view_total',
+]
+
+
+def data_group(data_path):
+    """将数据按照videoid聚合(求和)"""
+    f = open(data_path)
+    index = 0
+    data_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        if index == 0:
+            index += 1
+            continue
+        index += 1
+        items = line.strip().split(",")
+        # print(items)
+        if len(items) < len(features):
+            continue
+        video_id = items[1]
+        if video_id not in data_dict:
+            data_dict[video_id] = {'videoid': video_id}
+            for i, feature in enumerate(features):
+                if feature in ['apptype', 'videoid']:
+                    continue
+                data_dict[video_id][feature] = int(float(items[i]))
+        else:
+            for i, feature in enumerate(features):
+                if feature in ['apptype', 'videoid']:
+                    continue
+                data_dict[video_id][feature] = data_dict[video_id][feature] + int(float(items[i]))
+    f.close()
+    data_list = [item for video_id, item in data_dict.items()]
+    data_df = pd.DataFrame(data_list)
+    return data_df
+
+
+def cal_score(data_df):
+    """计算score"""
+    df = data_df.copy()
+    # score1 = 回流/(view+10)
+    df['24h_score1'] = df['回流人数'] / (df['view人数'] + 10)
+    score_df = df[['videoid', '24h_score1']]
+    # print(score_df)
+    return score_df
+
+
+if __name__ == "__main__":
+    try:
+        now_date = sys.argv[1]
+        print(f"now_date: {now_date}")
+        data_path = f"./data/24h_video_data_{now_date}.csv"
+        data_df = data_group(data_path=data_path)
+        print(f"24h data_df shape: {data_df.shape}")
+        hour_score_path = f"./data/24h_score_{now_date}.csv"
+        score_df = cal_score(data_df=data_df)
+        score_df.to_csv(hour_score_path, index=False)
+        print(f"24h score_df shape: {score_df.shape}")
+    except Exception as e:
+        log_.error(f"rank 24h score update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 24h score update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
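
The 24h score is a single smoothed return-per-view ratio. A minimal sketch of the same formula on a hypothetical two-video fixture (column names are taken from the script above; the values are invented):

    import pandas as pd

    # Hypothetical fixture using the script's column names; values are invented.
    toy = pd.DataFrame([
        {'videoid': '1001', 'view人数': 990, '回流人数': 50},
        {'videoid': '1002', 'view人数': 0, '回流人数': 3},
    ])
    # Same formula as cal_score: the +10 keeps low-view videos from blowing up.
    toy['24h_score1'] = toy['回流人数'] / (toy['view人数'] + 10)
    print(toy)  # 1001 -> 50/1000 = 0.05; 1002 -> 3/10 = 0.3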

+ 143 - 0
cal_hour_score.py

@@ -0,0 +1,143 @@
+# coding: utf-8
+import sys
+import traceback
+import math
+import pandas as pd
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+
+features = [
+    'apptype',
+    'code',
+    'videoid',
+    'lastonehour_preview',  # pre-exposure unique users in past 1h
+    'lastonehour_view',  # exposure unique users in past 1h
+    'lastonehour_play',  # play unique users in past 1h
+    'lastonehour_share',  # share unique users in past 1h
+    'lastonehour_return',  # shared in past 1h, returned in past 1h
+    'lastonehour_preview_total',  # pre-exposure count in past 1h
+    'lastonehour_view_total',  # exposure count in past 1h
+    'lastonehour_play_total',  # play count in past 1h
+    'lastonehour_share_total',  # share count in past 1h
+    'platform_return',
+    'lastonehour_show',  # not split by region
+    'lastonehour_show_region',  # grouped by region
+    'lasttwohour_share',  # share unique users in hour h-2
+    'lasttwohour_return_now',  # shared in h-2, returned in past 1h
+    'lasttwohour_return',  # shared in h-2, returned in h-2
+    'lastthreehour_share',  # share unique users in hour h-3
+    'lastthreehour_return_now',  # shared in h-3, returned in past 1h
+    'lastthreehour_return',  # shared in h-3, returned in h-3
+
+    'lastonehour_return_new',  # shared in past 1h, returned in past 1h (returns attributed to shares from the given region: shares are region-restricted, returns are not)
+    'lasttwohour_return_now_new',  # shared in h-2, returned in past 1h (same regional attribution)
+    'lasttwohour_return_new',  # shared in h-2, returned in h-2 (same regional attribution)
+    'lastthreehour_return_now_new',  # shared in h-3, returned in past 1h (same regional attribution)
+    'lastthreehour_return_new',  # shared in h-3, returned in h-3 (same regional attribution)
+    'platform_return_new',  # platform-distributed returns (same regional attribution)
+]
+
+
+def data_group(data_path):
+    """将数据按照videoid聚合(求和)"""
+    f = open(data_path)
+    index = 0
+    data_dict = {}
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        if index == 0:
+            index += 1
+            continue
+        index += 1
+        items = line.strip().split(",")
+        # print(items)
+        if len(items) < len(features):
+            continue
+        video_id = items[2]
+        if video_id not in data_dict:
+            data_dict[video_id] = {'videoid': video_id}
+            for i, feature in enumerate(features):
+                if feature in ['apptype', 'code', 'videoid']:
+                    continue
+                data_dict[video_id][feature] = int(float(items[i]))
+        else:
+            for i, feature in enumerate(features):
+                if feature in ['apptype', 'code', 'videoid']:
+                    continue
+                data_dict[video_id][feature] = data_dict[video_id][feature] + int(float(items[i]))
+    f.close()
+    data_list = [item for video_id, item in data_dict.items()]
+    data_df = pd.DataFrame(data_list)
+    return data_df
+
+
+def cal_score(data_df):
+    """计算score"""
+    df = data_df.copy()
+    # share_rate_view = (share+1)/(view+1000)
+    df['share_rate_view'] = (df['lastonehour_share'] + 1) / (df['lastonehour_view'] + 1000)
+
+    # back_rate = (return+1)/(share+10)
+    df['back_rate'] = (df['lastonehour_return'] + 1) / (df['lastonehour_share'] + 10)
+    # back_rate_2h = (lasttwohour_return_now+1)/(share+10)
+    df['back_rate_2h'] = (df['lasttwohour_return_now'] + 1) / (df['lasttwohour_share'] + 10)
+    # back_rate_3h = (lastthreehour_return_now+1)/(share+10)
+    df['back_rate_3h'] = (df['lastthreehour_return_now'] + 1) / (df['lastthreehour_share'] + 10)
+
+    df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
+
+    # h-2 return retention
+    df['return_retention_initial_2h'] = (df['lasttwohour_return_now'] + 1) / (df['lasttwohour_return'] + 5)
+    df['return_retention_2h'] = df['return_retention_initial_2h'].apply(lambda x: 1 if x > 1 else x)
+    # h-3 return retention
+    df['return_retention_initial_3h'] = (df['lastthreehour_return_now'] + 1) / (df['lastthreehour_return'] + 10)
+    df['return_retention_3h'] = df['return_retention_initial_3h'].apply(lambda x: 0.8 if x > 0.8 else x)
+
+    # score1 = return / (view + 5)
+    df['hour_score1'] = df['lastonehour_return'] / (df['lastonehour_view'] + 5)
+
+    # score2 = (return * (1 + h-2 retention + h-3 retention)) / (view + 1000)
+    df['hour_score2'] = (df['lastonehour_return'] * (1 + df['return_retention_2h'] + df['return_retention_3h'])) / \
+                   (df['lastonehour_view'] + 1000)
+
+    # score3 = (lastthreehour_return_now + lasttwohour_return_now + lastonehour_return)/(lastonehour_view+1000)
+    df['hour_score3'] = (df['lastthreehour_return_now'] + df['lasttwohour_return_now'] + df['lastonehour_return']) / \
+                   (df['lastonehour_view'] + 1000)
+
+    # score4 = share/view * back_rate * logback
+    df['hour_score4'] = df['share_rate_view'] * df['back_rate'] * df['log_back']
+
+    # score5 = share/view * (back_rate + back_rate_2h + back_rate_3h) * logback
+    df['hour_score5'] = df['share_rate_view'] * (df['back_rate'] + df['back_rate_2h'] + df['back_rate_3h']) * df['log_back']
+
+    score_df = df[['videoid', 'hour_score1', 'hour_score2', 'hour_score3', 'hour_score4', 'hour_score5']]
+    # print(score_df)
+    return score_df
+
+
+if __name__ == "__main__":
+    try:
+        now_date = sys.argv[1]
+        print(f"now_date: {now_date}")
+        data_path = f"./data/hour_video_data_{now_date}.csv"
+        data_df = data_group(data_path=data_path)
+        print(f"hour data_df shape: {data_df.shape}")
+        hour_score_path = f"./data/hour_score_{now_date}.csv"
+        score_df = cal_score(data_df=data_df)
+        score_df.to_csv(hour_score_path, index=False)
+        print(f"hour score_df shape: {score_df.shape}")
+    except Exception as e:
+        log_.error(f"rank hour-level score update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank hour-level score update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
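
The hour-level scores mix the last three hours with capped retention terms: the h-2 retention is clipped at 1 and the h-3 retention at 0.8. A worked numeric check of hour_score2 under invented inputs (smoothing constants as in cal_score above):

    # Worked check of hour_score2 with invented inputs.
    lastonehour_return, lastonehour_view = 200, 9000
    lasttwohour_return_now, lasttwohour_return = 80, 100
    lastthreehour_return_now, lastthreehour_return = 40, 100

    ret_2h = min((lasttwohour_return_now + 1) / (lasttwohour_return + 5), 1)         # 81/105 ≈ 0.771
    ret_3h = min((lastthreehour_return_now + 1) / (lastthreehour_return + 10), 0.8)  # 41/110 ≈ 0.373
    hour_score2 = lastonehour_return * (1 + ret_2h + ret_3h) / (lastonehour_view + 1000)
    print(round(hour_score2, 4))  # 200 * 2.1441 / 10000 ≈ 0.0429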

+ 25 - 0
clean.sh

@@ -0,0 +1,25 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+
+# files stamped with the date 4 days back are removed
+del_date=`date +"%Y%m%d" -d -4days`
+hour_video_path=./data/hour_video_data_${del_date}'*'
+hour_score_path=./data/hour_score_${del_date}'*'
+day_video_path=./data/24h_video_data_${del_date}'*'
+day_score_path=./data/24h_score_${del_date}'*'
+merge_score_path=./data/merge_score_${del_date}'*'
+log_path=./log/${del_date}'*'
+
+echo ${hour_video_path}
+echo ${hour_score_path}
+echo ${day_video_path}
+echo ${day_score_path}
+echo ${merge_score_path}
+echo ${log_path}
+
+rm -rf ${hour_video_path}
+rm -rf ${hour_score_path}
+rm -rf ${day_video_path}
+rm -rf ${day_score_path}
+rm -rf ${merge_score_path}
+rm -rf ${log_path}
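
Note the cleanup only matches files stamped with the date exactly four days back, so a day skipped by cron is never removed. For reference, the same cleanup as a Python sketch (paths mirror the script; the date string is a placeholder):

    import glob
    import os

    stale = '20231013'  # placeholder for `date +"%Y%m%d" -d -4days`
    patterns = [
        f"./data/hour_video_data_{stale}*", f"./data/hour_score_{stale}*",
        f"./data/24h_video_data_{stale}*", f"./data/24h_score_{stale}*",
        f"./data/merge_score_{stale}*", f"./log/{stale}*",
    ]
    for pattern in patterns:
        for path in glob.glob(pattern):
            print("removing", path)
            os.remove(path)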

+ 84 - 0
compose_score.py

@@ -0,0 +1,84 @@
+import sys
+import traceback
+import pandas as pd
+from db_helper import RedisHelper
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+
+redis_helper = RedisHelper()
+
+
+def cal_compose_score(score_hour_path, score_24h_path, merge_score_path):
+    """分值合并"""
+    score_hour_df = pd.read_csv(score_hour_path)
+    score_24h_df = pd.read_csv(score_24h_path)
+    # print(score_hour_df)
+    # print(score_24h_df)
+    score_hour_df['videoid'] = score_hour_df['videoid'].astype(int)
+    score_24h_df['videoid'] = score_24h_df['videoid'].astype(int)
+    score_merge_df = pd.merge(score_hour_df, score_24h_df, on='videoid', how='outer')
+    score_merge_df.fillna(0, inplace=True)
+    # print(score_merge_df)
+    print(f"score_hour_df shape: {score_hour_df.shape}")
+    print(f"score_24h_df shape: {score_24h_df.shape}")
+    print(f"score_merge_df shape: {score_merge_df.shape}")
+    score_merge_df['score1'] = score_merge_df['24h_score1'] + score_merge_df['hour_score1']
+    score_merge_df['score2'] = score_merge_df['24h_score1'] + score_merge_df['hour_score2']
+    # score_merge_df['score3'] = score_merge_df['24h_score1'] + score_merge_df['hour_score3']
+    score_merge_df['score4'] = score_merge_df['24h_score1'] + score_merge_df['hour_score4']
+    score_merge_df['score5'] = score_merge_df['24h_score1'] + score_merge_df['hour_score5']
+    # print(score_merge_df)
+    print(f"score_merge_df shape: {score_merge_df.shape}")
+    score_merge_df.to_csv(merge_score_path, index=False)
+    score_df = score_merge_df[['videoid', 'score1', 'score2', 'score4', 'score5']]
+    print(f"score_df shape: {score_merge_df.shape}")
+    return score_df
+
+
+def score_to_redis(score_df):
+    redis_data = dict()
+    rank_score_key_prefix = 'rank:'
+    score_name_list = score_df.columns.to_list()[1:]
+    for ind, row in score_df.iterrows():
+        if ind % 1000 == 0:
+            if len(redis_data) > 0:
+                print(ind, len(redis_data))
+                redis_helper.update_batch_set_key(data=redis_data, expire_time=24*60*60)
+                redis_data = {}
+        video_id = int(row['videoid'])
+        for score_name in score_name_list:
+            score = row[score_name]
+            rank_score_key = f"{rank_score_key_prefix}{score_name}:{video_id}"
+            redis_data[rank_score_key] = score
+            # print(rank_score_key, score)
+            # redis_helper.set_data_to_redis(key_name=rank_score_key, value=score, expire_time=24*60*60)
+    if len(redis_data) > 0:
+        print(len(redis_data))
+        redis_helper.update_batch_set_key(data=redis_data, expire_time=24 * 60 * 60)
+
+
+if __name__ == '__main__':
+    try:
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        score_hour_path = f"./data/hour_score_{now_date}.csv"
+        score_24h_path = f"./data/24h_score_{now_date}.csv"
+        merge_score_path = f"./data/merge_score_{now_date}.csv"
+        score_df = cal_compose_score(
+            score_hour_path=score_hour_path, score_24h_path=score_24h_path, merge_score_path=merge_score_path
+        )
+        score_to_redis(score_df=score_df)
+        print("rank score update finished!")
+    except Exception as e:
+        log_.error(f"rank score merge update failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank score merge update failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
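
score_to_redis writes one string key per video per score, named rank:&lt;score_name&gt;:&lt;videoid&gt;, with a 24h TTL. A sketch of how a consumer could read one back (assumes a redis-py client; host/port are placeholders):

    import redis  # assumes redis-py; connection details are placeholders

    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    def get_rank_score(video_id, score_name='score1'):
        """Read one score written by compose_score.py (key layout: rank:<name>:<vid>)."""
        value = r.get(f"rank:{score_name}:{video_id}")
        return float(value) if value is not None else None

    print(get_rank_score(12345, 'score4'))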

+ 6 - 0
db_helper.py

@@ -304,6 +304,12 @@ class RedisHelper(object):
         conn = self.connect()
         conn.expire(name=key_name, time=int(expire_time))
 
+    def update_batch_set_key(self, data, expire_time=5*60):
+        """Batch-write key/value pairs with MSET, then set a TTL on each key."""
+        conn = self.connect()
+        conn.mset(data)
+        for key_name in data:
+            conn.expire(name=key_name, time=int(expire_time))
+
 
 class HologresHelper(object):
     def __init__(self):
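
update_batch_set_key issues one MSET plus one EXPIRE per key, i.e. N+1 round trips. A pipelined variant (a sketch, not part of this commit) would get the same effect in a single round trip, assuming self.connect() returns a redis-py client as above:

    def update_batch_set_key_pipelined(self, data, expire_time=5 * 60):
        """Sketch: batch SET with per-key TTL in one round trip via pipelined SET/EX."""
        conn = self.connect()
        with conn.pipeline(transaction=False) as pipe:
            for key_name, value in data.items():
                pipe.set(name=key_name, value=value, ex=int(expire_time))
            pipe.execute()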

+ 65 - 0
export_24h_vid.py

@@ -0,0 +1,65 @@
+import sys
+import traceback
+import pandas as pd
+from utils import get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+features = [
+    'apptype',
+    'videoid',
+    'preview人数',  # pre-exposure (preview) unique users in past 24h
+    'view人数',  # exposure (view) unique users in past 24h
+    'play人数',  # play unique users in past 24h
+    'share人数',  # share unique users in past 24h
+    '回流人数',  # shared in past 24h, returned in past 24h
+    'preview次数',  # pre-exposure count in past 24h
+    'view次数',  # exposure count in past 24h
+    'play次数',  # play count in past 24h
+    'share次数',  # share count in past 24h
+    'platform_return',
+    'platform_preview',
+    'platform_preview_total',
+    'platform_show',
+    'platform_show_total',
+    'platform_view',
+    'platform_view_total',
+]
+
+
+def get_feature_data(project, table, now_date):
+    """获取特征数据"""
+    # dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+    # dt = '2022041310'
+    records = get_data_from_odps(date=now_date, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+if __name__ == "__main__":
+    try:
+        project = config_.PROJECT_24H_APP_TYPE
+        table = config_.TABLE_24H_APP_TYPE
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        data = get_feature_data(project=project, table=table, now_date=now_date)
+        data = data.fillna(0)
+        data.to_csv(f"./data/24h_video_data_{now_date}.csv", index=False)
+        print(f"24h video data shape: {data.shape}")
+    except Exception as e:
+        log_.error(f"rank 24h data download failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 24h data download failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
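
get_feature_data simply projects each ODPS record onto the features list, dropping everything else. A minimal check of that projection with records mocked as plain dicts (the real record type returned by get_data_from_odps is not shown in this diff):

    import pandas as pd

    features = ['apptype', 'videoid', 'view人数', '回流人数']  # abridged
    records = [{'apptype': 1, 'videoid': 1001, 'view人数': 990,
                '回流人数': 50, 'extra': 'dropped'}]
    feature_df = pd.DataFrame([{name: r[name] for name in features} for r in records])
    print(feature_df.columns.tolist())  # only the listed features survive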

+ 76 - 0
export_hour_vid.py

@@ -0,0 +1,76 @@
+import sys
+import traceback
+import pandas as pd
+from utils import get_data_from_odps, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+features = [
+    'apptype',
+    'code',
+    'videoid',
+    'lastonehour_preview',  # pre-exposure unique users in past 1h
+    'lastonehour_view',  # exposure unique users in past 1h
+    'lastonehour_play',  # play unique users in past 1h
+    'lastonehour_share',  # share unique users in past 1h
+    'lastonehour_return',  # shared in past 1h, returned in past 1h
+    'lastonehour_preview_total',  # pre-exposure count in past 1h
+    'lastonehour_view_total',  # exposure count in past 1h
+    'lastonehour_play_total',  # play count in past 1h
+    'lastonehour_share_total',  # share count in past 1h
+    'platform_return',
+    'lastonehour_show',  # not split by region
+    'lastonehour_show_region',  # grouped by region
+    'lasttwohour_share',  # share unique users in hour h-2
+    'lasttwohour_return_now',  # shared in h-2, returned in past 1h
+    'lasttwohour_return',  # shared in h-2, returned in h-2
+    'lastthreehour_share',  # share unique users in hour h-3
+    'lastthreehour_return_now',  # shared in h-3, returned in past 1h
+    'lastthreehour_return',  # shared in h-3, returned in h-3
+
+    'lastonehour_return_new',  # shared in past 1h, returned in past 1h (returns attributed to shares from the given region: shares are region-restricted, returns are not)
+    'lasttwohour_return_now_new',  # shared in h-2, returned in past 1h (same regional attribution)
+    'lasttwohour_return_new',  # shared in h-2, returned in h-2 (same regional attribution)
+    'lastthreehour_return_now_new',  # shared in h-3, returned in past 1h (same regional attribution)
+    'lastthreehour_return_new',  # shared in h-3, returned in h-3 (same regional attribution)
+    'platform_return_new',  # platform-distributed returns (same regional attribution)
+]
+
+
+def get_feature_data(project, table, now_date):
+    """获取特征数据"""
+    # dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+    # dt = '2022041310'
+    records = get_data_from_odps(date=now_date, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+if __name__ == "__main__":
+    try:
+        project = config_.PROJECT_REGION_APP_TYPE
+        table = config_.TABLE_REGION_APP_TYPE
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        data = get_feature_data(project=project, table=table, now_date=now_date)
+        data = data.fillna(0)
+        data.to_csv(f"./data/hour_video_data_{now_date}.csv", index=False)
+        print(f"hour video data shape: {data.shape}")
+    except Exception as e:
+        log_.error(f"rank hour-level data download failed, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank hour-level data download failed\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
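
Note that export_hour_vid.py and cal_hour_score.py are coupled positionally: data_group reads items[i] by index into its own copy of features, so the two lists must stay in the same order. A cheap guard (a sketch, not in this commit) is to assert the CSV header before scoring:

    import csv

    def assert_header_matches(csv_path, expected_features):
        """Fail fast if the exported column order drifts from the features list."""
        with open(csv_path) as f:
            header = next(csv.reader(f))
        assert header == expected_features, f"column order drift: {header}"

    # assert_header_matches(f"./data/hour_video_data_{now_date}.csv", features)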

+ 63 - 0
rank_score_update_task.sh

@@ -0,0 +1,63 @@
+source /etc/profile
+
+now_date=`date +"%Y%m%d%H" -d -0days`
+echo ${now_date}
+echo $ROV_OFFLINE_ENV
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline
+fi
+
+mkdir -p ./data/
+
+# 1. Fetch the 24h data
+python export_24h_vid.py ${now_date}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] export_24h_vid.py failed"
+    exit 255
+fi
+
+# 2. Fetch the hour-level data
+python export_hour_vid.py ${now_date}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] export_hour_vid.py failed"
+    exit 255
+fi
+
+# 3. Compute the 24h scores
+python cal_24h_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] cal_24h_score.py failed"
+    exit 255
+fi
+
+# 4. Compute the hour-level scores
+python cal_hour_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] cal_hour_score.py failed"
+    exit 255
+fi
+
+# 5. Merge the scores
+python compose_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    echo "[ERROR] compose_score.py failed"
+    exit 255
+fi
+
+# 6. Clean up expired data
+sh clean.sh
+echo "rank score update task finished"
+