123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import paddle
- import paddle.nn as nn
- import paddle.nn.functional as F
- import numpy as np
- class DSSMLayer(nn.Layer):
- def __init__(self, feature_nums=[5,5,5,5,5], embedding_dim=8, output_dim=16,
- hidden_layers=[40, 32], hidden_acts=["relu", "relu"]):
- super(DSSMLayer, self).__init__()
-
- self.feature_num = len(feature_nums)
- self.embedding_dim = embedding_dim
- self.output_dim = output_dim
- # 第一层的输入维度是所有特征的embedding拼接
- self.hidden_layers = [self.feature_num * embedding_dim] + hidden_layers + [output_dim]
- self.hidden_acts = hidden_acts
-
-
- # 为每个特征创建对应维度的Embedding层
- self.left_embeddings = nn.LayerList([
- nn.Embedding(
- num_embeddings=feature_nums[i],
- embedding_dim=embedding_dim,
- weight_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.XavierNormal()
- )
- ) for i in range(self.feature_num)
- ])
- self.right_embeddings = nn.LayerList([
- nn.Embedding(
- num_embeddings=feature_nums[i],
- embedding_dim=embedding_dim,
- weight_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.XavierNormal()
- )
- ) for i in range(self.feature_num)
- ])
- # 左视频塔
- self._left_tower = []
- for i in range(len(self.hidden_layers) - 1):
- linear = paddle.nn.Linear(
- in_features=self.hidden_layers[i],
- out_features=self.hidden_layers[i + 1],
- weight_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.XavierNormal()
- ),
- bias_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.Constant(value=0.0)
- )
- )
- self.add_sublayer('left_linear_%d' % i, linear)
- self._left_tower.append(linear)
-
- if i < len(hidden_acts) and self.hidden_acts[i] == "relu":
- act = paddle.nn.ReLU()
- self.add_sublayer('left_act_%d' % i, act)
- self._left_tower.append(act)
- # 右视频塔
- self._right_tower = []
- for i in range(len(self.hidden_layers) - 1):
- linear = paddle.nn.Linear(
- in_features=self.hidden_layers[i],
- out_features=self.hidden_layers[i + 1],
- weight_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.XavierNormal()
- ),
- bias_attr=paddle.ParamAttr(
- initializer=paddle.nn.initializer.Constant(value=0.0)
- )
- )
- self.add_sublayer('right_linear_%d' % i, linear)
- self._right_tower.append(linear)
-
- if i < len(hidden_acts) and self.hidden_acts[i] == "relu":
- act = paddle.nn.ReLU()
- self.add_sublayer('right_act_%d' % i, act)
- self._right_tower.append(act)
- def _process_features(self, features, embeddings):
- # 将每个特征转换为embedding
- embedded_features = []
- for i in range(self.feature_num):
- feature = paddle.slice(
- features,
- axes=[1],
- starts=[i],
- ends=[i+1]
- )
- feature = paddle.cast(feature, dtype='int64')
- embedded = embeddings[i](feature)
- paddle.static.Print(embedded, message="lqc debug _process_features embedded {i}:")
- embedded_features.append(embedded)
-
- # 将所有embedding连接起来
-
- return paddle.concat(embedded_features, axis=1)
- def forward(self, left_features, right_features):
- # 获取两个视频的特征表示
- left_vec, right_vec = self.get_vectors(left_features, right_features)
-
- # 计算相似度
- sim_score = F.cosine_similarity(
- left_vec,
- right_vec,
- axis=1
- ).reshape([-1, 1])
- return sim_score, left_vec, right_vec
- def get_vectors(self, left_features, right_features):
- """获取两个视频的16维特征向量"""
- # 处理左视频特征
-
- left_embedded = self._process_features(left_features, self.left_embeddings)
- paddle.static.Print(left_embedded, message="lqc left_embedded shape:")
- left_vec = left_embedded
- paddle.static.Print(left_vec, message=f"lqc lqc left_vec {i}:")
- for i, layer in enumerate(self._left_tower):
- left_vec = layer(left_vec)
- paddle.static.Print(left_vec, message=f"After layer {i}:")
-
- # 处理右视频特征
- right_embedded = self._process_features(right_features, self.right_embeddings)
- right_vec = right_embedded
- for layer in self._right_tower:
- right_vec = layer(right_vec)
-
- # 确保输出是L2归一化的
- left_vec = F.normalize(left_vec, p=2, axis=1)
- right_vec = F.normalize(right_vec, p=2, axis=1)
-
- return left_vec, right_vec
|