# ad_threshold_auto_update.py

  1. import datetime
  2. import traceback
  3. import numpy as np
  4. from threading import Timer
  5. import pandas as pd
  6. from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
  7. from config import set_config
  8. from log import Log
# Project configuration object; the second value returned by set_config() is unused here.
config_, _ = set_config()
# Module-wide logger shared by every function below.
log_ = Log()
# Module-wide Redis helper used to read and write thresholds and the threshold-parameter record.
redis_helper = RedisHelper()

# Feature names passed to get_feature_data() when loading the hourly data.
# This module itself only reads 'apptype' (cast to int), 'adcode' and 'b' (cast to float).
# NOTE(review): 'b' presumably encodes the change in active users (see the uv columns) — confirm
# against the table definition.
features = [
    'apptype',
    'adcode',
    'visit_uv_today',
    'visit_uv_yesterday',
    'b'
]
  19. def get_threshold_record_new(ad_abtest_abcode_config, feature_df, threshold_record):
  20. """根据活跃人数变化计算新的阈值参数"""
  21. robot_msg_record = []
  22. threshold_record_new = threshold_record.copy()
  23. for app_type, config_params in ad_abtest_abcode_config.items():
  24. # 获取对应端的数据, 更新阈值参数
  25. # log_.info(f"app_type = {app_type}")
  26. temp_df = feature_df[feature_df['apptype'] == app_type]
  27. ab_test_id = config_params.get('ab_test_id')
  28. ab_test_config = config_params.get('ab_test_config')
  29. threshold_update = config_params.get('threshold_update')
  30. for config_name, ab_code_list in ab_test_config.items():
  31. ad_abtest_tag = f"{ab_test_id}-{config_name}"
  32. # log_.info(f"ad_abtest_tag = {ad_abtest_tag}")
  33. if len(ab_code_list) > 0:
  34. b_mean = temp_df[temp_df['adcode'].isin(ab_code_list)]['b'].mean()
  35. if b_mean < 0:
  36. # 阈值调高
  37. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) + threshold_update
  38. elif b_mean > 0.1:
  39. # 阈值调低
  40. threshold_param_new = float(threshold_record.get(ad_abtest_tag)) - threshold_update
  41. else:
  42. continue
  43. if threshold_param_new > 0:
  44. threshold_record_new[ad_abtest_tag] = threshold_param_new
  45. robot_msg_record.append({'appType': app_type, 'ad_abtest_tag': ad_abtest_tag,
  46. 'param_old': float(threshold_record.get(ad_abtest_tag)),
  47. 'param_new': threshold_param_new})
  48. return threshold_record_new, robot_msg_record
  49. def update_threshold(threshold_record_old, threshold_record_new):
  50. """更新阈值"""
  51. ad_mid_group_list = [group for class_key, group_list in config_.AD_MID_GROUP.items()
  52. for group in group_list]
  53. ad_mid_group_list = list(set(ad_mid_group_list))
  54. for ad_abtest_tag, threshold_param_new in threshold_record_new.items():
  55. threshold_param_old = threshold_record_old.get(ad_abtest_tag)
  56. log_.info(f"ad_abtest_tag = {ad_abtest_tag}, "
  57. f"threshold_param_old = {threshold_param_old}, threshold_param_new = {threshold_param_new}")
  58. tag_list = ad_abtest_tag.split('-')
  59. for group_key in ad_mid_group_list:
  60. # 获取对应的阈值
  61. key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{tag_list[0]}:{tag_list[1]}:{group_key}"
  62. threshold_old = redis_helper.get_data_from_redis(key_name=key_name)
  63. if threshold_old is None:
  64. continue
  65. # 计算新的阈值
  66. threshold_new = float(threshold_old) / threshold_param_old * threshold_param_new
  67. log_.info(f"ad_abtest_tag = {ad_abtest_tag}, group_key = {group_key}, "
  68. f"threshold_old = {threshold_old}, threshold_new = {threshold_new}")
  69. # 更新redis
  70. redis_helper.set_data_to_redis(key_name=key_name, value=threshold_new)
  71. def update_ad_abtest_threshold(project, table, dt, ad_abtest_abcode_config):
  72. # 获取当前阈值参数值
  73. threshold_record = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD)
  74. threshold_record = eval(threshold_record)
  75. log_.info(f"threshold_record = {threshold_record}")
  76. # 获取uv数据
  77. feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  78. feature_df['apptype'] = feature_df['apptype'].astype(int)
  79. feature_df['b'] = feature_df['b'].astype(float)
  80. # 根据活跃人数变化计算新的阈值参数
  81. threshold_record_new, robot_msg_record = get_threshold_record_new(
  82. ad_abtest_abcode_config=ad_abtest_abcode_config, feature_df=feature_df, threshold_record=threshold_record)
  83. log_.info(f"threshold_record_new = {threshold_record_new}")
  84. # 更新阈值
  85. update_threshold(threshold_record_old=threshold_record, threshold_record_new=threshold_record_new)
  86. # 更新阈值参数
  87. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_PREFIX_AD_THRESHOLD_RECORD,
  88. value=str(threshold_record_new))
  89. return robot_msg_record
  90. def timer_check():
  91. try:
  92. ad_abtest_abcode_config = config_.AD_ABTEST_ABCODE_CONFIG
  93. project = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('project')
  94. table = config_.AD_THRESHOLD_AUTO_UPDATE_DATA.get('table')
  95. now_date = datetime.datetime.today()
  96. now_min = datetime.datetime.now().minute
  97. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  98. dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
  99. # 查看当前更新的数据是否已准备好
  100. data_count = data_check(project=project, table=table, dt=dt)
  101. if data_count > 0:
  102. log_.info(f"data count = {data_count}")
  103. # 数据准备好,进行更新
  104. robot_msg_record = update_ad_abtest_threshold(
  105. project=project, table=table, dt=dt, ad_abtest_abcode_config=ad_abtest_abcode_config)
  106. if len(robot_msg_record) > 0:
  107. robot_msg_record_text = "\n".join([str(item) for item in robot_msg_record])
  108. msg = f"threshold_param_update: \n{robot_msg_record_text}\n"
  109. else:
  110. msg = "无需更新!\n"
  111. send_msg_to_feishu(
  112. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  113. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  114. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新完成!\n{msg}"
  115. )
  116. log_.info(f"threshold update end!")
  117. elif now_min > 30:
  118. log_.info('threshold update data is None!')
  119. send_msg_to_feishu(
  120. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  121. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  122. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新相关数据未准备好!\n"
  123. )
  124. else:
  125. # 数据没准备好,1分钟后重新检查
  126. Timer(60, timer_check).start()
  127. except Exception as e:
  128. log_.error(f"阈值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  129. send_msg_to_feishu(
  130. webhook=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('webhook'),
  131. key_word=config_.FEISHU_ROBOT['ad_threshold_auto_update_robot'].get('key_word'),
  132. msg_text=f"rov-offline{config_.ENV_TEXT} - 阈值更新失败\n"
  133. f"exception: {e}\n"
  134. f"traceback: {traceback.format_exc()}"
  135. )
# Script entry point: start the first data-readiness check when the file is run directly.
if __name__ == '__main__':
    timer_check()