# ad_user_video_predict.py (22 KB)


  1. import datetime
  2. import sys
  3. import traceback
  4. import numpy as np
  5. import pandas as pd
  6. from odps import ODPS
  7. from utils import data_check, get_feature_data, send_msg_to_feishu_new, RedisHelper
  8. from config import set_config
  9. from log import Log
# Project configuration object; set_config() returns two values and the second is discarded here.
config_, _ = set_config()
# Module-wide logger shared by every function below.
log_ = Log()
# Module-wide Redis helper used to read the prediction zsets and to write the thresholds.
redis_helper = RedisHelper()
  13. def predict_user_group_share_rate(dt, app_type):
  14. """预估用户组对应的有广告时分享率"""
  15. # 获取用户组特征
  16. project = config_.ad_model_data['users_share_rate'].get('project')
  17. table = config_.ad_model_data['users_share_rate'].get('table')
  18. features = [
  19. 'apptype',
  20. 'group',
  21. 'sharerate_all',
  22. 'sharerate_ad'
  23. ]
  24. user_group_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  25. user_group_df['apptype'] = user_group_df['apptype'].astype(int)
  26. user_group_df = user_group_df[user_group_df['apptype'] == app_type]
  27. user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
  28. user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
  29. # 获取有广告时所有用户组近30天的分享率
  30. ad_all_group_share_rate = user_group_df[user_group_df['group'] == 'allmids']['sharerate_ad'].values[0]
  31. user_group_df = user_group_df[user_group_df['group'] != 'allmids']
  32. # 计算用户组有广告时分享率
  33. user_group_df['group_ad_share_rate'] = \
  34. user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
  35. return user_group_df
  36. def predict_video_share_rate(dt, app_type):
  37. """预估视频有广告时分享率"""
  38. # 获取视频特征
  39. project = config_.ad_model_data['videos_share_rate'].get('project')
  40. table = config_.ad_model_data['videos_share_rate'].get('table')
  41. features = [
  42. 'apptype',
  43. 'videoid',
  44. 'sharerate_all',
  45. 'sharerate_ad'
  46. ]
  47. video_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  48. video_df['apptype'] = video_df['apptype'].astype(int)
  49. video_df = video_df[video_df['apptype'] == app_type]
  50. video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
  51. video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
  52. # 获取有广告时所有视频近30天的分享率
  53. ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad'].values[0]
  54. video_df = video_df[video_df['videoid'] != 'allvideos']
  55. # 计算视频有广告时分享率
  56. video_df['video_ad_share_rate'] = \
  57. video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
  58. return video_df
  59. def predict_ad_group_video(dt, config_key, config_param, threshold_record):
  60. log_.info(f"config_key = {config_key} update start ...")
  61. # 获取用户组预测值
  62. user_data_key = config_param['user'].get('data')
  63. user_rule_key = config_param['user'].get('rule')
  64. group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{user_data_key}:{user_rule_key}:{dt}"
  65. group_data = redis_helper.get_all_data_from_zset(key_name=group_key_name, with_scores=True)
  66. if group_data is None:
  67. log_.info(f"group data is None!")
  68. group_df = pd.DataFrame(data=group_data, columns=['group', 'group_ad_share_rate'])
  69. group_df = group_df[group_df['group'] != 'mean_group']
  70. log_.info(f"group_df count = {len(group_df)}")
  71. # 获取视频预测值
  72. video_data_key = config_param['video'].get('data')
  73. video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{video_data_key}:{dt}"
  74. video_data = redis_helper.get_all_data_from_zset(key_name=video_key_name, with_scores=True)
  75. if video_data is None:
  76. log_.info(f"video data is None!")
  77. video_df = pd.DataFrame(data=video_data, columns=['videoid', 'video_ad_share_rate'])
  78. video_df = video_df[video_df['videoid'] != -1]
  79. log_.info(f"video_df count = {len(video_df)}")
  80. if len(group_df) == 0 or len(video_df) == 0:
  81. sys.exit(1)
  82. predict_df = video_df
  83. all_group_data = []
  84. for index, item in group_df.iterrows():
  85. predict_df[item['group']] = predict_df['video_ad_share_rate'] * item['group_ad_share_rate']
  86. all_group_data.extend(predict_df[item['group']].tolist())
  87. # 计算对应的阈值
  88. # ad_threshold_mappings = config_.AD_ABTEST_THRESHOLD_CONFIG.get(config_key.split('-')[0])
  89. ad_threshold_mappings = threshold_record.get(config_key.split('-')[0])
  90. for abtest_group, ad_threshold_mapping in ad_threshold_mappings.items():
  91. threshold_data = {}
  92. for _, item in group_df.iterrows():
  93. # 获取分组对应的均值作为阈值
  94. threshold_data[item['group']] = predict_df[item['group']].mean() * ad_threshold_mapping['group']
  95. threshold_data['mean_group'] = np.mean(all_group_data) * ad_threshold_mapping['mean_group']
  96. # 获取需要多出广告的用户组,及阈值比例
  97. more_ad = config_param.get('more_ad', None)
  98. if more_ad is not None:
  99. for group_key, group_threshold_rate in more_ad.items():
  100. threshold_data[group_key] = threshold_data[group_key] * group_threshold_rate
  101. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, threshold_data = {threshold_data}")
  102. # 将阈值写入redis
  103. abtest_config_list = config_key.split('-')
  104. abtest_id, abtest_config_tag = abtest_config_list[0], abtest_config_list[1]
  105. for key, val in threshold_data.items():
  106. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  107. redis_helper.set_data_to_redis(key_name=key_name, value=val, expire_time=2 * 24 * 3600)
  108. # 计算关怀模式实验阈值 并 写入Redis
  109. care_model = config_param.get('care_model', None)
  110. threshold_rate = config_param.get('threshold_rate', None)
  111. if care_model is True:
  112. care_model_threshold_data = {}
  113. for key, val in threshold_data.items():
  114. up_val = val * threshold_rate
  115. care_model_threshold_data[key] = up_val
  116. up_key_name = \
  117. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  118. redis_helper.set_data_to_redis(key_name=up_key_name, value=up_val, expire_time=2 * 24 * 3600)
  119. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, "
  120. f"care_model_threshold_data = {care_model_threshold_data}")
  121. # predict_df.to_csv(f'./data/ad_user_video_predict_{config_key}.csv')
  122. log_.info(f"config_key = {config_key} update end!")
  123. def predict_ad_group_video_mix_with_add(dt, config_key, config_param, threshold_record):
  124. log_.info(f"config_key = {config_key} update start ...")
  125. # ###### 获取以分享为目标的数据
  126. # 获取用户组预测值(出广告后分享的概率)
  127. share_user_data_key = config_param['share']['user'].get('data')
  128. share_user_rule_key = config_param['share']['user'].get('rule')
  129. share_group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{share_user_data_key}:{share_user_rule_key}:{dt}"
  130. share_group_data = redis_helper.get_all_data_from_zset(key_name=share_group_key_name, with_scores=True)
  131. if share_group_data is None:
  132. log_.info(f"share group data is None!")
  133. share_group_df = pd.DataFrame(data=share_group_data, columns=['group', 'group_ad_share_rate'])
  134. share_group_df = share_group_df[share_group_df['group'] != 'mean_group']
  135. log_.info(f"share_group_df count = {len(share_group_df)}")
  136. # 获取视频预测值(出广告后不分享的概率)
  137. share_video_data_key = config_param['share']['video'].get('data')
  138. share_video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{share_video_data_key}:{dt}"
  139. share_video_data = redis_helper.get_all_data_from_zset(key_name=share_video_key_name, with_scores=True)
  140. if share_video_data is None:
  141. log_.info(f"share video data is None!")
  142. share_video_df = pd.DataFrame(data=share_video_data, columns=['videoid', 'video_ad_share_rate'])
  143. share_video_df = share_video_df[share_video_df['videoid'] != -1]
  144. log_.info(f"share_video_df count = {len(share_video_df)}")
  145. if len(share_video_df) == 0 or len(share_video_df) == 0:
  146. sys.exit(1)
  147. # ###### 获取以不直接跳出为目标的数据
  148. # 获取用户组预测值(出广告后不直接跳出的概率)
  149. out_user_data_key = config_param['out']['user'].get('data')
  150. out_user_rule_key = config_param['out']['user'].get('rule')
  151. out_group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{out_user_data_key}:{out_user_rule_key}:{dt}"
  152. out_group_data = redis_helper.get_all_data_from_zset(key_name=out_group_key_name, with_scores=True)
  153. if out_group_data is None:
  154. log_.info(f"out group data is None!")
  155. out_group_df = pd.DataFrame(data=out_group_data, columns=['group', 'group_ad_not_out_rate'])
  156. out_group_df = out_group_df[out_group_df['group'] != 'mean_group']
  157. log_.info(f"out_group_df count = {len(out_group_df)}")
  158. # 获取视频预测值(出广告后不直接跳出的概率)
  159. out_video_data_key = config_param['out']['video'].get('data')
  160. out_video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{out_video_data_key}:{dt}"
  161. out_video_data = redis_helper.get_all_data_from_zset(key_name=out_video_key_name, with_scores=True)
  162. if out_video_data is None:
  163. log_.info(f"out video data is None!")
  164. out_video_df = pd.DataFrame(data=out_video_data, columns=['videoid', 'video_ad_not_out_rate'])
  165. out_video_df = out_video_df[out_video_df['videoid'] != -1]
  166. log_.info(f"out_video_df count = {len(out_video_df)}")
  167. if len(share_video_df) == 0 or len(share_video_df) == 0:
  168. sys.exit(1)
  169. # 加权融合
  170. share_weight = config_param['mix_param']['share_weight']
  171. out_weight = config_param['mix_param']['out_weight']
  172. # 用户侧数据
  173. group_df = pd.merge(share_group_df, out_group_df, on='group')
  174. group_df['group_rate'] = \
  175. share_weight * group_df['group_ad_share_rate'] + out_weight * group_df['group_ad_not_out_rate']
  176. # 视频侧数据
  177. video_df = pd.merge(share_video_df, out_video_df, on='videoid')
  178. video_df['video_rate'] = \
  179. share_weight * video_df['video_ad_share_rate'] + out_weight * video_df['video_ad_not_out_rate']
  180. predict_df = video_df.copy()
  181. all_group_data = []
  182. for index, item in group_df.iterrows():
  183. predict_df[item['group']] = predict_df['video_rate'] * item['group_rate']
  184. all_group_data.extend(predict_df[item['group']].tolist())
  185. # 计算对应的阈值
  186. ad_threshold_mappings = threshold_record.get(config_key.split('-')[0])
  187. for abtest_group, ad_threshold_mapping in ad_threshold_mappings.items():
  188. threshold_data = {}
  189. for _, item in group_df.iterrows():
  190. # 获取分组对应的均值作为阈值
  191. threshold_data[item['group']] = predict_df[item['group']].mean() * ad_threshold_mapping['group']
  192. threshold_data['mean_group'] = np.mean(all_group_data) * ad_threshold_mapping['mean_group']
  193. # 获取需要多出广告的用户组,及阈值比例
  194. more_ad = config_param.get('more_ad', None)
  195. if more_ad is not None:
  196. for group_key, group_threshold_rate in more_ad.items():
  197. threshold_data[group_key] = threshold_data[group_key] * group_threshold_rate
  198. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, threshold_data = {threshold_data}")
  199. # 将阈值写入redis
  200. abtest_config_list = config_key.split('-')
  201. abtest_id, abtest_config_tag = abtest_config_list[0], abtest_config_list[1]
  202. for key, val in threshold_data.items():
  203. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  204. if abtest_id == 243 and (abtest_group == "ab0" or abtest_group == "ab1" or abtest_group == "ab2"):
  205. val=0.6983435337929007
  206. redis_helper.set_data_to_redis(key_name=key_name, value=val, expire_time=2 * 24 * 3600)
  207. # 计算关怀模式实验阈值 并 写入Redis
  208. care_model = config_param.get('care_model', None)
  209. threshold_rate = config_param.get('threshold_rate', None)
  210. if care_model is True:
  211. care_model_threshold_data = {}
  212. for key, val in threshold_data.items():
  213. up_val = val * threshold_rate
  214. care_model_threshold_data[key] = up_val
  215. if abtest_id == 243 and (abtest_group == "ab0" or abtest_group == "ab1" or abtest_group == "ab2"):
  216. val = 0.6983435337929007
  217. up_key_name = \
  218. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  219. redis_helper.set_data_to_redis(key_name=up_key_name, value=up_val, expire_time=2 * 24 * 3600)
  220. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, "
  221. f"care_model_threshold_data = {care_model_threshold_data}")
  222. # predict_df.to_csv(f'./data/ad_user_video_predict_{config_key}.csv')
  223. log_.info(f"config_key = {config_key} update end!")
  224. def predict_ad_group_video_mix_with_multiply(dt, config_key, config_param, threshold_record):
  225. log_.info(f"config_key = {config_key} update start ...")
  226. # ###### 获取以分享为目标的数据
  227. # 获取用户组预测值(出广告后分享的概率)
  228. share_user_data_key = config_param['share']['user'].get('data')
  229. share_user_rule_key = config_param['share']['user'].get('rule')
  230. share_group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{share_user_data_key}:{share_user_rule_key}:{dt}"
  231. share_group_data = redis_helper.get_all_data_from_zset(key_name=share_group_key_name, with_scores=True)
  232. if share_group_data is None:
  233. log_.info(f"share group data is None!")
  234. share_group_df = pd.DataFrame(data=share_group_data, columns=['group', 'group_ad_share_rate'])
  235. share_group_df = share_group_df[share_group_df['group'] != 'mean_group']
  236. log_.info(f"share_group_df count = {len(share_group_df)}")
  237. # 获取视频预测值(出广告后分享的概率)
  238. share_video_data_key = config_param['share']['video'].get('data')
  239. share_video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{share_video_data_key}:{dt}"
  240. share_video_data = redis_helper.get_all_data_from_zset(key_name=share_video_key_name, with_scores=True)
  241. if share_video_data is None:
  242. log_.info(f"share video data is None!")
  243. share_video_df = pd.DataFrame(data=share_video_data, columns=['videoid', 'video_ad_share_rate'])
  244. share_video_df = share_video_df[share_video_df['videoid'] != -1]
  245. log_.info(f"share_video_df count = {len(share_video_df)}")
  246. if len(share_video_df) == 0 or len(share_video_df) == 0:
  247. sys.exit(1)
  248. # ###### 获取以不直接跳出为目标的数据
  249. # 获取用户组预测值(出广告后不直接跳出的概率)
  250. out_user_data_key = config_param['out']['user'].get('data')
  251. out_user_rule_key = config_param['out']['user'].get('rule')
  252. out_group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{out_user_data_key}:{out_user_rule_key}:{dt}"
  253. out_group_data = redis_helper.get_all_data_from_zset(key_name=out_group_key_name, with_scores=True)
  254. if out_group_data is None:
  255. log_.info(f"out group data is None!")
  256. out_group_df = pd.DataFrame(data=out_group_data, columns=['group', 'group_ad_not_out_rate'])
  257. out_group_df = out_group_df[out_group_df['group'] != 'mean_group']
  258. log_.info(f"out_group_df count = {len(out_group_df)}")
  259. # 获取视频预测值(出广告后不直接跳出的概率)
  260. out_video_data_key = config_param['out']['video'].get('data')
  261. out_video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{out_video_data_key}:{dt}"
  262. out_video_data = redis_helper.get_all_data_from_zset(key_name=out_video_key_name, with_scores=True)
  263. if out_video_data is None:
  264. log_.info(f"out video data is None!")
  265. out_video_df = pd.DataFrame(data=out_video_data, columns=['videoid', 'video_ad_not_out_rate'])
  266. out_video_df = out_video_df[out_video_df['videoid'] != -1]
  267. log_.info(f"out_video_df count = {len(out_video_df)}")
  268. if len(share_video_df) == 0 or len(share_video_df) == 0:
  269. sys.exit(1)
  270. # 乘积融合
  271. # 用户侧数据
  272. group_df = pd.merge(share_group_df, out_group_df, on='group')
  273. group_df['group_rate'] = group_df['group_ad_share_rate'] * group_df['group_ad_not_out_rate']
  274. # 视频侧数据
  275. video_df = pd.merge(share_video_df, out_video_df, on='videoid')
  276. video_df['video_rate'] = video_df['video_ad_share_rate'] * video_df['video_ad_not_out_rate']
  277. predict_df = video_df.copy()
  278. all_group_data = []
  279. for index, item in group_df.iterrows():
  280. predict_df[item['group']] = predict_df['video_rate'] * item['group_rate']
  281. all_group_data.extend(predict_df[item['group']].tolist())
  282. # 计算对应的阈值
  283. ad_threshold_mappings = threshold_record.get(config_key.split('-')[0])
  284. for abtest_group, ad_threshold_mapping in ad_threshold_mappings.items():
  285. threshold_data = {}
  286. for _, item in group_df.iterrows():
  287. # 获取分组对应的均值作为阈值
  288. threshold_data[item['group']] = predict_df[item['group']].mean() * ad_threshold_mapping['group']
  289. threshold_data['mean_group'] = np.mean(all_group_data) * ad_threshold_mapping['mean_group']
  290. # 获取需要多出广告的用户组,及阈值比例
  291. more_ad = config_param.get('more_ad', None)
  292. if more_ad is not None:
  293. for group_key, group_threshold_rate in more_ad.items():
  294. threshold_data[group_key] = threshold_data[group_key] * group_threshold_rate
  295. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, threshold_data = {threshold_data}")
  296. # 将阈值写入redis
  297. abtest_config_list = config_key.split('-')
  298. abtest_id, abtest_config_tag = abtest_config_list[0], abtest_config_list[1]
  299. for key, val in threshold_data.items():
  300. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  301. redis_helper.set_data_to_redis(key_name=key_name, value=val, expire_time=2 * 24 * 3600)
  302. # 计算关怀模式实验阈值 并 写入Redis
  303. care_model = config_param.get('care_model', None)
  304. threshold_rate = config_param.get('threshold_rate', None)
  305. if care_model is True:
  306. care_model_threshold_data = {}
  307. for key, val in threshold_data.items():
  308. up_val = val * threshold_rate
  309. care_model_threshold_data[key] = up_val
  310. up_key_name = \
  311. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{key}"
  312. redis_helper.set_data_to_redis(key_name=up_key_name, value=up_val, expire_time=2 * 24 * 3600)
  313. log_.info(f"config_key = {config_key}, abtest_group = {abtest_group}, "
  314. f"care_model_threshold_data = {care_model_threshold_data}")
  315. # predict_df.to_csv(f'./data/ad_user_video_predict_{config_key}.csv')
  316. log_.info(f"config_key = {config_key} update end!")
  317. def predict():
  318. try:
  319. now_date = datetime.datetime.today()
  320. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  321. log_.info(f"dt = {dt}")
  322. # 获取阈值参数记录
  323. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  324. # print(threshold_record)
  325. threshold_record = eval(threshold_record)
  326. log_.info(f"threshold_record = {threshold_record}")
  327. params = config_.AD_ABTEST_CONFIG
  328. for config_key, config_param in params.items():
  329. if config_param.get('threshold_mix_func') == 'add':
  330. predict_ad_group_video_mix_with_add(dt=dt,
  331. config_key=config_key,
  332. config_param=config_param,
  333. threshold_record=threshold_record)
  334. elif config_param.get('threshold_mix_func') == 'multiply':
  335. predict_ad_group_video_mix_with_multiply(dt=dt,
  336. config_key=config_key,
  337. config_param=config_param,
  338. threshold_record=threshold_record)
  339. else:
  340. predict_ad_group_video(dt=dt,
  341. config_key=config_key,
  342. config_param=config_param,
  343. threshold_record=threshold_record)
  344. # 阈值参数记录
  345. # redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  346. # value=str(config_.AD_ABTEST_THRESHOLD_CONFIG),
  347. # expire_time=24*3600)
  348. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  349. value=str(threshold_record),
  350. expire_time=2 * 24 * 3600)
  351. msg_list = [
  352. f"env: rov-offline {config_.ENV_TEXT}",
  353. f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}",
  354. ]
  355. send_msg_to_feishu_new(
  356. webhook=config_.FEISHU_ROBOT['ad_threshold_update_robot'].get('webhook'),
  357. key_word=config_.FEISHU_ROBOT['ad_threshold_update_robot'].get('key_word'),
  358. title='广告模型阈值更新完成',
  359. msg_list=msg_list
  360. )
  361. except Exception as e:
  362. log_.error(f"广告模型阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  363. msg_list = [
  364. f"env: rov-offline {config_.ENV_TEXT}",
  365. f"now time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}",
  366. f"exception: {e}",
  367. f"traceback: {traceback.format_exc()}",
  368. ]
  369. send_msg_to_feishu_new(
  370. webhook=config_.FEISHU_ROBOT['ad_threshold_update_robot'].get('webhook'),
  371. key_word=config_.FEISHU_ROBOT['ad_threshold_update_robot'].get('key_word'),
  372. title='广告模型阈值更新失败',
  373. msg_list=msg_list
  374. )
# Script entry point: run the full threshold update when executed directly.
if __name__ == '__main__':
    # predict_ad_group_video()
    predict()