commit 1ba295e055
liqian, 2 years ago
6 changed files with 339 additions and 100 deletions
  1. ad_user_video_predict.py  +77 −0
  2. ad_users_data_update.py  +58 −100
  3. ad_video_data_upate.py  +84 −0
  4. config.py  +2 −0
  5. user_group_update.py  +78 −0
  6. utils.py  +40 −0

+ 77 - 0
ad_user_video_predict.py

@@ -0,0 +1,77 @@
+import datetime
+import pandas as pd
+from odps import ODPS
+from utils import data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+
+
+def predict_user_group_share_rate(dt, app_type):
+    """预估用户组对应的有广告时分享率"""
+    # 获取用户组特征
+    project = config_.ad_model_data['users_share_rate'].get('project')
+    table = config_.ad_model_data['users_share_rate'].get('table')
+    features = [
+        'apptype',
+        'group',
+        'sharerate_all',
+        'sharerate_ad'
+    ]
+    user_group_df = get_feature_data(project=project, table=table, features=features, dt=dt)
+    user_group_df['apptype'] = user_group_df['apptype'].astype(int)
+    user_group_df = user_group_df[user_group_df['apptype'] == app_type]
+    user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
+    user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
+    # 30-day with-ad share rate across all user groups (the 'allmids' row)
+    ad_all_group_share_rate = user_group_df[user_group_df['group'] == 'allmids']['sharerate_ad'].values[0]
+    user_group_df = user_group_df[user_group_df['group'] != 'allmids']
+    # Compute each group's with-ad share rate
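+    # ratio adjustment: group with-ad rate * global with-ad rate / group overall rate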
+    user_group_df['group_ad_share_rate'] = \
+        user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
+    return user_group_df
+
+
+def predict_video_share_rate(dt, app_type):
+    """预估视频有广告时分享率"""
+    # 获取视频特征
+    project = config_.ad_model_data['videos_share_rate'].get('project')
+    table = config_.ad_model_data['videos_share_rate'].get('table')
+    features = [
+        'apptype',
+        'videoid',
+        'sharerate_all',
+        'sharerate_ad'
+    ]
+    video_df = get_feature_data(project=project, table=table, features=features, dt=dt)
+    video_df['apptype'] = video_df['apptype'].astype(int)
+    video_df = video_df[video_df['apptype'] == app_type]
+    video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
+    video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
+    # 30-day with-ad share rate across all videos (the 'allvideos' row)
+    ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'].values[0]
+    video_df = video_df[video_df['videoid'] != 'allvideos']
+    # Compute each video's with-ad share rate
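+    # same ratio adjustment as for user groups, at video granularity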
+    video_df['video_ad_share_rate'] = \
+        video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
+    return video_df
+
+
+def predict_ad_group_video():
+    app_type = config_.APP_TYPE['VLOG']
+    now_date = datetime.datetime.today()
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    user_group_df = predict_user_group_share_rate(dt=dt, app_type=app_type)
+    video_df = predict_video_share_rate(dt=dt, app_type=app_type)
+    print(f"user_group_df count = {len(user_group_df)}, \nvideo_df count = {len(video_df)}")
+    predict_df = video_df
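+    # per-group prediction columns: video with-ad rate scaled by group with-ad rate (assumed combination)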
+    for _, item in user_group_df.iterrows():
+        predict_df[item['group']] = predict_df['video_ad_share_rate'] * item['group_ad_share_rate']
+    predict_df.to_csv('./data/ad_user_video_predict.csv')
+    return predict_df
+
+
+if __name__ == '__main__':
+    predict_df = predict_ad_group_video()
+

+ 58 - 100
ad_users_data_update.py

@@ -1,66 +1,27 @@
 import datetime
-import pandas as pd
-from odps import ODPS
-from utils import get_data_from_odps, RedisHelper, check_table_partition_exits
+import traceback
+from threading import Timer
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 from config import set_config
 from log import Log
 config_, _ = set_config()
 log_ = Log()
+redis_helper = RedisHelper()
 
+features = [
+    'apptype',
+    'group',
+    'sharerate_all',
+    'sharerate_ad'
+]
 
-def h_data_check(project, table, now_date):
-    """检查数据是否准备好"""
-    odps = ODPS(
-        access_id=config_.ODPS_CONFIG['ACCESSID'],
-        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
-        project=project,
-        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
-        connect_timeout=3000,
-        read_timeout=500000,
-        pool_maxsize=1000,
-        pool_connections=1000
-    )
 
-    try:
-        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
-        check_res = check_table_partition_exits(date=dt, project=project, table=table)
-        if check_res:
-            sql = f'select * from {project}.{table} where dt = {dt}'
-            with odps.execute_sql(sql=sql).open_reader() as reader:
-                data_count = reader.count
-        else:
-            data_count = 0
-    except Exception as e:
-        data_count = 0
-    return data_count
-
-
-def get_feature_data(project, table, features, now_date):
-    """获取特征数据"""
-    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
-    records = get_data_from_odps(date=dt, project=project, table=table)
-    feature_data = []
-    for record in records:
-        item = {}
-        for feature_name in features:
-            item[feature_name] = record[feature_name]
-        feature_data.append(item)
-    feature_df = pd.DataFrame(feature_data)
-    return feature_df
-
-
-def predict_user_group_share_rate(now_date):
+def predict_user_group_share_rate(project, table, dt, app_type):
     """预估用户组对应的有广告时分享率"""
     # 获取用户组特征
-    project = config_.ad_model_data['users_share_rate'].get('project')
-    table = config_.ad_model_data['users_share_rate'].get('table')
-    features = [
-        'apptype',
-        'group',
-        'sharerate_all',
-        'sharerate_ad'
-    ]
-    user_group_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
+    user_group_df = get_feature_data(project=project, table=table, features=features, dt=dt)
+    user_group_df['apptype'] = user_group_df['apptype'].astype(int)
+    user_group_df = user_group_df[user_group_df['apptype'] == app_type]
     user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
     user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
     # 30-day with-ad share rate across all user groups (the 'allmids' row)
@@ -70,57 +31,54 @@ def predict_user_group_share_rate(now_date):
     user_group_df['group_ad_share_rate'] = \
         user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
     # Write results to redis
-    # key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
-    # redis_data = {}
-    # for item in user_group_df:
-    #     redis_data[item['group']] = item['group_ad_share_rate']
-    # if len(redis_data) > 0:
-    #     redis_helper = RedisHelper()
-    #     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
+    key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{dt}"
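+    # one zset per day: member = group name, score = predicted with-ad share rate; 2-day TTL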
+    redis_data = {}
+    for _, item in user_group_df.iterrows():
+        redis_data[item['group']] = item['group_ad_share_rate']
+    if len(redis_data) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
     return user_group_df
 
 
-def predict_video_share_rate(now_date):
-    """预估视频有广告时分享率"""
-    # 获取视频特征
-    project = config_.ad_model_data['videos_share_rate'].get('project')
-    table = config_.ad_model_data['videos_share_rate'].get('table')
-    features = [
-        'apptype',
-        'videoid',
-        'sharerate_all',
-        'sharerate_ad'
-    ]
-    video_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
-    video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
-    video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
-    # 30-day with-ad share rate across all videos (the 'allvideos' row)
-    ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad']
-    video_df = video_df[video_df['videoid'] != 'allvideos']
-    # Compute each video's with-ad share rate
-    video_df['video_ad_share_rate'] = \
-        video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
-    # Write results to redis
-    # key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
-    # redis_data = {}
-    # for item in video_df:
-    #     redis_data[item['videoid']] = item['video_ad_share_rate']
-    # if len(redis_data) > 0:
-    #     redis_helper = RedisHelper()
-    #     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
-    return video_df
-
+def timer_check():
+    try:
+        app_type = config_.APP_TYPE['VLOG']
+        project = config_.ad_model_data['users_share_rate'].get('project')
+        table = config_.ad_model_data['users_share_rate'].get('table')
+        now_date = datetime.datetime.today()
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        log_.info(f"now_date: {dt}")
+        now_min = datetime.datetime.now().minute
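+        # minute-of-hour gates alerting below: before :45 keep retrying, after :45 send a Feishu alert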
+        # Check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=dt)
+        if data_count > 0:
+            log_.info(f"ad user group data count = {data_count}")
+            # Data is ready; run the update
+            predict_user_group_share_rate(project=project, table=table, dt=dt, app_type=app_type)
+            log_.info(f"ad user group data update end!")
+        elif now_min > 45:
+            log_.info('ad user group data is None!')
+            send_msg_to_feishu(
+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+                msg_text=f"rov-offline{config_.ENV_TEXT} - 用户组分享率数据未准备好!\n"
+                         f"traceback: {traceback.format_exc()}"
+            )
+        else:
+            # Data not ready; check again in 1 minute
+            Timer(60, timer_check).start()
 
-def predict_ad_group_video(now_date):
-    user_group_df = predict_user_group_share_rate(now_date)
-    video_df = predict_video_share_rate(now_date)
-    predict_df = video_df
-    for item in user_group_df:
-        predict_df[item['group']] = predict_df['videoid'] * item['group_ad_share_rate']
-    return predict_df
+    except Exception as e:
+        log_.error(f"用户组分享率预测数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 用户组分享率预测数据更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
 
 
 if __name__ == '__main__':
-    now_date = datetime.datetime.today()
-    predict_df = predict_ad_group_video(now_date)
-
+    timer_check()

+ 84 - 0
ad_video_data_upate.py

@@ -0,0 +1,84 @@
+import datetime
+import traceback
+from threading import Timer
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+features = [
+    'apptype',
+    'videoid',
+    'sharerate_all',
+    'sharerate_ad'
+]
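+# 'videoid' includes the aggregate row 'allvideos' (all videos combined)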
+
+
+def predict_video_share_rate(project, table, dt, app_type):
+    """预估视频有广告时分享率"""
+    # 获取视频特征
+    video_df = get_feature_data(project=project, table=table, features=features, dt=dt)
+    video_df['apptype'] = video_df['apptype'].astype(int)
+    video_df = video_df[video_df['apptype'] == app_type]
+    video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
+    video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
+    # 30-day with-ad share rate across all videos (the 'allvideos' row)
+    ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'].values[0]
+    video_df = video_df[video_df['videoid'] != 'allvideos']
+    # Compute each video's with-ad share rate
+    video_df['video_ad_share_rate'] = \
+        video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
+    # Write results to redis
+    key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{dt}"
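+    # one zset per day: member = videoid, score = predicted with-ad share rate; 2-day TTL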
+    redis_data = {}
+    for _, item in video_df.iterrows():
+        redis_data[item['videoid']] = item['video_ad_share_rate']
+    if len(redis_data) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
+    return video_df
+
+
+def timer_check():
+    try:
+        app_type = config_.APP_TYPE['VLOG']
+        project = config_.ad_model_data['videos_share_rate'].get('project')
+        table = config_.ad_model_data['videos_share_rate'].get('table')
+        now_date = datetime.datetime.today()
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        log_.info(f"now_date: {dt}")
+        now_min = datetime.datetime.now().minute
+        # Check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=dt)
+        if data_count > 0:
+            log_.info(f"ad video data count = {data_count}")
+            # Data is ready; run the update
+            predict_video_share_rate(project=project, table=table, dt=dt, app_type=app_type)
+            log_.info(f"ad video data update end!")
+        elif now_min > 45:
+            log_.info('ad video data is None!')
+            send_msg_to_feishu(
+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+                msg_text=f"rov-offline{config_.ENV_TEXT} - 视频分享率数据未准备好!\n"
+                         f"traceback: {traceback.format_exc()}"
+            )
+        else:
+            # Data not ready; check again in 1 minute
+            Timer(60, timer_check).start()
+
+    except Exception as e:
+        log_.error(f"视频分享率预测数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 视频分享率预测数据更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    timer_check()

+ 2 - 0
config.py

@@ -600,6 +600,8 @@ class BaseConfig(object):
     KEY_NAME_PREFIX_AD_GROUP = 'ad:users:group:predict:share:rate:'
     # redis key prefix for video with-ad share-rate prediction results; full format: ad:video:predict:share:rate:{date}
     KEY_NAME_PREFIX_AD_VIDEO = 'ad:video:predict:share:rate:'
+    # redis key prefix for user grouping results; full format: mid:group:{mid}
+    KEY_NAME_PREFIX_MID_GROUP = 'mid:group:'
 
 
 class DevelopmentConfig(BaseConfig):

+ 78 - 0
user_group_update.py

@@ -0,0 +1,78 @@
+import datetime
+import traceback
+from threading import Timer
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
+from config import set_config
+from log import Log
+config_, _ = set_config()
+log_ = Log()
+redis_helper = RedisHelper()
+
+mid_group_list = [
+    'apptype',
+    'return1mids',
+    'return2_3mids',
+    'return4_8mids',
+    'return9_24mids',
+    'return25_nmids',
+    'return0share1mids',
+    'return0share2_nmids'
+]
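+# 'apptype' is a filter column; the remaining entries are mid buckets (by return/share counts)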
+
+
+def update_user_group_to_redis(project, table, dt, app_type):
+    """更新mid对应分组到redis中"""
+    # 获取用户分组数据
+    feature_df = get_feature_data(project=project, table=table, features=mid_group_list, dt=dt)
+    feature_df['apptype'] = feature_df['apptype'].astype(int)
+    feature_df = feature_df[feature_df['apptype'] == app_type]
+    for group in mid_group_list[1:]:  # skip the 'apptype' filter column
+        log_.info(f"group = {group} update redis start ...")
+        mid_list = feature_df[group].tolist()
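+        # one plain key per mid (mid:group:{mid}); value is the group name, 25h TTL covers the daily refresh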
+        for mid in mid_list:
+            key_name = f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}"
+            redis_helper.set_data_to_redis(key_name=key_name, value=group, expire_time=25 * 3600)
+        log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished!")
+
+
+def timer_check():
+    try:
+        app_type = config_.APP_TYPE['VLOG']
+        project = config_.ad_model_data['user_group'].get('project')
+        table = config_.ad_model_data['user_group'].get('table')
+        now_date = datetime.datetime.today()
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        log_.info(f"now_date: {dt}")
+        now_min = datetime.datetime.now().minute
+        # Check whether the data for this update is ready
+        data_count = data_check(project=project, table=table, dt=dt)
+        if data_count > 0:
+            log_.info(f"user group data count = {data_count}")
+            # Data is ready; run the update
+            update_user_group_to_redis(project=project, table=table, dt=dt, app_type=app_type)
+            log_.info(f"user group data update end!")
+        elif now_min > 45:
+            log_.info('user group data is None!')
+            send_msg_to_feishu(
+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+                msg_text=f"rov-offline{config_.ENV_TEXT} - 用户分组数据未准备好!\n"
+                         f"traceback: {traceback.format_exc()}"
+            )
+        else:
+            # Data not ready; check again in 1 minute
+            Timer(60, timer_check).start()
+
+    except Exception as e:
+        log_.error(f"用户分组数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 用户分组数据更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    timer_check()

+ 40 - 0
utils.py

@@ -4,6 +4,7 @@ import os
 import requests
 import json
 import traceback
+import pandas as pd
 
 from odps import ODPS
 from config import set_config
@@ -426,6 +427,45 @@ def update_video_w_h_rate(video_ids, key_name):
         redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
 
 
+def data_check(project, table, dt):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
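+            # reads the whole dt partition just to obtain reader.count; a count(*) query would be lighter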
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception:
+        data_count = 0
+    return data_count
+
+
+def get_feature_data(project, table, features, dt):
+    """获取特征数据"""
+    records = get_data_from_odps(date=dt, project=project, table=table)
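+    # keep only the requested feature columns, one dict per record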
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
 if __name__ == '__main__':
     # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
     # data_normalization(data_test)