import paddle import paddle.nn as nn import paddle.nn.functional as F import numpy as np class DSSMLayer(nn.Layer): def __init__(self, feature_nums=[5,5,5,5,5], embedding_dim=8, output_dim=16, hidden_layers=[40, 32], hidden_acts=["relu", "relu"]): super(DSSMLayer, self).__init__() self.feature_num = len(feature_nums) self.embedding_dim = embedding_dim self.output_dim = output_dim # 第一层的输入维度是所有特征的embedding拼接 self.hidden_layers = [self.feature_num * embedding_dim] + hidden_layers + [output_dim] self.hidden_acts = hidden_acts # 为每个特征创建对应维度的Embedding层 self.left_embeddings = nn.LayerList([ nn.Embedding( num_embeddings=feature_nums[i], embedding_dim=embedding_dim, weight_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.XavierNormal() ) ) for i in range(self.feature_num) ]) self.right_embeddings = nn.LayerList([ nn.Embedding( num_embeddings=feature_nums[i], embedding_dim=embedding_dim, weight_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.XavierNormal() ) ) for i in range(self.feature_num) ]) # 左视频塔 self._left_tower = [] for i in range(len(self.hidden_layers) - 1): linear = paddle.nn.Linear( in_features=self.hidden_layers[i], out_features=self.hidden_layers[i + 1], weight_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.XavierNormal() ), bias_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.Constant(value=0.0) ) ) self.add_sublayer('left_linear_%d' % i, linear) self._left_tower.append(linear) if i < len(hidden_acts) and self.hidden_acts[i] == "relu": act = paddle.nn.ReLU() self.add_sublayer('left_act_%d' % i, act) self._left_tower.append(act) # 右视频塔 self._right_tower = [] for i in range(len(self.hidden_layers) - 1): linear = paddle.nn.Linear( in_features=self.hidden_layers[i], out_features=self.hidden_layers[i + 1], weight_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.XavierNormal() ), bias_attr=paddle.ParamAttr( initializer=paddle.nn.initializer.Constant(value=0.0) ) ) self.add_sublayer('right_linear_%d' % i, linear) self._right_tower.append(linear) if i < len(hidden_acts) and self.hidden_acts[i] == "relu": act = paddle.nn.ReLU() self.add_sublayer('right_act_%d' % i, act) self._right_tower.append(act) def _process_features(self, features, embeddings): # 将每个特征转换为embedding embedded_features = [] for i in range(self.feature_num): feature = paddle.slice( features, axes=[1], starts=[i], ends=[i+1] ) feature = paddle.cast(feature, dtype='int64') embedded = embeddings[i](feature) paddle.static.Print(embedded, message="lqc debug _process_features embedded {i}:") embedded_features.append(embedded) # 将所有embedding连接起来 return paddle.concat(embedded_features, axis=1) def forward(self, left_features, right_features): # 获取两个视频的特征表示 left_vec, right_vec = self.get_vectors(left_features, right_features) # 计算相似度 sim_score = F.cosine_similarity( left_vec, right_vec, axis=1 ).reshape([-1, 1]) return sim_score, left_vec, right_vec def get_vectors(self, left_features, right_features): """获取两个视频的16维特征向量""" # 处理左视频特征 left_embedded = self._process_features(left_features, self.left_embeddings) paddle.static.Print(left_embedded, message="lqc left_embedded shape:") left_vec = left_embedded paddle.static.Print(left_vec, message=f"lqc lqc left_vec {i}:") for i, layer in enumerate(self._left_tower): left_vec = layer(left_vec) paddle.static.Print(left_vec, message=f"After layer {i}:") # 处理右视频特征 right_embedded = self._process_features(right_features, self.right_embeddings) right_vec = right_embedded for layer in self._right_tower: right_vec = layer(right_vec) # 确保输出是L2归一化的 left_vec = F.normalize(left_vec, p=2, axis=1) right_vec = F.normalize(right_vec, p=2, axis=1) return left_vec, right_vec