import math
import paddle
from net import DSSMLayer


class StaticModel():
    """Static-graph wrapper for a two-tower DSSM model (PaddlePaddle).

    Builds the feed placeholders, the forward network (train and infer
    variants), the loss/metric fetch dict, and the optimizer.
    """

    def __init__(self, config):
        self.cost = None
        # net() stores the training loss here; create_optimizer() reads it.
        # Initialize so a premature create_optimizer() fails clearly, not
        # with a missing-attribute surprise.
        self._cost = None
        self.config = config
        # Default must be set BEFORE _init_hyper_parameters(): that method
        # reads hyper_parameters.is_infer from config, and assigning False
        # afterwards (as the original code did) clobbered the configured
        # value.
        self.is_infer = False
        self._init_hyper_parameters()

        # Per-tag-feature vocabulary sizes: one {name: cardinality} entry
        # per categorical feature.
        tag_features_dict = [
            {"vid": 5034920},
            {"video_style": 10382},
            {"captions_color": 3000},
            {"valid_time": 1054},
            {"audience_age_group": 100},
            {"cate2": 107},
            {"audience_value_type": 105},
            {"font_size": 109},
            {"audience_gender": 107},
            {"cover_persons_num": 105},
            {"cate1": 102},
            {"sentiment_tendency": 101},
            {"video_type": 20},
            {"background_music_type": 20},
            {"captions": 3},
            {"has_end_credit_guide": 20},
            {"timeliness": 10},
        ]
        self.stat_features_num = 47
        self.stat_features_num_embeddings = 100
        self.tag_feature_num = len(tag_features_dict)
        self.tag_feature_nums = [list(d.values())[0] for d in tag_features_dict]
        print(f"debug debug data tag_feature_nums: {self.tag_feature_nums}")
        # First-layer input dimension: concatenation of all feature
        # embeddings (one slot per tag feature plus 3 per stat feature).
        self.input_dim = self.tag_feature_num + self.stat_features_num * 3
        print(f"debug debug data input dim: {self.input_dim}")

    def _init_hyper_parameters(self):
        """Read hyper-parameters from config, falling back to defaults."""
        self.feature_nums = self.config.get(
            "hyper_parameters.feature_nums", [5, 5, 5, 5, 5])
        self.embedding_dim = self.config.get(
            "hyper_parameters.embedding_dim", 8)
        self.output_dim = self.config.get(
            "hyper_parameters.output_dim", 16)
        self.hidden_layers = self.config.get(
            "hyper_parameters.hidden_layers", [40, 32])
        self.hidden_acts = self.config.get(
            "hyper_parameters.hidden_acts", ["relu", "relu"])
        self.learning_rate = self.config.get(
            "hyper_parameters.optimizer.learning_rate", 0.001)
        # Margin used by the binary-cross-entropy-with-margin loss in net().
        self.margin = self.config.get("hyper_parameters.margin", 0.0)
        self.feature_num = len(self.feature_nums)
        self.is_infer = self.config.get("hyper_parameters.is_infer", False)

    def create_feeds(self, is_infer=False):
        """Declare static-graph input placeholders.

        Training: [label, left_features, right_features].
        Inference: [left_features] only.
        """
        feeds_list = []
        if not is_infer:
            label = paddle.static.data(
                name="label", shape=[-1, 1], dtype='float32')
            feeds_list.append(label)
            left_features = paddle.static.data(
                name="left_features", shape=[-1, self.input_dim],
                dtype='float32')
            feeds_list.append(left_features)
            right_features = paddle.static.data(
                name="right_features", shape=[-1, self.input_dim],
                dtype='float32')
            feeds_list.append(right_features)
        else:
            #sample_id = paddle.static.data(
            #    name="sample_id", shape=[-1, 1], dtype='int64')
            #feeds_list.append(sample_id)
            left_features = paddle.static.data(
                name="left_features", shape=[-1, self.input_dim],
                dtype='float32')
            feeds_list.append(left_features)
        return feeds_list

    def net(self, input, is_infer=False):
        """Build the forward graph and return the fetch dict.

        Training returns {'loss', 'accuracy'} and stores the loss in
        self._cost; inference returns {'left_vector'} only.
        """
        # Instantiate the two-tower DSSM layer.
        dssm_model = DSSMLayer(
            tag_feature_nums=self.tag_feature_nums,
            stat_features_num=self.stat_features_num,
            stat_features_num_embeddings=self.stat_features_num_embeddings,
            embedding_dim=self.embedding_dim,
            output_dim=self.output_dim,
            hidden_layers=self.hidden_layers,
            hidden_acts=self.hidden_acts,
        )

        if is_infer:
            left_features = input[0]
            # Inference only needs the left-tower embedding.
            left_vec = dssm_model(left_features, None, is_infer=True)
            self.inference_target_var = left_vec
            fetch_dict = {
                'left_vector': left_vec
            }
            return fetch_dict

        label, left_features, right_features = input
        # Similarity score plus both tower embeddings.
        sim_score, left_vec, right_vec = dssm_model(left_features,
                                                    right_features)
        self.inference_target_var = left_vec
        # self.left_vector = left_vec
        # self.right_vector = right_vec

        # Binary cross-entropy loss with a margin on the negative term.
        pos_mask = paddle.cast(label > 0.5, 'float32')
        neg_mask = 1.0 - pos_mask
        positive_loss = -pos_mask * paddle.log(
            paddle.clip(sim_score, 1e-8, 1.0))
        negative_loss = -neg_mask * paddle.log(
            paddle.clip(1 - sim_score + self.margin, 1e-8, 1.0))
        loss = positive_loss + negative_loss
        avg_cost = paddle.mean(loss)
        self._cost = avg_cost

        # Accuracy at a fixed 0.5 decision threshold.
        predictions = paddle.cast(sim_score > 0.5, 'float32')
        accuracy = paddle.mean(
            paddle.cast(paddle.equal(predictions, label), 'float32'))

        fetch_dict = {
            'loss': avg_cost,
            'accuracy': accuracy,
            #'similarity': sim_score,
            #'left_vector': left_vec,
            #'right_vector': right_vec
        }
        return fetch_dict

    def create_optimizer(self, strategy=None, is_infer=False):
        """Create Adam (optionally fleet-distributed) and minimize the loss.

        In infer mode a zero dummy loss is minimized so the distributed
        runtime still gets a valid optimize op.
        """
        optimizer = paddle.optimizer.Adam(learning_rate=self.learning_rate)
        if strategy is not None:
            import paddle.distributed.fleet as fleet
            optimizer = fleet.distributed_optimizer(optimizer, strategy)
        if is_infer:
            zero_var = paddle.zeros(shape=[1], dtype='float32')
            optimizer.minimize(paddle.mean(zero_var))
        else:
            optimizer.minimize(self._cost)

    def infer_net(self, input):
        """Inference entry point: forward pass with is_infer=True."""
        return self.net(input, is_infer=True)