Browse Source

update & add rank_score_update_task.sh, clean.sh

liqian 1 year ago
parent
commit
a31c06ba88
8 changed files with 227 additions and 65 deletions
  1. 26 12
      cal_24h_score.py
  2. 25 11
      cal_hour_score.py
  3. 25 0
      clean.sh
  4. 45 22
      compose_score.py
  5. 2 2
      config.py
  6. 20 9
      export_24h_vid.py
  7. 21 9
      export_hour_vid.py
  8. 63 0
      rank_score_update_task.sh

+ 26 - 12
cal_24h_score.py

@@ -1,8 +1,13 @@
 # coding utf-8
 import sys
-import json
-import math
+import traceback
 import pandas as pd
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
 
 
 features = [
@@ -73,13 +78,22 @@ def cal_score(data_df):
 
 
 if __name__ == "__main__":
-    # 1.load data
-    now_date = sys.argv[1]
-    print(f"now_date: {now_date}")
-    data_path = f"./data/24h_video_data_{now_date}.csv"
-    data_df = data_group(data_path=data_path)
-    print(f"data_df shape: {data_df.shape}")
-    hour_score_path = f"./data/24h_score_{now_date}.csv"
-    score_df = cal_score(data_df=data_df)
-    score_df.to_csv(hour_score_path, index=False)
-    print(f"score_df shape: {score_df.shape}")
+    try:
+        now_date = sys.argv[1]
+        print(f"now_date: {now_date}")
+        data_path = f"./data/24h_video_data_{now_date}.csv"
+        data_df = data_group(data_path=data_path)
+        print(f"24h data_df shape: {data_df.shape}")
+        hour_score_path = f"./data/24h_score_{now_date}.csv"
+        score_df = cal_score(data_df=data_df)
+        score_df.to_csv(hour_score_path, index=False)
+        print(f"24h score_df shape: {score_df.shape}")
+    except Exception as e:
+        log_.error(f"rank 24h分值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 24h分值更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )

+ 25 - 11
cal_hour_score.py

@@ -1,8 +1,13 @@
 # coding utf-8
 import sys
-import json
+import traceback
 import math
 import pandas as pd
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
 
 
 features = [
@@ -117,13 +122,22 @@ def cal_score(data_df):
 
 
 if __name__ == "__main__":
-    # 1.load data
-    now_date = sys.argv[1]
-    print(f"now_date: {now_date}")
-    data_path = f"./data/hour_video_data_{now_date}.csv"
-    data_df = data_group(data_path=data_path)
-    print(f"data_df shape: {data_df.shape}")
-    hour_score_path = f"./data/hour_score_{now_date}.csv"
-    score_df = cal_score(data_df=data_df)
-    score_df.to_csv(hour_score_path, index=False)
-    print(f"score_df shape: {score_df.shape}")
+    try:
+        now_date = sys.argv[1]
+        print(f"now_date: {now_date}")
+        data_path = f"./data/hour_video_data_{now_date}.csv"
+        data_df = data_group(data_path=data_path)
+        print(f"hour data_df shape: {data_df.shape}")
+        hour_score_path = f"./data/hour_score_{now_date}.csv"
+        score_df = cal_score(data_df=data_df)
+        score_df.to_csv(hour_score_path, index=False)
+        print(f"hour score_df shape: {score_df.shape}")
+    except Exception as e:
+        log_.error(f"rank 小时级分值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 小时级分值更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )

+ 25 - 0
clean.sh

@@ -0,0 +1,25 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+
+last3day=`date  +"%Y%m%d" -d -4days`
+hour_video_path=./data/hour_video_data_${last3day}'*'
+hour_score_path=./data/hour_score_${last3day}'*'
+day_video_path=./data/24h_video_data_${last3day}'*'
+day_score_path=./data/24h_score_${last3day}'*'
+merge_score_path=./data/merge_score_${last3day}'*'
+log_path=./log/${last3day}'*'
+
+echo ${hour_video_path}
+echo ${hour_score_path}
+echo ${day_video_path}
+echo ${day_score_path}
+echo ${merge_score_path}
+echo ${log_path}
+
+rm -rf ${hour_video_path}
+rm -rf ${hour_score_path}
+rm -rf ${day_video_path}
+rm -rf ${day_score_path}
+rm -rf ${merge_score_path}
+rm -rf ${log_path}

+ 45 - 22
compose_score.py

@@ -1,32 +1,41 @@
 import sys
+import traceback
 import pandas as pd
 from db_helper import RedisHelper
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
 
 redis_helper = RedisHelper()
 
 
-def cal_compose_score(score_hour_path, score_24h_path):
+def cal_compose_score(score_hour_path, score_24h_path, merge_score_path):
     """分值合并"""
     score_hour_df = pd.read_csv(score_hour_path)
     score_24h_df = pd.read_csv(score_24h_path)
-    print(score_hour_df)
-    print(score_24h_df)
+    # print(score_hour_df)
+    # print(score_24h_df)
     score_hour_df['videoid'] = score_hour_df['videoid'].astype(int)
     score_24h_df['videoid'] = score_24h_df['videoid'].astype(int)
     score_merge_df = pd.merge(score_hour_df, score_24h_df, on='videoid', how='outer')
     score_merge_df.fillna(0, inplace=True)
-    print(score_merge_df)
-    print(score_hour_df.shape)
-    print(score_24h_df.shape)
-    print(score_merge_df.shape)
+    # print(score_merge_df)
+    print(f"score_hour_df shape: {score_hour_df.shape}")
+    print(f"score_24h_df shape: {score_24h_df.shape}")
+    print(f"score_merge_df shape: {score_merge_df.shape}")
     score_merge_df['score1'] = score_merge_df['24h_score1'] + score_merge_df['hour_score1']
     score_merge_df['score2'] = score_merge_df['24h_score1'] + score_merge_df['hour_score2']
     score_merge_df['score3'] = score_merge_df['24h_score1'] + score_merge_df['hour_score3']
     score_merge_df['score4'] = score_merge_df['24h_score1'] + score_merge_df['hour_score4']
     score_merge_df['score5'] = score_merge_df['24h_score1'] + score_merge_df['hour_score5']
-    print(score_merge_df)
-    print(score_merge_df.shape)
+    # print(score_merge_df)
+    print(f"score_merge_df shape: {score_merge_df.shape}")
+    score_merge_df.to_csv(merge_score_path, index=False)
     score_df = score_merge_df[['videoid', 'score1', 'score2', 'score3', 'score4', 'score5']]
+    print(f"score_df shape: {score_df.shape}")
     return score_df
 
 
@@ -35,27 +44,41 @@ def score_to_redis(score_df):
     rank_score_key_prefix = 'rank:'
     score_name_list = score_df.columns.to_list()[1:]
     for ind, row in score_df.iterrows():
+        if ind % 1000 == 0:
+            if len(redis_data) > 0:
+                print(ind, len(redis_data))
+                redis_helper.update_batch_set_key(data=redis_data, expire_time=24*60*60)
+                redis_data = {}
         video_id = int(row['videoid'])
         for score_name in score_name_list:
             score = row[score_name]
             rank_score_key = f"{rank_score_key_prefix}{score_name}:{video_id}"
             redis_data[rank_score_key] = score
-            print(rank_score_key, score)
+            # print(rank_score_key, score)
             # redis_helper.set_data_to_redis(key_name=rank_score_key, value=score, expire_time=24*60*60)
-            if ind % 1000 == 0:
-                if len(redis_data) > 0:
-                    print(ind, len(redis_data))
-                    redis_helper.update_batch_set_key(data=redis_data, expire_time=24*60*60)
-                    redis_data = {}
     if len(redis_data) > 0:
+        print(len(redis_data))
         redis_helper.update_batch_set_key(data=redis_data, expire_time=24 * 60 * 60)
-    print(len(redis_data))
 
 
 if __name__ == '__main__':
-    now_date = sys.argv[1]
-    print("now date:", now_date)
-    score_hour_path = f"./data/hour_score_{now_date}.csv"
-    score_24h_path = f"./data/24h_score_{now_date}.csv"
-    score_df = cal_compose_score(score_hour_path=score_hour_path, score_24h_path=score_24h_path)
-    score_to_redis(score_df=score_df)
+    try:
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        score_hour_path = f"./data/hour_score_{now_date}.csv"
+        score_24h_path = f"./data/24h_score_{now_date}.csv"
+        merge_score_path = f"./data/merge_score_{now_date}.csv"
+        score_df = cal_compose_score(
+            score_hour_path=score_hour_path, score_24h_path=score_24h_path, merge_score_path=merge_score_path
+        )
+        score_to_redis(score_df=score_df)
+        print("rank score update finished!")
+    except Exception as e:
+        log_.error(f"rank 分值合并更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 分值合并更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )

+ 2 - 2
config.py

@@ -2553,8 +2553,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # 获取环境变量 ROV_OFFLINE_ENV
-    # env = os.environ.get('ROV_OFFLINE_ENV')
-    env = 'dev'
+    env = os.environ.get('ROV_OFFLINE_ENV')
+    # env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 20 - 9
export_24h_vid.py

@@ -1,6 +1,7 @@
 import sys
+import traceback
 import pandas as pd
-from utils import get_data_from_odps
+from utils import get_data_from_odps, send_msg_to_feishu
 from config import set_config
 from log import Log
 config_, _ = set_config()
@@ -44,11 +45,21 @@ def get_feature_data(project, table, now_date):
 
 
 if __name__ == "__main__":
-    project = config_.PROJECT_24H_APP_TYPE
-    table = config_.TABLE_24H_APP_TYPE
-    now_date = sys.argv[1]
-    print("now date:", now_date)
-    data = get_feature_data(project=project, table=table, now_date=now_date)
-    data = data.fillna(0)
-    data.to_csv(f"./data/24h_video_data_{now_date}.csv", index=False)
-    print(f"data shape: {data.shape}")
+    try:
+        project = config_.PROJECT_24H_APP_TYPE
+        table = config_.TABLE_24H_APP_TYPE
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        data = get_feature_data(project=project, table=table, now_date=now_date)
+        data = data.fillna(0)
+        data.to_csv(f"./data/24h_video_data_{now_date}.csv", index=False)
+        print(f"24h video data shape: {data.shape}")
+    except Exception as e:
+        log_.error(f"rank 24h数据下载失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 24h数据下载失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )

+ 21 - 9
export_hour_vid.py

@@ -1,6 +1,7 @@
 import sys
+import traceback
 import pandas as pd
-from utils import get_data_from_odps
+from utils import get_data_from_odps, send_msg_to_feishu
 from config import set_config
 from log import Log
 config_, _ = set_config()
@@ -54,11 +55,22 @@ def get_feature_data(project, table, now_date):
 
 
 if __name__ == "__main__":
-    project = config_.PROJECT_REGION_APP_TYPE
-    table = config_.TABLE_REGION_APP_TYPE
-    now_date = sys.argv[1]
-    print("now date:", now_date)
-    data = get_feature_data(project=project, table=table, now_date=now_date)
-    data = data.fillna(0)
-    data.to_csv(f"./data/hour_video_data_{now_date}.csv", index=False)
-    print(f"data shape: {data.shape}")
+    try:
+        project = config_.PROJECT_REGION_APP_TYPE
+        table = config_.TABLE_REGION_APP_TYPE
+        now_date = sys.argv[1]
+        print("now date:", now_date)
+        data = get_feature_data(project=project, table=table, now_date=now_date)
+        data = data.fillna(0)
+        data.to_csv(f"./data/hour_video_data_{now_date}.csv", index=False)
+        print(f"hour video data shape: {data.shape}")
+    except Exception as e:
+        log_.error(f"rank 小时级数据下载失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 小时级数据下载失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+

+ 63 - 0
rank_score_update_task.sh

@@ -0,0 +1,63 @@
+source /etc/profile
+
+now_date=`date  +"%Y%m%d%H" -d -0days`
+echo ${now_date}
+echo $ROV_OFFLINE_ENV
+
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline
+fi
+
+mkdir -p ./data/
+
+# 1. 获取24h数据
+python export_24h_vid.py  ${now_date}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] sorted extract_vid_log"
+    echo "[ERROR] echo 'extract_vid.py"
+    exit 255
+fi
+
+# 2. 获取小时级数据
+python export_hour_vid.py ${now_date}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] sorted extract_hour_log"
+    echo "[ERROR] echo 'extract_hour_vid.py"
+    exit 255
+fi
+
+# 3. 计算24h分值
+python cal_24h_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal  ctr "
+    echo "[ERROR] echo 'calCtr.py"
+    exit 255
+fi
+
+# 4. 计算小时级分值
+python cal_hour_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal hour ctr "
+    echo "[ERROR] echo 'calCtr.py"
+    exit 255
+fi
+
+# 5. 分值合并
+python compose_score.py ${now_date}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal compose_score "
+    echo "[ERROR] echo 'compose_score.py"
+    exit 255
+fi
+
+# 6. 过期数据清除
+sh clean.sh
+echo "finish sorted"
+