# encoding: utf-8
# Trains a DSSM two-tower recall model on Tencent-video user/item features,
# then exports user and item embeddings to CSV for downstream retrieval.
import pandas as pd
import os
import gc
import time
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, get_feature_names
# from preprocess_tzld210322_gen import gen_data_set, gen_model_input, gen_model_input_user_emb
# from preprocess_tzld210327_gen import gen_data_set, gen_model_input, gen_model_input_user_emb
from preprocess_tzld210423_gen import gen_data_set, gen_model_input, gen_model_input_user_emb
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
import tensorflow as tf
import numpy as np
from deepmatch.models import *

# Cursors for the *_bak generators (1-based batch counters shared across calls).
count_train = 1
count_test = 1
batch_size = 4096


def generate_arrays_from_train(train_set, user_profile, SEQ_LEN):
    """Endlessly yield (inputs, labels) training batches for fit_generator.

    Walks train_set in batch_size slices; batches that gen_model_input cannot
    build (returns None) or that raise are skipped so training keeps going.
    """
    while 1:
        for i in range(0, len(train_set), batch_size):
            try:
                train_batch = train_set[i: i + batch_size]
                train_model_input_batch, train_label_batch = gen_model_input(
                    train_batch, user_profile, SEQ_LEN)
                if train_model_input_batch is None or train_label_batch is None:
                    continue
                print("train i: " + str(i) + " len train set " + str(len(train_set)))
                print(train_model_input_batch)
                print(train_label_batch)
                yield (train_model_input_batch, train_label_batch)
            except Exception as ex:
                # Best-effort: log and move on to the next slice rather than
                # killing the whole training run on one bad batch.
                print("\n\n generate_arrays_from_train exception ", ex)
                continue


def generate_arrays_from_train_bak(train_set, user_profile, SEQ_LEN):
    """Older batch generator kept for reference; uses a global batch cursor.

    NOTE(review): count_train is module-global, so two concurrent uses of this
    generator would interfere with each other — the index-based
    generate_arrays_from_train above supersedes it.
    """
    global count_train
    while True:
        try:
            train_batch = train_set[(count_train - 1) * batch_size: count_train * batch_size]
            train_model_input_batch, train_label_batch = gen_model_input(
                train_batch, user_profile, SEQ_LEN)
            if count_train % 1000 == 0:
                print("count:" + str(count_train) + " len train set " + str(len(train_set)))
            count_train = count_train + 1
            # Wrap the cursor back to the first batch once the set is exhausted.
            if count_train * batch_size > len(train_set):
                count_train = 1
            yield (train_model_input_batch, train_label_batch)
        except Exception as ex:
            print("\n\n generate_arrays_from_file exception ", ex)
            count_train = count_train + 1
            continue


def generate_arrays_from_test(train_set, user_profile, SEQ_LEN):
    """Endlessly yield test batches built with gen_model_input_user_emb.

    Same slicing/skip-on-error scheme as generate_arrays_from_train, but uses
    the user-embedding input builder (keeps the original user ids around).
    """
    while 1:
        for i in range(0, len(train_set), batch_size):
            try:
                train_batch = train_set[i: i + batch_size]
                # train_model_input_batch, train_label_batch = gen_model_input(train_batch, user_profile, SEQ_LEN)
                train_model_input_batch, train_label_batch = gen_model_input_user_emb(
                    train_batch, user_profile, SEQ_LEN)
                if train_model_input_batch is None or train_label_batch is None:
                    continue
                print("test i: " + str(i) + " len train set " + str(len(train_set)))
                yield (train_model_input_batch, train_label_batch)
            except Exception as ex:
                print("\n\n generate_arrays_from_test exception ", ex)
                continue


def generate_arrays_from_test_bak(train_set, user_profile, SEQ_LEN):
    """Older test-batch generator kept for reference; global cursor variant."""
    global count_test
    while True:
        try:
            train_batch = train_set[(count_test - 1) * batch_size: count_test * batch_size]
            train_model_input_batch, train_label_batch = gen_model_input_user_emb(
                train_batch, user_profile, SEQ_LEN)
            if count_test % 1000 == 0:
                print("count:" + str(count_test) + " len train set " + str(len(train_set)))
            count_test = count_test + 1
            if count_test * batch_size > len(train_set):
                count_test = 1
            yield (train_model_input_batch, train_label_batch)
        except Exception as ex:
            print("\n\n generate_arrays_from_file exception ", ex)
            count_test = count_test + 1
            continue


if __name__ == "__main__":
    begin_time = time.time()
    # BUGFIX: original line was the garbled chained assignment
    # `data = pd.read_csvdata = pd.read_csv(...)`, which also set a bogus
    # `read_csvdata` attribute on the pandas module. Single read is intended.
    data = pd.read_csv("/work/xielixun/user_action_feature_wdl_recall_app20210422.csv")
    print(data[0:5])

    sparse_features = ["videoid", "mid", "mobileos", "videoGenre1", "videoGenre2",
                       "userRatedVideo1", "userRatedVideo2", "userRatedVideo3",
                       "userGenre1", "userGenre2", "userCity", "authorid",
                       "userRealplayCount", "videoRealPlayCount", "videoDuration",
                       "videorealplayrate"]
    SEQ_LEN = 50
    negsample = 3
    features = ["videoid", "mid", "mobileos", "videoGenre1", "videoGenre2",
                "userRatedVideo1", "userRatedVideo2", "userRatedVideo3",
                "userGenre1", "userGenre2", "userCity", "authorid",
                "userRealplayCount", "videoRealPlayCount", "videoDuration",
                "videorealplayrate"]
    feature_max_idx = {}

    # Drop rows with an unknown user id. The replace() maps "unknown" to a
    # placeholder and the two filters below remove both spellings, so the net
    # effect is simply discarding unknown-mid rows.
    data["mid"].replace("unknown", "N000111111D", inplace=True)
    data = data[data["mid"] != "unknown"].copy()
    data = data[data["mid"] != "N000111111D"].copy()

    # Same job as the LabelEncoder loop below, see deepMatch DSSM examples:
    # densify "mid" to 1..N while keeping the original value in "uidx".
    def add_index_column(param_df, column_name):
        values = list(param_df[column_name].unique())
        value_index_dict = {value: idx for idx, value in enumerate(values)}
        if column_name == "mid":
            param_df["uidx"] = param_df[column_name].copy()
            param_df["mid"] = param_df[column_name].map(value_index_dict) + 1
            feature_max_idx["mid"] = param_df["mid"].max() + 1

    add_index_column(data, "mid")
    feature_max_idx["videoid"] = data["videoid"].max() + 1

    for idx, column_name in enumerate(features):
        lbe = LabelEncoder()
        # Genre columns: fill missing genres with the default category "社会".
        if column_name == "videoGenre1" or column_name == "videoGenre2" or \
                column_name == "videoGenre3" or column_name == "userGenre1" or \
                column_name == "userGenre2" or column_name == "userGenre3":
            data[column_name].fillna("社会", inplace=True)
        if column_name == "userCity":
            data[column_name].fillna("北京", inplace=True)
        if column_name == "mid":
            # Already indexed by add_index_column above.
            continue
            # (dead code removed: re-encoding of "mid" via LabelEncoder was
            # unreachable after this continue in the original source)
        elif column_name == "videoid":
            # Negative-sampled videoid values are kept raw (not re-encoded).
            continue
            # (dead code removed: re-encoding of "videoid" was unreachable
            # after this continue in the original source)
        else:
            data[column_name] = lbe.fit_transform(data[column_name])
            feature_max_idx[column_name] = data[column_name].max() + 1

    user_profile = data[["uidx", "mid", "mobileos", "userRatedVideo1",
                         "userRatedVideo2", "userRatedVideo3", "userGenre1",
                         "userGenre2", "userCity", "userRealplayCount",
                         "videoGenre1", "videoGenre2", "authorid",
                         "videoRealPlayCount", "videoDuration",
                         "videorealplayrate"]].drop_duplicates('mid')
    user_mid_uid = data[["uidx", "mid"]].drop_duplicates('mid')
    user_mid_uid.rename(columns={'mid': 'userid'}, inplace=True)

    item_profile = data[["videoid", "videoGenre1", "videoGenre2", "authorid",
                         "videoRealPlayCount", "videoDuration",
                         "videorealplayrate"]].drop_duplicates('videoid')
    print("item size is: ", len(item_profile))

    user_profile.set_index("mid", inplace=True)
    print(data)
    print("\n\n after group by mid videoid")
    # Free the raw frame before training to cut peak memory.
    del data
    gc.collect()

    # Sequence-sampled sets; negatives were generated offline (see paths).
    # test_path = "/work/xielixun/test_user_video_play_test.csv"
    # train_path = "/work/xielixun/train_user_video_play.csv"
    # test_path = "/work/xielixun/test_user_video_play_test_sample_negtive.csv"
    # train_path = "/work/xielixun/train_user_video_play_sample_negtive.csv"
    test_path = "/work/xielixun/sampleNegtive/test_set_0422.csv"
    train_path = "/work/xielixun/sampleNegtive/train_set_0422.csv"
    train_set, test_set = gen_data_set(train_path, test_path, user_mid_uid)

    embedding_dim = 32

    user_feature_columns = [SparseFeat('mid', feature_max_idx['mid'], embedding_dim),
                            SparseFeat("mobileos", feature_max_idx["mobileos"], embedding_dim),
                            SparseFeat("userRatedVideo1", feature_max_idx['userRatedVideo1'], embedding_dim),
                            SparseFeat("userRatedVideo2", feature_max_idx['userRatedVideo2'], embedding_dim),
                            SparseFeat("userRatedVideo3", feature_max_idx['userRatedVideo3'], embedding_dim),
                            SparseFeat("userGenre1", feature_max_idx['userGenre1'], embedding_dim),
                            SparseFeat("userGenre2", feature_max_idx['userGenre2'], embedding_dim),
                            SparseFeat("userCity", feature_max_idx['userCity'], embedding_dim),
                            SparseFeat("userRealplayCount", feature_max_idx['userRealplayCount'], embedding_dim),
                            VarLenSparseFeat(SparseFeat('hist_video_id',
                                                        feature_max_idx['videoid'],
                                                        embedding_dim,
                                                        embedding_name="videoid"),
                                             SEQ_LEN, 'mean', 'hist_len'),
                            ]

    item_feature_columns = [SparseFeat('videoid', feature_max_idx['videoid'], embedding_dim),
                            SparseFeat("videoGenre1", feature_max_idx["videoGenre1"], embedding_dim),
                            SparseFeat("videoGenre2", feature_max_idx["videoGenre2"], embedding_dim),
                            SparseFeat("authorid", feature_max_idx["authorid"], embedding_dim),
                            SparseFeat("videoRealPlayCount", feature_max_idx["videoRealPlayCount"], embedding_dim),
                            SparseFeat("videoDuration", feature_max_idx["videoDuration"], embedding_dim),
                            SparseFeat("videorealplayrate", feature_max_idx["videorealplayrate"], embedding_dim)]

    feature_names = get_feature_names(user_feature_columns + item_feature_columns)
    print("\n\nfeature_names is: ")
    print(feature_names)

    # Without the two lines below, youtubeDNN hit:
    # tensorflow.python.framework.errors_impl.InvalidArgumentError:
    #   assertion failed: [predictions must be <= 1] ...
    # so keep learning phase set and eager execution disabled on TF2.
    K.set_learning_phase(True)
    if tf.__version__ >= '2.0.0':
        tf.compat.v1.disable_eager_execution()

    model = DSSM(user_feature_columns, item_feature_columns)

    logdir = os.path.join("log_callbacks_dssm")  # folder for TensorBoard
    if not os.path.exists(logdir):
        os.mkdir(logdir)
    output_model_file = os.path.join(logdir, 'dssm_model.h5')
    callbacks = [
        tf.keras.callbacks.TensorBoard(logdir),
        tf.keras.callbacks.ModelCheckpoint(output_model_file, save_best_only=True),
        tf.keras.callbacks.EarlyStopping(patience=5, min_delta=1e-5),
    ]

    METRICS = [
        tf.keras.metrics.TruePositives(name='tp'),
        tf.keras.metrics.FalsePositives(name='fp'),
        tf.keras.metrics.TrueNegatives(name='tn'),
        tf.keras.metrics.FalseNegatives(name='fn'),
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.AUC(name='auc'),
        tf.keras.metrics.AUC(name='auc-ROC', curve='ROC'),
        tf.keras.metrics.AUC(name='auc-PRC', curve='PR')
    ]

    # compile the model, set loss function, optimizer and evaluation metrics
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=METRICS
    )

    model.fit_generator(generate_arrays_from_train(train_set, user_profile, SEQ_LEN),
                        steps_per_epoch=len(train_set) // batch_size,
                        epochs=2, max_queue_size=10, workers=1,
                        callbacks=callbacks, verbose=1, use_multiprocessing=False)
    model.save("./tensorflow_DSSM-027-tzld-1.h5")

    all_item_model_input = {"videoid": item_profile['videoid'].values,
                            "videoGenre1": item_profile['videoGenre1'].values,
                            "videoGenre2": item_profile['videoGenre2'].values,
                            "authorid": item_profile['authorid'].values,
                            "videoRealPlayCount": item_profile['videoRealPlayCount'].values,
                            "videoDuration": item_profile['videoDuration'].values,
                            "videorealplayrate": item_profile['videorealplayrate'].values}

    # Tower sub-models for embedding export (inputs/outputs exposed by
    # deepmatch's DSSM implementation).
    user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
    item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)

    # NOTE(review): this generator-based prediction is immediately overwritten
    # below by the direct predict() on the full test input — kept for parity
    # with the original flow.
    user_embs = user_embedding_model.predict_generator(
        generate_arrays_from_test(test_set, user_profile, SEQ_LEN),
        steps=len(test_set) // batch_size,
        max_queue_size=1, workers=1,
        callbacks=callbacks, verbose=1,
        use_multiprocessing=False)

    test_model_input_emb2, test_model_label2 = gen_model_input_user_emb(test_set, user_profile, SEQ_LEN)
    user_embs2 = user_embedding_model.predict(test_model_input_emb2, batch_size=2 ** 12)
    item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)

    # test_set carries the original mid values, used as the embedding key.
    user_embs = list(map(str, user_embs2))
    user_res = {"mid": test_model_input_emb2["userid_org"], "emb": user_embs}
    user_embs_res = pd.DataFrame.from_dict(user_res)
    output_user = "./tensorflow_user_embedding-dssm-tzld-210423.csv"
    user_embs_res.to_csv(output_user, index=False)

    item_embs = list(map(str, item_embs))
    item_res = {"videoid": all_item_model_input["videoid"], "emb": item_embs}
    item_embs_res = pd.DataFrame.from_dict(item_res)
    output_item = "./tensorflow_video_embedding-dssm-tzld-210423.csv"
    item_embs_res.to_csv(output_item, index=False)

    print("************ 21-04-23 train dssm cost time : " + str(time.time() - begin_time))