ad_user_video_predict.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import datetime
  2. import sys
  3. import numpy as np
  4. import pandas as pd
  5. from odps import ODPS
  6. from utils import data_check, get_feature_data, send_msg_to_feishu, RedisHelper
  7. from config import set_config
  8. from log import Log
# Module-level singletons used by every function below.
# Project configuration; the second value returned by set_config() is unused here.
config_, _ = set_config()
# Logger used for all progress messages in this module.
log_ = Log()
# Helper through which all Redis reads/writes in this module go
# (get_all_data_from_zset / get_data_from_redis / set_data_to_redis).
redis_helper = RedisHelper()
  12. def predict_user_group_share_rate(dt, app_type):
  13. """预估用户组对应的有广告时分享率"""
  14. # 获取用户组特征
  15. project = config_.ad_model_data['users_share_rate'].get('project')
  16. table = config_.ad_model_data['users_share_rate'].get('table')
  17. features = [
  18. 'apptype',
  19. 'group',
  20. 'sharerate_all',
  21. 'sharerate_ad'
  22. ]
  23. user_group_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  24. user_group_df['apptype'] = user_group_df['apptype'].astype(int)
  25. user_group_df = user_group_df[user_group_df['apptype'] == app_type]
  26. user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
  27. user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
  28. # 获取有广告时所有用户组近30天的分享率
  29. ad_all_group_share_rate = user_group_df[user_group_df['group'] == 'allmids']['sharerate_ad'].values[0]
  30. user_group_df = user_group_df[user_group_df['group'] != 'allmids']
  31. # 计算用户组有广告时分享率
  32. user_group_df['group_ad_share_rate'] = \
  33. user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
  34. return user_group_df
  35. def predict_video_share_rate(dt, app_type):
  36. """预估视频有广告时分享率"""
  37. # 获取视频特征
  38. project = config_.ad_model_data['videos_share_rate'].get('project')
  39. table = config_.ad_model_data['videos_share_rate'].get('table')
  40. features = [
  41. 'apptype',
  42. 'videoid',
  43. 'sharerate_all',
  44. 'sharerate_ad'
  45. ]
  46. video_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  47. video_df['apptype'] = video_df['apptype'].astype(int)
  48. video_df = video_df[video_df['apptype'] == app_type]
  49. video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
  50. video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
  51. # 获取有广告时所有视频近30天的分享率
  52. ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'].values[0]
  53. video_df = video_df[video_df['videoid'] != 'allvideos']
  54. # 计算视频有广告时分享率
  55. video_df['video_ad_share_rate'] = \
  56. video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
  57. return video_df
  58. def predict_ad_group_video(dt, config_key, config_param, threshold_record):
  59. log_.info(f"config_key = {config_key} update start ...")
  60. # 获取用户组预测值
  61. user_data_key = config_param['user'].get('data')
  62. user_rule_key = config_param['user'].get('rule')
  63. group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{user_data_key}:{user_rule_key}:{dt}"
  64. group_data = redis_helper.get_all_data_from_zset(key_name=group_key_name, with_scores=True)
  65. if group_data is None:
  66. log_.info(f"group data is None!")
  67. group_df = pd.DataFrame(data=group_data, columns=['group', 'group_ad_share_rate'])
  68. group_df = group_df[group_df['group'] != 'mean_group']
  69. log_.info(f"group_df count = {len(group_df)}")
  70. # 获取视频预测值
  71. video_data_key = config_param['video'].get('data')
  72. video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{video_data_key}:{dt}"
  73. video_data = redis_helper.get_all_data_from_zset(key_name=video_key_name, with_scores=True)
  74. if video_data is None:
  75. log_.info(f"video data is None!")
  76. video_df = pd.DataFrame(data=video_data, columns=['videoid', 'video_ad_share_rate'])
  77. video_df = video_df[video_df['videoid'] != -1]
  78. log_.info(f"video_df count = {len(video_df)}")
  79. if len(group_df) == 0 or len(video_df) == 0:
  80. sys.exit(1)
  81. predict_df = video_df
  82. all_group_data = []
  83. for index, item in group_df.iterrows():
  84. predict_df[item['group']] = predict_df['video_ad_share_rate'] * item['group_ad_share_rate']
  85. all_group_data.extend(predict_df[item['group']].tolist())
  86. # 计算对应的阈值
  87. # ad_threshold_mappings = config_.AD_ABTEST_THRESHOLD_CONFIG.get(config_key.split('-')[0])
  88. ad_threshold_mappings = threshold_record.get(config_key.split('-')[0])
  89. for abtest_group, ad_threshold_mapping in ad_threshold_mappings.items():
  90. threshold_data = {}
  91. for _, item in group_df.iterrows():
  92. # 获取分组对应的均值作为阈值
  93. threshold_data[item['group']] = predict_df[item['group']].mean() * ad_threshold_mapping['group']
  94. threshold_data['mean_group'] = np.mean(all_group_data) * ad_threshold_mapping['mean_group']
  95. # 获取需要多出广告的用户组,及阈值比例
  96. more_ad = config_param.get('more_ad', None)
  97. if more_ad is not None:
  98. for group_key, group_threshold_rate in more_ad.items():
  99. threshold_data[group_key] = threshold_data[group_key] * group_threshold_rate
  100. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, threshold_data = {threshold_data}")
  101. # 将阈值写入redis
  102. abtest_config_list = config_key.split('-')
  103. abtest_id, abtest_config_tag = abtest_config_list[0], abtest_config_list[1]
  104. for key, val in threshold_data.items():
  105. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  106. redis_helper.set_data_to_redis(key_name=key_name, value=val, expire_time=2 * 24 * 3600)
  107. # 计算关怀模式实验阈值 并 写入Redis
  108. care_model = config_param.get('care_model', None)
  109. threshold_rate = config_param.get('threshold_rate', None)
  110. if care_model is True:
  111. care_model_threshold_data = {}
  112. for key, val in threshold_data.items():
  113. up_val = val * threshold_rate
  114. care_model_threshold_data[key] = up_val
  115. up_key_name = \
  116. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  117. redis_helper.set_data_to_redis(key_name=up_key_name, value=up_val, expire_time=2 * 24 * 3600)
  118. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, "
  119. f"care_model_threshold_data = {care_model_threshold_data}")
  120. # predict_df.to_csv(f'./data/ad_user_video_predict_{config_key}.csv')
  121. log_.info(f"config_key = {config_key} update end!")
  122. def predict():
  123. now_date = datetime.datetime.today()
  124. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  125. log_.info(f"dt = {dt}")
  126. # 获取阈值参数记录
  127. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  128. threshold_record = eval(threshold_record)
  129. log_.info(f"threshold_record = {threshold_record}")
  130. params = config_.AD_ABTEST_CONFIG
  131. for config_key, config_param in params.items():
  132. predict_ad_group_video(dt=dt,
  133. config_key=config_key,
  134. config_param=config_param,
  135. threshold_record=threshold_record)
  136. # 阈值参数记录
  137. # redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  138. # value=str(config_.AD_ABTEST_THRESHOLD_CONFIG),
  139. # expire_time=24*3600)
  140. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  141. value=str(threshold_record),
  142. expire_time=2 * 24 * 3600)
if __name__ == '__main__':
    # Script entry point: run the threshold update for all configured A/B tests.
    predict()