
add abnormal videos process

liqian 2 years ago
parent
commit
0e2acc9fa8
2 changed files with 64 additions and 7 deletions
  1. ad_video_data_update.py (+59, -7)
  2. config.py (+5, -0)

ad_video_data_update.py (+59, -7)

@@ -16,7 +16,46 @@ features = [
 ]
 
 
-def predict_video_share_rate(video_initial_df, dt, data_key, data_param):
+def get_top10_abnormal_videos_return(dt, filter_param):
+    """Get the abnormal (fission) videos in yesterday's top 10 for each app type"""
+    abnormal_video_project = config_.ad_model_data['top10_videos'].get('project')
+    abnormal_video_table = config_.ad_model_data['top10_videos'].get('table')
+    abnormal_video_features = [
+        'apptype', 'videoid', 'yesterday_return', 'rank', 'multiple'
+    ]
+    data_count = data_check(project=abnormal_video_project, table=abnormal_video_table, dt=dt)
+    top10_abnormal_videos = {}
+    if data_count > 0:
+        abnormal_video_df = get_feature_data(project=abnormal_video_project, table=abnormal_video_table,
+                                             features=abnormal_video_features, dt=dt)
+        abnormal_video_df['multiple'].fillna(0, inplace=True)
+        abnormal_video_df['apptype'] = abnormal_video_df['apptype'].astype(int)
+        abnormal_video_df['videoid'] = abnormal_video_df['videoid'].astype(int)
+        abnormal_video_df['yesterday_return'] = abnormal_video_df['yesterday_return'].astype(int)
+        abnormal_video_df['rank'] = abnormal_video_df['rank'].astype(int)
+        abnormal_video_df['multiple'] = abnormal_video_df['multiple'].astype(float)
+        app_type_list = list(set(abnormal_video_df['apptype'].tolist()))
+        for app_type in app_type_list:
+            app_type_df = abnormal_video_df[abnormal_video_df['apptype'] == app_type]
+            app_type_df = app_type_df.sort_values(by=['rank'], ascending=True)
+            # print(app_type_df)
+            temp_video_id_list = []
+            for index, item in app_type_df.iterrows():
+                # print(item['rank'], item['videoid'], item['multiple'])
+                if item['multiple'] > filter_param:
+                    # print(item['videoid'], item['multiple'])
+                    abnormal_video_id_list = temp_video_id_list.copy()
+                    abnormal_video_id_list.append(int(item['videoid']))
+                    top10_abnormal_videos[app_type] = abnormal_video_id_list
+                    temp_video_id_list.append(int(item['videoid']))
+                else:
+                    temp_video_id_list.append(int(item['videoid']))
+            # print(top10_abnormal_videos)
+    log_.info(f"top10_abnormal_videos = {top10_abnormal_videos}")
+    return top10_abnormal_videos
+
+
+def predict_video_share_rate(video_initial_df, dt, data_key, data_param, top10_abnormal_videos):
     """Estimate the share rate of videos when ads are shown"""
     # Get the corresponding video features
     video_df = video_initial_df.copy()
@@ -29,6 +68,13 @@ def predict_video_share_rate(video_initial_df, dt, data_key, data_param):
     # Get the 30-day share rate of all videos when ads are shown
     ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'].values[0]
     video_df = video_df[video_df['videoid'] != 'allvideos']
+    # Exclude abnormal video data
+    video_df['videoid'] = video_df['videoid'].astype(int)
+    top10_abnormal_video_ids = top10_abnormal_videos.get(int(data_param), None)
+    # print(int(data_param), len(video_df), top10_abnormal_video_ids)
+    if top10_abnormal_video_ids is not None:
+        video_df = video_df[~video_df['videoid'].isin(top10_abnormal_video_ids)]
+        # print(len(video_df))
     # Compute each video's share rate when ads are shown
     video_df['video_ad_share_rate'] = \
         video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
@@ -47,17 +93,18 @@ def predict_video_share_rate(video_initial_df, dt, data_key, data_param):
     return video_df
 
 
-def update_videos_data(project, table, dt, update_params):
+def update_videos_data(project, table, dt, update_params, top10_abnormal_videos):
     """Estimate the share rate of videos when ads are shown"""
     # Get video features
     video_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
     for data_key, data_param in update_params.items():
         log_.info(f"data_key = {data_key} update start...")
-        predict_video_share_rate(video_initial_df=video_initial_df, dt=dt, data_key=data_key, data_param=data_param)
+        predict_video_share_rate(video_initial_df=video_initial_df, dt=dt, data_key=data_key, data_param=data_param,
+                                 top10_abnormal_videos=top10_abnormal_videos)
         log_.info(f"data_key = {data_key} update end!")
 
 
-def timer_check(dt, video_key, video_params):
+def timer_check(dt, video_key, video_params, top10_abnormal_videos):
     log_.info(f"video_key = {video_key}")
     project = config_.ad_model_data[video_key].get('project')
     table = config_.ad_model_data[video_key].get('table')
@@ -66,7 +113,8 @@ def timer_check(dt, video_key, video_params):
     if data_count > 0:
         log_.info(f"ad video data count = {data_count}")
         # Data is ready, proceed with the update
-        update_videos_data(project=project, table=table, dt=dt, update_params=video_params)
+        update_videos_data(project=project, table=table, dt=dt, update_params=video_params,
+                           top10_abnormal_videos=top10_abnormal_videos)
         log_.info(f"ad video data update end!")
         send_msg_to_feishu(
             webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
@@ -78,7 +126,7 @@ def timer_check(dt, video_key, video_params):
         )
     else:
         # Data not ready, re-check after 1 minute
-        Timer(60, timer_check, args=[dt, video_key, video_params]).start()
+        Timer(60, timer_check, args=[dt, video_key, video_params, top10_abnormal_videos]).start()
 
 
 def main():
@@ -86,9 +134,13 @@ def main():
         now_date = datetime.datetime.today()
         dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         log_.info(f"now_date: {dt}")
+        # Get yesterday's abnormal (fission) videos from the top 10
+        top10_abnormal_videos = get_top10_abnormal_videos_return(
+            dt=dt, filter_param=config_.ad_model_data['top10_videos'].get('abnormal_filter_param')
+        )
         update_params = config_.AD_VIDEO_DATA_PARAMS
         for video_key, video_params in update_params.items():
-            timer_check(dt, video_key, video_params)
+            timer_check(dt, video_key, video_params, top10_abnormal_videos)
 
     except Exception as e:
         log_.error(f"Video share rate prediction data update failed, exception: {e}, traceback: {traceback.format_exc()}")
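
The selection rule in get_top10_abnormal_videos_return is easy to misread from the diff alone: for each apptype, the returned list contains every top-10 video ranked at or above the last video whose multiple exceeds filter_param, not only the videos that exceed it themselves. A minimal standalone sketch of that rule, using hypothetical video ids and multiples and the 1.5 threshold added as abnormal_filter_param:

import pandas as pd

# Toy rows for a single apptype; columns mirror the abnormal-video table.
df = pd.DataFrame({
    'videoid':  [101, 102, 103, 104, 105],
    'rank':     [1, 2, 3, 4, 5],
    'multiple': [2.0, 0.5, 1.8, 0.3, 0.2],  # hypothetical values
})
filter_param = 1.5  # same value as the new abnormal_filter_param

abnormal_ids, seen_ids = [], []
for _, row in df.sort_values(by='rank').iterrows():
    seen_ids.append(int(row['videoid']))
    if row['multiple'] > filter_param:
        # Everything ranked at or above the current video is flagged.
        abnormal_ids = seen_ids.copy()

print(abnormal_ids)  # [101, 102, 103] -- rank 3 is the last video above the threshold

Videos ranked below the last threshold crossing (104 and 105 here) are kept, so predict_video_share_rate only drops the fission-skewed head of yesterday's top 10.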

config.py (+5, -0)

@@ -672,6 +672,11 @@ class BaseConfig(object):
             'project': 'loghubods',
             'table': 'video_sharerate_admodel_7days'
         },
+        'top10_videos': {
+            'project': 'loghubods',
+            'table': 'abnormal_videoid_admodel',
+            'abnormal_filter_param': 1.5
+        }
     }
 
     # Data for automatically adjusting ad model thresholds
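
The new top10_videos block is read with the same dictionary lookups used for the other ad_model_data entries. A condensed view of how ad_video_data_update.py consumes it (top10_cfg is just an illustrative local name; config_ is the config object the script imports):

top10_cfg = config_.ad_model_data['top10_videos']
project = top10_cfg.get('project')                     # 'loghubods'
table = top10_cfg.get('table')                         # 'abnormal_videoid_admodel'
filter_param = top10_cfg.get('abnormal_filter_param')  # 1.5

Because the comparison is a strict greater-than and missing multiple values are filled with 0, a video whose multiple is exactly 1.5, or absent, is never flagged as abnormal.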