# ad_threshold_auto_update.py
  1. import datetime
  2. import traceback
  3. import numpy as np
  4. from threading import Timer
  5. import pandas as pd
  6. from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
  7. from config import set_config
  8. from log import Log
# Project configuration object; the second value returned by set_config() is not used in this file.
config_, _ = set_config()
# Module-wide logger and Redis client used by the functions in this file.
log_ = Log()
redis_helper = RedisHelper()
# Column names passed to get_feature_data() when the feature table is loaded.
features = [
    'apptype',
    '分组',
    '广告uv'
]
  17. def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
  18. """根据活跃人数变化计算新的阈值参数"""
  19. robot_msg_record = []
  20. threshold_record_new = threshold_record.copy()
  21. for app_type, config_params in ad_abtest_abcode_config.items():
  22. # 获取对应端的数据, 更新阈值参数
  23. # log_.info(f"app_type = {app_type}")
  24. temp_df = feature_df[feature_df['apptype'] == app_type]
  25. ab_test_id = config_params.get('ab_test_id')
  26. ab_test_config = config_params.get('ab_test_config')
  27. up_threshold_update = config_params.get('up_threshold_update')
  28. down_threshold_update = config_params.get('down_threshold_update')
  29. for config_name, ab_code_list in ab_test_config.items():
  30. ad_abtest_tag = f"{ab_test_id}-{config_name}"
  31. # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
  32. if len(ab_code_list) > 0:
  33. b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
  34. if b_mean < 0:
  35. # 阈值按梯度调高
  36. gradient = up_threshold_update[config_name].get('gradient')
  37. update_range = up_threshold_update[config_name].get('update_range')
  38. b_i = (b_mean * -1) // gradient + 1
  39. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
  40. elif b_mean > 0.1:
  41. # 阈值按梯度调低
  42. gradient = down_threshold_update[config_name].get('gradient')
  43. update_range = down_threshold_update[config_name].get('update_range')
  44. b_i = (b_mean - 0.1) // gradient + 1
  45. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
  46. else:
  47. continue
  48. if threshold_param_new > 0:
  49. threshold_record_new[ad_abtest_tag] = threshold_param_new
  50. robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
  51. 'gradient': round(gradient, 4), 'range': round(update_range, 4),
  52. 'i': int(b_i),
  53. 'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
  54. 'paramNew': round(threshold_param_new, 4)})
  55. return threshold_record_new, robot_msg_record
def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
    """Compute new threshold parameters based on ad UV.

    NOTE(review): this function looks like an unfinished edit. The first loop
    walks ``ad_target_uv`` but only assigns locals that are never used
    afterwards; the second loop still applies the ``b``-mean rule and reads
    ``ab_test_config``, ``up_threshold_update`` and ``down_threshold_update``,
    none of which is assigned in this function or among the module-level names
    visible in this file. As written, reaching the ``ab_test_config`` line
    would raise ``NameError``. The intended UV-based rule cannot be derived
    from the code shown here -- confirm with the author before relying on it.

    NOTE(review): the nesting below was reconstructed from a listing whose
    indentation was lost; the second ``for app_type`` loop is assumed to sit
    at function level -- confirm against the original file.

    Args:
        ad_abtest_abcode_config: mapping ``app_type -> config dict``; the code
            reads the keys ``ab_test_id`` and ``threshold_update`` from it.
        feature_df: DataFrame; the code reads the columns ``apptype``,
            ``adcode`` and ``b``.
        threshold_record: mapping ``"<ab_test_id>-<config_name>" -> parameter``.
        ad_target_uv: mapping ``app_type -> {ab group -> target uv}``; the
            app-type keys are converted with ``int()`` before use.

    Returns:
        Tuple ``(threshold_record_new, robot_msg_record)``.
    """
    robot_msg_record = []
    threshold_record_new = threshold_record.copy()
    # First pass: iterate the target-UV settings per app type.
    # NOTE(review): nothing assigned in this loop is consumed later.
    for app_type, target_uv_mapping in ad_target_uv.items():
        update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
        ab_test_id = update_threshold_params.get('ab_test_id')
        temp_df = feature_df[feature_df['apptype'] == int(app_type)]
        for ab_test_group, target_uv in target_uv_mapping.items():
            threshold_update = update_threshold_params.get('threshold_update')
    for app_type, config_params in ad_abtest_abcode_config.items():
        # Select the rows for this app type and update its threshold parameters.
        # log_.info(f"app_type = {app_type}")
        temp_df = feature_df[feature_df['apptype'] == app_type]
        ab_test_id = config_params.get('ab_test_id')
        threshold_update = config_params.get('threshold_update')
        # NOTE(review): `ab_test_config` is not defined in this scope.
        for config_name, ab_code_list in ab_test_config.items():
            ad_abtest_tag = f"{ab_test_id}-{config_name}"
            # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
            if len(ab_code_list) > 0:
                b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
                if b_mean < 0:
                    # Raise the threshold in gradient-sized steps.
                    # NOTE(review): `up_threshold_update` is not defined in this scope.
                    gradient = up_threshold_update[config_name].get('gradient')
                    update_range = up_threshold_update[config_name].get('update_range')
                    b_i = (b_mean * -1) // gradient + 1
                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
                elif b_mean > 0.1:
                    # Lower the threshold in gradient-sized steps.
                    # NOTE(review): `down_threshold_update` is not defined in this scope.
                    gradient = down_threshold_update[config_name].get('gradient')
                    update_range = down_threshold_update[config_name].get('update_range')
                    b_i = (b_mean - 0.1) // gradient + 1
                    threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
                else:
                    continue
                # Only strictly positive parameters are accepted and reported.
                if threshold_param_new > 0:
                    threshold_record_new[ad_abtest_tag] = threshold_param_new
                    robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
                                             'gradient': round(gradient, 4), 'range': round(update_range, 4),
                                             'i': int(b_i),
                                             'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
                                             'paramNew': round(threshold_param_new, 4)})
    return threshold_record_new, robot_msg_record
  99. def update_threshold(threshold_record_old, threshold_record_new):
  100. """更新阈值"""
  101. ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
  102. for group in group_list]
  103. ad_mid_group_list.append("mean_group")
  104. ad_mid_group_list = list(set(ad_mid_group_list))
  105. for ad_abtest_tag, threshold_param_new in threshold_record_new.items():
  106. threshold_param_old = threshold_record_old.get(ad_abtest_tag)
  107. log_.info(f"ad_abtest_tag = {ad_abtest_tag}, "
  108. f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
  109. tag_list = ad_abtest_tag.split('-')
  110. for group_key in ad_mid_group_list:
  111. # 获取对应的阈值
  112. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}"
  113. threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
  114. if threshold_old is None:
  115. continue
  116. # 计算新的阈值
  117. threshold_new = float(threshold_old) / threshold_param_old * threshold_param_new
  118. log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, "
  119. f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
  120. # 更新redis
  121. redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)
  122. def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
  123. # 获取当前阈值参数值
  124. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  125. threshold_record = eval(threshold_record)
  126. log_.info(f"threshold_record = {threshold_record}")
  127. # 获取uv数据
  128. feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  129. feature_df['apptype'] = feature_df['apptype'].astype(int)
  130. feature_df['广告uv'] = feature_df['广告uv'].astype(float)
  131. # 根据广告uv变化计算新的阈值参数
  132. threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
  133. ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df,
  134. threshold_record=threshold_record, ad_target_uv=ad_target_uv)
  135. log_.info(f"threshold_record_new = {threshold_record_new}")
  136. # 更新阈值
  137. update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
  138. # 更新阈值参数
  139. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  140. value=str(threshold_record_new))
  141. return robot_msg_record
  142. def get_ad_target_uv():
  143. """获取管理后台开启自动调整阈值开关的目标uv值"""
  144. ad_target_uv = {}
  145. result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
  146. if result is None:
  147. log_.info('获取管理后台广告目标uv值失败!')
  148. return ad_target_uv
  149. if result['code'] != 0:
  150. log_.info('获取管理后台广告目标uv值失败!')
  151. return ad_target_uv
  152. if not result['content']:
  153. return ad_target_uv
  154. for item in result['content']:
  155. app_type = item['productId']
  156. target_uv_mapping = {}
  157. for uv_item in item['uvTargetDetails']:
  158. ab_group = uv_item['abParam']
  159. target_uv = uv_item['uvTarget']
  160. target_uv_mapping[ab_group] = target_uv
  161. ad_target_uv[app_type] = target_uv_mapping
  162. return ad_target_uv
  163. def timer_check():
  164. try:
  165. # 获取自动调整阈值参数
  166. ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
  167. project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
  168. table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
  169. now_date = datetime.datetime.today()
  170. now_min = datetime.datetime.now().minute
  171. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  172. # 管理后台获取开启自动调整阈值开关的目标uv值
  173. ad_target_uv = get_ad_target_uv()
  174. log_.info(f"ad_target_uv: {ad_target_uv}")
  175. if len(ad_target_uv) == 0:
  176. return
  177. # 查看当前更新的数据是否已准备好
  178. dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
  179. data_count = data_check(project=project, table=table, dt=dt)
  180. if data_count > 0:
  181. log_.info(f"data count = {data_count}")
  182. # 数据准备好,进行更新
  183. robot_msg_record = update_ad_abtest_threshold(
  184. project=project, table=table, dt=dt,
  185. ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
  186. if len(robot_msg_record) > 0:
  187. robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
  188. msg = f"threshold_param_update: \n{robot_msg_record_text.replace(', ', ', ')}\n"
  189. else:
  190. msg = "无需更新!\n"
  191. send_msg_to_feishu(
  192. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  193. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  194. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
  195. )
  196. log_.info(f"threshold update end!")
  197. elif now_min > 30:
  198. log_.info('threshold update data is None!')
  199. send_msg_to_feishu(
  200. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  201. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  202. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n"
  203. )
  204. else:
  205. # 数据没准备好,1分钟后重新检查
  206. Timer(60, timer_check).start()
  207. except Exception as e:
  208. log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  209. send_msg_to_feishu(
  210. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  211. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  212. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
  213. f"exception: {e}\n"
  214. f"traceback: {traceback.format_exc()}"
  215. )
# Script entry point: run one check; timer_check() re-schedules itself with a
# 60-second Timer while the data is not ready and the minute is not past 30.
if __name__ == '__main__':
    timer_check()