import json
import random
import time
import math
import gevent
from db_helper import RedisHelper
from config import set_config
from log import Log
from gevent import monkey, pool

# NOTE: import order and the position of patch_all() are kept exactly as in
# the original module, because both have import-time side effects.
monkey.patch_all()

config_ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# A creative whose alpha + beta is below this value is sampled from the
# default parameters instead of its own (the original comments describe
# alpha + beta as the creative's view count).
_MIN_VIEW_COUNT = 100

# Number of keys sent to redis_helper.get_batch_key() per call.
_CVR_BATCH_SIZE = 20


def _fetch_default_thompson_param():
    """Return the raw default Thompson parameter (stored under suffix ``-1``)."""
    return redis_helper.get_data_from_redis(
        key_name=f"{config_.THOMPSON_PARAM_KEY_PREFIX}-1"
    )


def _parse_thompson_param(raw_param):
    """Decode a JSON-encoded two-element ``[alpha, beta]`` value into two ints."""
    param_alpha, param_beta = json.loads(raw_param.strip())
    return int(param_alpha), int(param_beta)


def _run_thompson_sampling(creative_id_list, gevent_pool):
    """Run ``thompson_process`` for every creative on the pool and collect results."""
    tasks = [
        gevent_pool.spawn(thompson_process, creative_id)
        for creative_id in creative_id_list
    ]
    gevent.joinall(tasks)
    return [task.get() for task in tasks]


def _fetch_cvr_values(creative_id_list):
    """Fetch the CVR value of every creative from Redis, in fixed-size batches."""
    key_list = [
        f"{config_.CREATIVE_CVR_KEY_PREFIX}{creative_id}"
        for creative_id in creative_id_list
    ]
    cvr_list = []
    for start in range(0, len(key_list), _CVR_BATCH_SIZE):
        batch = key_list[start:start + _CVR_BATCH_SIZE]
        cvr_list.extend(redis_helper.get_batch_key(name_list=batch))
    return cvr_list


def thompson_process(creative_id):
    """Draw one Thompson-sampling score for a creative.

    The creative's own parameters are used only when they exist in Redis and
    ``alpha + beta >= 100``; otherwise the default parameters (key suffix
    ``-1``) are used. The score is drawn from
    ``Beta(log(alpha + 1) + 1, log(beta + 1) + 1)``.

    Args:
        creative_id: Identifier appended to ``THOMPSON_PARAM_KEY_PREFIX`` to
            build the Redis key.

    Returns:
        A list ``[creative_id, score, thompson_param_initial, thompson_param,
        betavariate_param, random_flag]`` where ``thompson_param_initial`` is
        the raw value read for this creative, ``thompson_param`` is the raw
        value actually used, ``betavariate_param`` is ``[alpha, beta]`` as
        passed to ``random.betavariate`` and ``random_flag`` is one of
        ``'initial_random'``, ``'beta'`` or ``'under_view_initial_random'``.
    """
    thompson_param_initial = redis_helper.get_data_from_redis(
        key_name=f"{config_.THOMPSON_PARAM_KEY_PREFIX}{creative_id}"
    )

    if thompson_param_initial is None or thompson_param_initial == '':
        # No parameters stored for this creative: use the defaults.
        thompson_param = _fetch_default_thompson_param()
        param_alpha, param_beta = _parse_thompson_param(thompson_param)
        random_flag = 'initial_random'
    else:
        param_alpha, param_beta = _parse_thompson_param(thompson_param_initial)
        if param_alpha + param_beta >= _MIN_VIEW_COUNT:
            # Enough observations: sample from the creative's own parameters.
            thompson_param = thompson_param_initial
            random_flag = 'beta'
        else:
            # Too few observations: fall back to the defaults.
            thompson_param = _fetch_default_thompson_param()
            param_alpha, param_beta = _parse_thompson_param(thompson_param)
            random_flag = 'under_view_initial_random'

    # The raw counts are log-compressed before sampling; the +1 terms keep
    # both shape parameters >= 1 for non-negative counts.
    alpha = math.log(param_alpha + 1) + 1
    beta = math.log(param_beta + 1) + 1
    score = random.betavariate(alpha=alpha, beta=beta)
    betavariate_param = [alpha, beta]
    return [
        creative_id,
        score,
        thompson_param_initial,
        thompson_param,
        betavariate_param,
        random_flag,
    ]


def get_creative_id_with_thompson(mid, creative_id_list, gevent_pool, sort_strategy):
    """Pick the creative to show using Thompson sampling.

    Args:
        mid: Value copied unchanged into the result under ``'mid'``.
        creative_id_list: Candidate creative ids.
        gevent_pool: Pool used to spawn one ``thompson_process`` task per
            creative; the concurrency limit is whatever the caller configured
            on this pool.
        sort_strategy: Value copied unchanged into the result under
            ``'sort_strategy'``.

    Returns:
        A dict describing the highest-scoring creative, plus the full ranking
        under ``'thompson_res_rank'``.

    Raises:
        IndexError: If ``creative_id_list`` is empty.
    """
    thompson_res_list = _run_thompson_sampling(creative_id_list, gevent_pool)
    # Rank by sampled score, highest first.
    thompson_res_rank = sorted(thompson_res_list, key=lambda x: x[1], reverse=True)
    best = thompson_res_rank[0]
    return {
        'mid': mid,
        'creative_id': best[0],
        'score': best[1],
        'thompson_param_initial': best[2],
        'thompson_param': best[3],
        'betavariate_param': best[4],
        'random_flag': best[5],
        'sort_strategy': sort_strategy,
        'thompson_res_rank': thompson_res_rank,
    }


def get_creative_id_with_thompson_weight(mid, creative_id_list, gevent_pool, sort_strategy):
    """Pick the creative to show using Thompson sampling plus CVR weighting.

    Every creative starts at ``CREATIVE_WEIGHT_INITIAL``. Creatives that have a
    parseable CVR in Redis are ordered by ascending CVR and receive an extra
    ``rank * WEIGHT_GRADIENT``. A creative whose weight ends up above the
    initial weight has its sampled score multiplied by
    ``1 + weight / weight_sum``; the others keep their raw score.

    Args:
        mid: Value copied unchanged into the result under ``'mid'``.
        creative_id_list: Candidate creative ids.
        gevent_pool: Pool used to spawn one ``thompson_process`` task per
            creative.
        sort_strategy: Value copied unchanged into the result under
            ``'sort_strategy'``.

    Returns:
        A dict describing the creative with the highest weighted score, plus
        the full ranking under ``'thompson_res_rank'``. Each ranking entry is
        the ``thompson_process`` result extended with
        ``[creative_weight, weight_score]``.

    Raises:
        IndexError: If ``creative_id_list`` is empty, or if Redis returns fewer
            CVR values than creatives.
    """
    thompson_res_list = _run_thompson_sampling(creative_id_list, gevent_pool)

    # Look up each creative's CVR and give every creative the initial weight.
    cvr_list = _fetch_cvr_values(creative_id_list)
    cvr_mapping = {}
    creative_weight = {}
    for i, creative_id in enumerate(creative_id_list):
        creative_weight[creative_id] = config_.CREATIVE_WEIGHT_INITIAL
        cvr = cvr_list[i]
        if cvr is None:
            continue
        try:
            cvr_mapping[creative_id] = float(cvr)
        except (TypeError, ValueError):
            # An unparseable CVR is treated the same as a missing one.
            continue

    # Ascending CVR: the lowest CVR has rank 0 and therefore gets no bonus.
    cvr_sorted = sorted(cvr_mapping.items(), key=lambda x: x[1], reverse=False)
    for rank, (creative_id, _) in enumerate(cvr_sorted):
        creative_weight[creative_id] += rank * config_.WEIGHT_GRADIENT

    # Boost the sampled score of every creative that earned extra weight.
    weight_sum = sum(creative_weight.values())
    thompson_weight_res_list = []
    for thompson_res in thompson_res_list:
        creative_id, score = thompson_res[0], thompson_res[1]
        weight = creative_weight[creative_id]
        if weight > config_.CREATIVE_WEIGHT_INITIAL:
            weight_score = score * (1 + weight / weight_sum)
        else:
            weight_score = score
        thompson_weight_res_list.append(thompson_res + [weight, weight_score])

    # Re-rank by weighted score (index 7), highest first.
    thompson_res_rank = sorted(thompson_weight_res_list, key=lambda x: x[7], reverse=True)
    best = thompson_res_rank[0]
    return {
        'mid': mid,
        'creative_id': best[0],
        'score': best[1],
        'thompson_param_initial': best[2],
        'thompson_param': best[3],
        'betavariate_param': best[4],
        'random_flag': best[5],
        'creative_weight': best[6],
        'weight_score': best[7],
        'sort_strategy': sort_strategy,
        'thompson_res_rank': thompson_res_rank,
    }