# ad_threshold_auto_update.py
  1. import copy
  2. import datetime
  3. import traceback
  4. import math
  5. import numpy as np
  6. from threading import Timer
  7. import pandas as pd
  8. from my_utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu, request_get
  9. from my_config import set_config
  10. from log import Log
  11. config_, _ = set_config()
  12. log_ = Log()
  13. redis_helper = RedisHelper()
  14. features = [
  15. 'apptype',
  16. '分组',
  17. '广告uv'
  18. ]
  19. def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
  20. """根据活跃人数变化计算新的阈值参数"""
  21. robot_msg_record = []
  22. threshold_record_new = threshold_record.copy()
  23. for app_type, config_params in ad_abtest_abcode_config.items():
  24. # 获取对应端的数据, 更新阈值参数
  25. # log_.info(f"app_type = {app_type}")
  26. temp_df = feature_df[feature_df['apptype'] == app_type]
  27. ab_test_id = config_params.get('ab_test_id')
  28. ab_test_config = config_params.get('ab_test_config')
  29. up_threshold_update = config_params.get('up_threshold_update')
  30. down_threshold_update = config_params.get('down_threshold_update')
  31. for config_name, ab_code_list in ab_test_config.items():
  32. ad_abtest_tag = f"{ab_test_id}-{config_name}"
  33. # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
  34. if len(ab_code_list) > 0:
  35. b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
  36. if b_mean < 0:
  37. # 阈值按梯度调高
  38. gradient = up_threshold_update[config_name].get('gradient')
  39. update_range = up_threshold_update[config_name].get('update_range')
  40. b_i = (b_mean * -1) // gradient + 1
  41. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + update_range * b_i
  42. elif b_mean > 0.1:
  43. # 阈值按梯度调低
  44. gradient = down_threshold_update[config_name].get('gradient')
  45. update_range = down_threshold_update[config_name].get('update_range')
  46. b_i = (b_mean - 0.1) // gradient + 1
  47. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - update_range * b_i
  48. else:
  49. continue
  50. if threshold_param_new > 0:
  51. threshold_record_new[ad_abtest_tag] = threshold_param_new
  52. robot_msg_record.append({'appType': app_type, 'abtestTag': ad_abtest_tag,
  53. 'gradient': round(gradient, 4), 'range': round(update_range, 4),
  54. 'i': int(b_i),
  55. 'paramOld': round(float(threshold_record.get(ad_abtest_tag)), 4),
  56. 'paramNew': round(threshold_param_new, 4)})
  57. return threshold_record_new, robot_msg_record
  58. def get_threshold_record_new_by_uv(ad_abtest_abcode_config, feature_df, threshold_record, ad_target_uv):
  59. """根据广告uv计算新的阈值参数"""
  60. robot_msg_record = []
  61. threshold_record_new = copy.deepcopy(threshold_record)
  62. # 根据目标uv进行调整
  63. for app_type, target_uv_mapping in ad_target_uv.items():
  64. # 获取app_type对应的目标uv
  65. temp_df = feature_df[feature_df['apptype'] == int(app_type)]
  66. # 获取app_type对应的阈值调整参数
  67. update_threshold_params = ad_abtest_abcode_config.get(int(app_type))
  68. ab_test_id = update_threshold_params.get('ab_test_id')
  69. not_update = update_threshold_params.get('not_update')
  70. gradient = update_threshold_params.get('gradient')
  71. max_update_step = update_threshold_params.get('max_update_step')
  72. threshold_update_mapping = update_threshold_params.get('threshold_update')
  73. threshold_record_old = threshold_record.get(ab_test_id)
  74. # print(ab_test_id, threshold_record, threshold_record_old)
  75. for ab_test_group, target_uv in target_uv_mapping.items():
  76. if target_uv is None:
  77. continue
  78. gradient, max_update_step = get_ad_uv_update_config(app_type=app_type, ab_group=ab_test_group)
  79. update_range = threshold_update_mapping.get(ab_test_group)
  80. # 获取对应组的当前uv
  81. try:
  82. current_uv = temp_df[temp_df['分组'] == ab_test_group]['广告uv'].values[0]
  83. except Exception as e:
  84. continue
  85. # 计算uv差值
  86. uv_differ = current_uv - target_uv
  87. if abs(uv_differ) <= not_update:
  88. continue
  89. # 获取当前阈值参数
  90. threshold_param_old = threshold_record_old[ab_test_group].get('group')
  91. if uv_differ < 0:
  92. # 当前uv < 目标uv,阈值按梯度调低(第一个梯度区间:向上取整,之后:四舍五入)
  93. if abs(uv_differ) < gradient:
  94. step = math.ceil(abs(uv_differ) / gradient)
  95. else:
  96. step = round(abs(uv_differ) / gradient)
  97. step = max_update_step if step > max_update_step else step
  98. threshold_param_new = float(threshold_param_old) - update_range * step
  99. elif uv_differ > 0:
  100. # 当前uv > 目标uv,阈值按梯度调高(第一个梯度区间:向上取整,之后:四舍五入)
  101. if uv_differ < gradient:
  102. step = math.ceil(uv_differ / gradient)
  103. else:
  104. step = round(uv_differ / gradient)
  105. step = max_update_step if step > max_update_step else step
  106. threshold_param_new = float(threshold_param_old) + update_range * step
  107. else:
  108. continue
  109. if threshold_param_new <= 0:
  110. threshold_param_new = 0
  111. log_.info(
  112. {
  113. 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
  114. 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
  115. 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
  116. 'range': round(update_range, 4),
  117. 'paramOld': round(float(threshold_param_old), 4),
  118. 'paramNew': round(threshold_param_new, 4)
  119. }
  120. )
  121. threshold_record_new[ab_test_id][ab_test_group]['group'] = threshold_param_new
  122. threshold_record_new[ab_test_id][ab_test_group]['mean_group'] = threshold_param_new
  123. robot_msg_record.append(
  124. {
  125. 'appType': app_type, 'abtestid': ab_test_id, 'abTestGroup': ab_test_group,
  126. 'targetUv': target_uv, 'currentUv': round(current_uv, 4),
  127. 'uvDiffer': round(uv_differ, 4), 'gradient': round(gradient, 4), 'step': step,
  128. 'range': round(update_range, 4),
  129. 'paramOld': round(float(threshold_param_old), 4),
  130. 'paramNew': round(threshold_param_new, 4)
  131. }
  132. )
  133. return threshold_record_new, robot_msg_record
  134. def update_threshold(threshold_record_old, threshold_record_new):
  135. """更新阈值"""
  136. # 获取用户组列表
  137. ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
  138. for group in group_list]
  139. ad_mid_group_list.append("mean_group")
  140. ad_mid_group_list = list(set(ad_mid_group_list))
  141. # 获取实验配置列表
  142. ad_abtest_config_mapping = {}
  143. abtest_id_list = []
  144. for key, val in config_.AD_ABTEST_CONFIG.items():
  145. abtest_id, abtest_config_tag = key.split('-')
  146. if abtest_id in abtest_id_list:
  147. ad_abtest_config_mapping[abtest_id].append((abtest_config_tag, val))
  148. else:
  149. abtest_id_list.append(abtest_id)
  150. ad_abtest_config_mapping[abtest_id] = [(abtest_config_tag, val)]
  151. log_.info(f"ad_abtest_config_mapping = {ad_abtest_config_mapping}")
  152. # 计算新的阈值并更新
  153. for abtest_id, threshold_param_mapping in threshold_record_new.items():
  154. for abtest_group, threshold_param_new in threshold_param_mapping.items():
  155. threshold_param_old = threshold_record_old[abtest_id].get(abtest_group)
  156. if str(threshold_param_old) == str(threshold_param_new):
  157. # print(abtest_id, abtest_group, threshold_param_old, threshold_param_new)
  158. continue
  159. log_.info(f"abtest_id = {abtest_id}, abtest_group = {abtest_group}, "
  160. f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
  161. for abtest_config_tag, config_val in ad_abtest_config_mapping.get(abtest_id, []):
  162. for group_key in ad_mid_group_list:
  163. # 获取对应的阈值
  164. key_name = \
  165. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
  166. threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
  167. if threshold_old is None:
  168. continue
  169. # 原阈值为0时,加10**(-5)兜底处理
  170. if float(threshold_old) == 0:
  171. threshold_old = float(threshold_old) + 10**(-5)
  172. # 计算新的阈值
  173. if group_key == 'mean_group':
  174. if threshold_param_old['mean_group'] == 0:
  175. threshold_new = \
  176. float(threshold_old) / 10**(-5) * threshold_param_new['mean_group']
  177. else:
  178. threshold_new = \
  179. float(threshold_old) / threshold_param_old['mean_group'] * threshold_param_new['mean_group']
  180. else:
  181. if threshold_param_old['group'] == 0:
  182. threshold_new = \
  183. float(threshold_old) / 10**(-5) * threshold_param_new['group']
  184. else:
  185. threshold_new = \
  186. float(threshold_old) / threshold_param_old['group'] * threshold_param_new['group']
  187. # 更新redis
  188. redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new, expire_time=2 * 24 * 3600)
  189. log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
  190. f"abtest_group = {abtest_group}, group_key = {group_key}, "
  191. f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
  192. # 关怀模式实验阈值更新
  193. care_model = config_val.get('care_model', None)
  194. threshold_rate = config_val.get('threshold_rate', None)
  195. if care_model is True:
  196. care_model_key_name = \
  197. f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD_CARE_MODEL}{abtest_id}:{abtest_config_tag}:{abtest_group}:{group_key}"
  198. care_model_threshold_old = redis_helper.get_data_from_redis(key_name=care_model_key_name)
  199. care_model_threshold_new = threshold_new * threshold_rate
  200. redis_helper.set_data_to_redis(key_name=care_model_key_name,
  201. value=care_model_threshold_new, expire_time=2 * 24 * 3600)
  202. log_.info(f"abtest_id = {abtest_id}, abtest_config_tag = {abtest_config_tag}, "
  203. f"abtest_group = {abtest_group}, group_key = {group_key}, "
  204. f"care_model_threshold_old = {care_model_threshold_old}, "
  205. f"care_model_threshold_new = {care_model_threshold_new}")
  206. def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config, ad_target_uv):
  207. # 获取当前阈值参数值
  208. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  209. threshold_record = eval(threshold_record)
  210. log_.info(f"threshold_record = {threshold_record}")
  211. # 获取uv数据
  212. feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  213. feature_df['apptype'] = feature_df['apptype'].astype(int)
  214. feature_df['广告uv'] = feature_df['广告uv'].astype(float)
  215. # 根据广告uv变化计算新的阈值参数
  216. threshold_record_new, robot_msg_record = get_threshold_record_new_by_uv(
  217. ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df,
  218. threshold_record=threshold_record, ad_target_uv=ad_target_uv)
  219. log_.info(f"threshold_record_new = {threshold_record_new}")
  220. # 更新阈值
  221. update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
  222. # 更新阈值参数
  223. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  224. value=str(threshold_record_new), expire_time=2 * 24 * 3600)
  225. return robot_msg_record
  226. def get_ad_target_uv(now_h):
  227. """获取管理后台开启自动调整阈值开关的目标uv值"""
  228. ad_target_uv = {}
  229. result = request_get(request_url=config_.GET_AD_TARGET_UV_URL)
  230. if result is None:
  231. log_.info('获取管理后台广告目标uv值失败!')
  232. return ad_target_uv
  233. if result['code'] != 0:
  234. log_.info('获取管理后台广告目标uv值失败!')
  235. return ad_target_uv
  236. if not result['content']:
  237. return ad_target_uv
  238. for item in result['content']:
  239. app_type = item['productId']
  240. target_uv_mapping = {}
  241. target_uv_param = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type)).get('target_uv_param', {})
  242. for uv_item in item['uvTargetDetails']:
  243. ab_group = uv_item['abParam']
  244. target_uv = uv_item['uvTarget']
  245. target_uv_param_group = target_uv_param.get(ab_group, None)
  246. if target_uv_param_group is not None:
  247. update_hours = target_uv_param_group.get('update_hours')
  248. update_param = target_uv_param_group.get('update_param')
  249. if now_h in update_hours:
  250. target_uv *= update_param
  251. target_uv_mapping[ab_group] = target_uv
  252. ad_target_uv[app_type] = target_uv_mapping
  253. return ad_target_uv
  254. def get_ad_uv_update_config(app_type, ab_group):
  255. """获取对应组自动调整阈值参数:梯度,最大步长"""
  256. now_h = datetime.datetime.now().hour
  257. update_threshold_params = config_.AD_ABTEST_ABCODE_CONFIG.get(int(app_type))
  258. gradient = update_threshold_params.get('gradient')
  259. max_update_step = update_threshold_params.get('max_update_step')
  260. target_uv_param = update_threshold_params.get('target_uv_param', {})
  261. target_uv_param_group = target_uv_param.get(ab_group, None)
  262. if target_uv_param_group is not None:
  263. special_update_config = target_uv_param_group.get('special_update_config', None)
  264. if special_update_config is not None:
  265. special_hours = special_update_config.get('special_hours', [])
  266. if now_h in special_hours:
  267. gradient = special_update_config.get('special_gradient')
  268. max_update_step = special_update_config.get('special_max_update_step')
  269. return gradient, max_update_step
  270. def timer_check():
  271. try:
  272. # 获取自动调整阈值参数
  273. ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
  274. # 自动调整阈值参数存储至redis
  275. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_PARAM_RECORD,
  276. value=str(ad_abtest_abcode_config),
  277. expire_time=24 * 3600)
  278. project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
  279. table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
  280. now_date = datetime.datetime.today()
  281. now_h = datetime.datetime.now().hour
  282. now_min = datetime.datetime.now().minute
  283. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  284. # 00:00 - 09:00 不做阈值参数调整
  285. if 0 <= now_h < 9:
  286. log_.info(f"00:00 - 09:00 不做阈值参数调整")
  287. return
  288. # 管理后台获取开启自动调整阈值开关的目标uv值
  289. ad_target_uv = get_ad_target_uv(now_h=now_h)
  290. log_.info(f"ad_target_uv: {ad_target_uv}")
  291. if len(ad_target_uv) == 0:
  292. return
  293. # 查看当前更新的数据是否已准备好
  294. dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
  295. data_count = data_check(project=project, table=table, dt=dt)
  296. if data_count > 0:
  297. log_.info(f"data count = {data_count}")
  298. # 数据准备好,进行更新
  299. robot_msg_record = update_ad_abtest_threshold(
  300. project=project, table=table, dt=dt,
  301. ad_abtest_abcode_config=ad_abtest_abcode_config, ad_target_uv=ad_target_uv)
  302. if len(robot_msg_record) > 0:
  303. robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
  304. msg = f"threshold_param_update: \n{robot_msg_record_text.replace(', ', ', ')}\n"
  305. else:
  306. msg = "无需更新!\n"
  307. send_msg_to_feishu(
  308. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  309. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  310. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
  311. )
  312. log_.info(f"threshold update end!")
  313. elif now_min > 30:
  314. log_.info('threshold update data is None!')
  315. send_msg_to_feishu(
  316. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  317. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  318. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n"
  319. )
  320. else:
  321. # 数据没准备好,1分钟后重新检查
  322. Timer(60, timer_check).start()
  323. except Exception as e:
  324. log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  325. send_msg_to_feishu(
  326. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  327. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  328. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
  329. f"exception: {e}\n"
  330. f"traceback: {traceback.format_exc()}"
  331. )
  332. if __name__ == '__main__':
  333. timer_check()