# ad_threshold_auto_update.py (16 KB)
import copy
import datetime
import math
import traceback
from threading import Timer

import numpy as np
import pandas as pd

from config import set_config
from log import Log
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
  11. config_, _ = set_config()
  12. log_ = Log()
  13. redis_helper = RedisHelper()
  14. features = [
  15. 'apptype',
  16. '分组',
  17. '广告uv'
  18. ]
  19. def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
  20. """根据活跃人数变化计算新的阈值参数"""
  21. robot_msg_record = []
  22. threshold_record_new = threshold_record.copy()
  23. for app_type, config_params in ad_abtest_abcode_config.items():
  24. # 获取对应端的数据, 更新阈值参数
  25. # log_.info(f"app_type = {app_type}")
  26. temp_df = feature_df[feature_df['apptype'] == app_type]
  27. ab_test_id = config_params.get('ab_test_id')
  28. ab_test_config = config_params.get('ab_test_config')
  29. up_threshold_update = config_params.get('up_threshold_update')
  30. down_threshold_update = config_params.get('down_threshold_update')
  31. for config_name, ab_code_list in ab_test_config.items():
  32. ad_abtest_tag = f"{ab_test_id}-{config_name}"
  33. # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
  34. if len(ab_code_list) > 0:
  35. b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
  36. if b_mean < 0:
  37. # 阈值按梯度调高
  38. gradient = up_threshold_update[config_name].get('gradient')
  39. update_range = up_threshold_update[config_name].get('update_range')
  40. b_i = (b_mean * -1) // gradient + 1
  41. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
  42. elif b_mean > 0.1:
  43. # 阈值按梯度调低
  44. gradient = down_threshold_update[config_name].get('gradient')
  45. update_range = down_threshold_update[config_name].get('update_range')
  46. b_i = (b_mean - 0.1) // gradient + 1
  47. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
  48. else:
  49. continue
  50. if threshold_param_new > 0:
  51. threshold_record_new[ad_abtest_tag] = threshold_param_new
  52. robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
  53. 'gradient': round(gradient, 4), 'range': round(update_range, 4),
  54. 'i': int(b_i),
  55. 'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
  56. 'paramNew': round(threshold_param_new, 4)})
  57. return threshold_record_new, robot_msg_record
  58. def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
  59. """根据广告uv计算新的阈值参数"""
  60. robot_msg_record = []
  61. threshold_record_new = copy.deepcopy(threshold_record)
  62. # 根据目标uv进行调整
  63. for app_type, target_uv_mapping in ad_target_uv.items():
  64. # 获取app_type对应的目标uv
  65. temp_df = feature_df[feature_df['apptype'] == int(app_type)]
  66. # 获取app_type对应的阈值调整参数
  67. update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
  68. ab_test_id = update_threshold_params.get('ab_test_id')
  69. not_update = update_threshold_params.get('not_update')
  70. gradient = update_threshold_params.get('gradient')
  71. max_update_step = update_threshold_params.get('max_update_step')
  72. threshold_update_mapping = update_threshold_params.get('threshold_update')
  73. threshold_record_old = threshold_record.get(ab_test_id)
  74. # print(ab_test_id, threshold_record, threshold_record_old)
  75. for ab_test_group, target_uv in target_uv_mapping.items():
  76. if target_uv is None:
  77. continue
  78. update_range = threshold_update_mapping.get(ab_test_group)
  79. # 获取对应组的当前uv
  80. try:
  81. current_uv = temp_df[temp_df['分组'] == ab_test_group]['广告uv'].values[0]
  82. except Exception as e:
  83. continue
  84. # 计算uv差值
  85. uv_differ = current_uv - target_uv
  86. if abs(uv_differ) < not_update:
  87. continue
  88. # 获取当前阈值参数
  89. threshold_param_old = threshold_record_old[ab_test_group].get('group')
  90. if uv_differ < 0:
  91. # 当前uv < 目标uv,阈值按梯度调低
  92. step = math.ceil((uv_differ * -1) / gradient)
  93. step = max_update_step if step > max_update_step else step
  94. threshold_param_new = float(threshold_param_old) - update_range * step
  95. elif uv_differ > 0:
  96. # 当前uv > 目标uv,阈值按梯度调高
  97. step = math.ceil(uv_differ / gradient)
  98. step = max_update_step if step > max_update_step else step
  99. threshold_param_new = float(threshold_param_old) + update_range * step
  100. else:
  101. continue
  102. if threshold_param_new > 0:
  103. log_.info(
  104. {
  105. 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
  106. 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
  107. 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
  108. 'range': round(update_range, 4),
  109. 'paramOld': round(float(threshold_param_old), 4),
  110. 'paramNew': round(threshold_param_new, 4)
  111. }
  112. )
  113. threshold_record_new[ab_test_id][ab_test_group]['group'] = threshold_param_new
  114. threshold_record_new[ab_test_id][ab_test_group]['mean_group'] = threshold_param_new
  115. robot_msg_record.append(
  116. {
  117. 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
  118. 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
  119. 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
  120. 'range': round(update_range, 4),
  121. 'paramOld': round(float(threshold_param_old), 4),
  122. 'paramNew': round(threshold_param_new, 4)
  123. }
  124. )
  125. return threshold_record_new, robot_msg_record
  126. def update_threshold(threshold_record_old, threshold_record_new):
  127. """更新阈值"""
  128. # 获取用户组列表
  129. ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
  130. for group in group_list]
  131. ad_mid_group_list.append("mean_group")
  132. ad_mid_group_list = list(set(ad_mid_group_list))
  133. # 获取实验配置列表
  134. ad_abtest_config_mapping = {}
  135. abtest_id_list = []
  136. for key, val in config_.AD_ABTEST_CONFIG.items():
  137. abtest_id, abtest_config_tag = key.split('-')
  138. if abtest_id in abtest_id_list:
  139. ad_abtest_config_mapping[abtest_id].append((abtest_config_tag, val))
  140. else:
  141. abtest_id_list.append(abtest_id)
  142. ad_abtest_config_mapping[abtest_id] = [(abtest_config_tag, val)]
  143. log_.info(f"ad_abtest_config_mapping = {ad_abtest_config_mapping}")
  144. # 计算新的阈值并更新
  145. for abtest_id, threshold_param_mapping in threshold_record_new.items():
  146. for abtest_group, threshold_param_new in threshold_param_mapping.items():
  147. threshold_param_old = threshold_record_old[abtest_id].get(abtest_group)
  148. if str(threshold_param_old) == str(threshold_param_new):
  149. # print(abtest_id, abtest_group, threshold_param_old, threshold_param_new)
  150. continue
  151. log_.info(f"abtest_id = {abtest_id}, abtest_group = {abtest_group}, "
  152. f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
  153. for abtest_config_tag, config_val in ad_abtest_config_mapping.get(abtest_id, []):
  154. for group_key in ad_mid_group_list:
  155. # 获取对应的阈值
  156. key_name = \
  157. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
  158. threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
  159. if threshold_old is None:
  160. continue
  161. # 计算新的阈值
  162. if group_key == 'mean_group':
  163. threshold_new = \
  164. float(threshold_old) / threshold_param_old['mean_group'] * threshold_param_new['mean_group']
  165. else:
  166. threshold_new = \
  167. float(threshold_old) / threshold_param_old['group'] * threshold_param_new['group']
  168. # 更新redis
  169. redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new, expire_time=2 * 24 * 3600)
  170. log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
  171. f"abtest_group = {abtest_group}, group_key = {group_key}, "
  172. f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
  173. # 关怀模式实验阈值更新
  174. care_model = config_val.get('care_model', None)
  175. threshold_rate = config_val.get('threshold_rate', None)
  176. if care_model is True:
  177. care_model_key_name = \
  178. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
  179. care_model_threshold_old = redis_helper.get_data_from_redis(key_name=care_model_key_name)
  180. care_model_threshold_new = threshold_new * threshold_rate
  181. redis_helper.set_data_to_redis(key_name=care_model_key_name,
  182. value=care_model_threshold_new, expire_time=2 * 24 * 3600)
  183. log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
  184. f"abtest_group = {abtest_group}, group_key = {group_key}, "
  185. f"care_model_threshold_old = {care_model_threshold_old}, "
  186. f"care_model_threshold_new = {care_model_threshold_new}")
  187. def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
  188. # 获取当前阈值参数值
  189. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  190. threshold_record = eval(threshold_record)
  191. log_.info(f"threshold_record = {threshold_record}")
  192. # 获取uv数据
  193. feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  194. feature_df['apptype'] = feature_df['apptype'].astype(int)
  195. feature_df['广告uv'] = feature_df['广告uv'].astype(float)
  196. # 根据广告uv变化计算新的阈值参数
  197. threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
  198. ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df,
  199. threshold_record=threshold_record, ad_target_uv=ad_target_uv)
  200. log_.info(f"threshold_record_new = {threshold_record_new}")
  201. # 更新阈值
  202. update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
  203. # 更新阈值参数
  204. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  205. value=str(threshold_record_new), expire_time=2 * 24 * 3600)
  206. return robot_msg_record
  207. def get_ad_target_uv():
  208. """获取管理后台开启自动调整阈值开关的目标uv值"""
  209. ad_target_uv = {}
  210. result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
  211. if result is None:
  212. log_.info('获取管理后台广告目标uv值失败!')
  213. return ad_target_uv
  214. if result['code'] != 0:
  215. log_.info('获取管理后台广告目标uv值失败!')
  216. return ad_target_uv
  217. if not result['content']:
  218. return ad_target_uv
  219. for item in result['content']:
  220. app_type = item['productId']
  221. target_uv_mapping = {}
  222. for uv_item in item['uvTargetDetails']:
  223. ab_group = uv_item['abParam']
  224. target_uv = uv_item['uvTarget']
  225. target_uv_mapping[ab_group] = target_uv
  226. ad_target_uv[app_type] = target_uv_mapping
  227. return ad_target_uv
  228. def timer_check():
  229. try:
  230. # 获取自动调整阈值参数
  231. ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
  232. # 自动调整阈值参数存储至redis
  233. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_PARAM_RECORD,
  234. value=str(ad_abtest_abcode_config),
  235. expire_time=24 * 3600)
  236. project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
  237. table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
  238. now_date = datetime.datetime.today()
  239. now_min = datetime.datetime.now().minute
  240. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  241. # 管理后台获取开启自动调整阈值开关的目标uv值
  242. ad_target_uv = get_ad_target_uv()
  243. log_.info(f"ad_target_uv: {ad_target_uv}")
  244. if len(ad_target_uv) == 0:
  245. return
  246. # 查看当前更新的数据是否已准备好
  247. dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
  248. data_count = data_check(project=project, table=table, dt=dt)
  249. if data_count > 0:
  250. log_.info(f"data count = {data_count}")
  251. # 数据准备好,进行更新
  252. robot_msg_record = update_ad_abtest_threshold(
  253. project=project, table=table, dt=dt,
  254. ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
  255. if len(robot_msg_record) > 0:
  256. robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
  257. msg = f"threshold_param_update: \n{robot_msg_record_text.replace(', ', ', ')}\n"
  258. else:
  259. msg = "无需更新!\n"
  260. send_msg_to_feishu(
  261. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  262. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  263. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
  264. )
  265. log_.info(f"threshold update end!")
  266. elif now_min > 30:
  267. log_.info('threshold update data is None!')
  268. send_msg_to_feishu(
  269. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  270. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  271. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n"
  272. )
  273. else:
  274. # 数据没准备好,1分钟后重新检查
  275. Timer(60, timer_check).start()
  276. except Exception as e:
  277. log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  278. send_msg_to_feishu(
  279. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  280. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  281. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
  282. f"exception: {e}\n"
  283. f"traceback: {traceback.format_exc()}"
  284. )
  285. if __name__ == '__main__':
  286. timer_check()