"""Data-preparation utilities for a video recommendation (retrieval) model.

Builds (user, history, target) training/testing samples from interaction
logs, and converts them into the padded dict-of-arrays inputs expected by
the downstream Keras-style matching models.
"""
import random
import gc
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED, as_completed

import numpy as np
import pandas as pd
import threadpool  # NOTE(review): unused here; kept in case other modules rely on this import's side effects
from tqdm import tqdm
from tensorflow.python.keras.preprocessing.sequence import pad_sequences


def gen_data_set_bak(data, negsample=0):
    """Legacy sample builder: derive train/test samples from an interaction frame.

    Expects columns 'logtimestamp', 'label', 'mid' (user id), 'videoid',
    'rating'. Keeps only rows with label > 0, then for each user's
    chronological positive list emits one sample per prefix: all but the
    last position go to train, the last goes to test.

    Sample tuple layout (train and test):
        (mid, reversed_hist, target_videoid, label, hist_len, rating)

    Returns (train_set, test_set) as shuffled lists of tuples.
    """
    # data.sort_values("timestamp", inplace=True)
    data.sort_values("logtimestamp", inplace=True)
    item_ids = data['videoid'].unique()
    print(item_ids)
    user_ids = data['mid'].unique()
    print("\n\nuser size is: ", len(user_ids))
    print("********* pre data len is: " + str(len(list(data["label"]))))
    print(data["label"].mean())

    # Keep positives only; negatives are re-sampled below when negsample > 0.
    data = data[data["label"] > 0].copy()
    user_ids = data['mid'].unique()
    print("******** post user size is: ", len(user_ids))
    print("data len is: " + str(len(list(data["label"]))))

    train_set = []
    test_set = []
    print("\n\n ******* data is: ")
    print("pre data len is: " + str(len(list(data["label"]))))
    for reviewerID, hist in tqdm(data.groupby('mid')):
        print("\n\nreviewerID : ", reviewerID)
        print("\n\nhist : \n", hist)
        pos_list = hist['videoid'].tolist()
        rating_list = hist['rating'].tolist()
        if negsample > 0:
            # Negatives are drawn from items this user never interacted with.
            candidate_set = list(set(item_ids) - set(pos_list))
            neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)
        for i in range(1, len(pos_list)):
            hist = pos_list[:i]  # prefix of the positive sequence, reversed below (most recent first)
            if i != len(pos_list) - 1:
                train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
                for negi in range(negsample):
                    # Fix: append rating placeholder 0 so negative tuples have the
                    # same arity (6) as positive ones.
                    train_set.append((reviewerID, hist[::-1], neg_list[i * negsample + negi], 0, len(hist[::-1]), 0))
            else:
                # Last position of each user's sequence is held out for testing.
                test_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))

    random.shuffle(train_set)
    random.shuffle(test_set)
    # Guard: indexing [0] on an empty list would raise IndexError.
    if train_set and test_set:
        print(len(train_set[0]), len(test_set[0]))
        print(len(train_set[0]))
    print(len(train_set), len(test_set))
    return train_set, test_set


all_task = list()
# pool = multiprocessing.Pool(processes=3)


def thread_func1(train_set, test_set, negsample, item_ids, reviewerID, hist):
    """Per-user sample builder that appends into shared train/test lists.

    Same sample layout as gen_data_set_bak. NOTE(review): appends to the
    caller-owned lists without locking — only safe under the GIL's
    list.append atomicity; verify if used with true parallelism.
    """
    begin_time = time.time()
    print("\n\nreviewerID : ", reviewerID)
    pos_list = hist['videoid'].tolist()
    rating_list = hist['rating'].tolist()
    if negsample > 0:
        candidate_set = list(set(item_ids) - set(pos_list))
        neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)
    for i in range(1, len(pos_list)):
        hist = pos_list[:i]
        if i != len(pos_list) - 1:
            train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
            for negi in range(negsample):
                # Fix: rating placeholder 0 keeps negative tuples at the same arity as positives.
                train_set.append((reviewerID, hist[::-1], neg_list[i * negsample + negi], 0, len(hist[::-1]), 0))
        else:
            test_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
    print(str(reviewerID) + " idx process func cost time: " + str(time.time() - begin_time))
    return train_set, test_set


def thread_func2(negsample, item_ids, reviewerID, hist):
    """Per-user sample builder returning fresh train/test lists (no shared state)."""
    begin_time = time.time()
    train_set = []
    test_set = []
    print("\n\nreviewerID : ", reviewerID)
    pos_list = hist['videoid'].tolist()
    print("pos_list is: ")
    print(pos_list)
    rating_list = hist['rating'].tolist()
    if negsample > 0:
        candidate_set = list(set(item_ids) - set(pos_list))
        neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)
    for i in range(1, len(pos_list)):
        hist = pos_list[:i]
        print("hist[] is: ")
        print(hist)
        if i != len(pos_list) - 1:
            train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
            print("hist[::-1] is: ")
            print(hist[::-1])
            for negi in range(negsample):
                # Fix: rating placeholder 0 keeps negative tuples at the same arity as positives.
                train_set.append((reviewerID, hist[::-1], neg_list[i * negsample + negi], 0, len(hist[::-1]), 0))
        else:
            test_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]), rating_list[i]))
    print(str(reviewerID) + " idx process func cost time: " + str(time.time() - begin_time))
    return train_set, test_set


# def gen_data_set(data, negsample=0, train_path="", test_path="", user_mid_uid=None):
def gen_data_set(train_path="", test_path="", user_mid_uid=None):
    """Load pre-built train/test sample CSVs and join user index mapping.

    Drops rows whose 'mid' is the literal string "unknown", inner-joins
    each frame against user_mid_uid on mid == uidx, and shuffles the
    training frame (sample(frac=1.0)).

    Returns (train_pd, test_pd) DataFrames.
    """
    train_set2 = pd.read_csv(train_path)
    test_set2 = pd.read_csv(test_path)
    train_set2 = train_set2[train_set2["mid"] != "unknown"].copy()
    test_set2 = test_set2[test_set2["mid"] != "unknown"].copy()
    print("train set len is: ")
    print(len(train_set2))
    print("user mid uid len is: ", len(user_mid_uid))
    # Inner join drops samples whose user has no uidx mapping.
    train_pd = pd.merge(left=train_set2, right=user_mid_uid, left_on="mid", right_on="uidx",
                        how="inner")
    # train_pd = train_pd.sample(frac=0.2)
    test_pd = pd.merge(left=test_set2, right=user_mid_uid, left_on="mid", right_on="uidx",
                       how="inner")
    train_pd = train_pd.sample(frac=1.0)  # frac=1.0 is a full shuffle, not a subsample
    print("after sample train set len is: ")
    print(len(train_pd), len(test_pd))
    print(len(train_set2), len(test_set2))
    del train_set2, test_set2
    gc.collect()
    return train_pd, test_pd


def gen_data_set_sdm(data, seq_short_len=5, seq_prefer_len=50):
    """Build SDM-style samples with separate short-term and long-term histories.

    Expects columns 'timestamp', 'user_id', 'movie_id', 'genres', 'rating'.
    Prefixes up to seq_short_len items go entirely into the short sequence
    (long part zero-padded); longer prefixes are split into the most recent
    seq_short_len items (short) and the remainder (prefer/long). The last
    position per user is held out for testing.

    Sample tuple layout:
        (user_id, short_hist, prefer_hist, target, label,
         short_len, prefer_len, rating, short_genres, prefer_genres)
    """
    data.sort_values("timestamp", inplace=True)
    train_set = []
    test_set = []
    for reviewerID, hist in tqdm(data.groupby('user_id')):
        pos_list = hist['movie_id'].tolist()
        genres_list = hist['genres'].tolist()
        rating_list = hist['rating'].tolist()
        for i in range(1, len(pos_list)):
            hist = pos_list[:i]
            genres_hist = genres_list[:i]
            if i <= seq_short_len and i != len(pos_list) - 1:
                # Short-only prefix: long-term slot is a zero vector.
                train_set.append((reviewerID, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1,
                                  len(hist[::-1]), 0, rating_list[i],
                                  genres_hist[::-1], [0] * seq_prefer_len))
            elif i != len(pos_list) - 1:
                train_set.append((reviewerID, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:],
                                  pos_list[i], 1, seq_short_len, len(hist[::-1]) - seq_short_len,
                                  rating_list[i], genres_hist[::-1][:seq_short_len],
                                  genres_hist[::-1][seq_short_len:]))
            elif i <= seq_short_len and i == len(pos_list) - 1:
                test_set.append((reviewerID, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1,
                                 len(hist[::-1]), 0, rating_list[i],
                                 genres_hist[::-1], [0] * seq_prefer_len))
            else:
                test_set.append((reviewerID, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:],
                                 pos_list[i], 1, seq_short_len, len(hist[::-1]) - seq_short_len,
                                 rating_list[i], genres_hist[::-1][:seq_short_len],
                                 genres_hist[::-1][seq_short_len:]))
    random.shuffle(train_set)
    random.shuffle(test_set)
    # Guard: indexing [0] on an empty list would raise IndexError.
    if train_set and test_set:
        print(len(train_set[0]), len(test_set[0]))
    return train_set, test_set


def split(x):
    """Parse a space-separated string of ints into a list of ints."""
    key_ans = x.split(' ')
    return list(map(int, key_ans))


def gen_model_input(train_set, user_profile=None, item_profile=None, seq_max_len=50):
    """Convert a sample DataFrame into the model's feature dict plus labels.

    Expects columns 'userid', 'mid', 'hist_video_list' (space-separated id
    string), 'pos_video_id', 'label', 'hist_list_len'. Looks up extra
    per-user features from user_profile (indexed by mid).

    Returns (train_model_input, train_label), or (None, None) if any
    profile lookup fails.
    """
    train_uid = np.array(list(train_set["userid"].values))
    train_uid_org = np.array(list(train_set["mid"].values))
    train_seq = list(map(split, train_set["hist_video_list"].values))
    train_iid = np.array(list(train_set["pos_video_id"].values))
    train_label = np.array(list(train_set["label"].values))
    train_hist_len = np.array(list(train_set["hist_list_len"].values))
    print("\n\n train_seq")
    print(type(train_seq))
    # Pad and truncate: 'post' pads/truncates at the end of the sequence
    # (the keras default 'pre' operates at the front).
    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post',
                                  value=0)
    print("\n\n train_seq_pad")
    print(type(train_seq_pad))
    train_model_input = {"mid": train_uid, "videoid": train_iid, "hist_video_id": train_seq_pad,
                         "hist_len": train_hist_len, "userid_org": train_uid_org}
    for key in ["mobileos", "userRatedVideo1", "userRatedVideo2", "userRatedVideo3", "userGenre1",
                "userGenre2", "userCity", "userRealplayCount", "videoGenre1", "videoGenre2",
                "authorid", "videoRealPlayCount", "videoDuration", "videorealplayrate"]:
        try:
            train_model_input[key] = user_profile.loc[train_model_input['mid']][key].values
        except Exception as ex:
            # Any missing key/index aborts input construction for this batch.
            print("\n\n gen_model_input exception ", ex)
            return None, None
    return train_model_input, train_label


def gen_model_input_user_emb(train_set, user_profile=None, item_profile=None, seq_max_len=50):
    """Like gen_model_input but only attaches user-side profile features
    (for building user-embedding inputs without item-side features)."""
    train_uid = np.array(list(train_set["userid"].values))
    train_uid_org = np.array(list(train_set["mid"].values))
    train_seq = list(map(split, train_set["hist_video_list"].values))
    train_iid = np.array(list(train_set["pos_video_id"].values))
    train_label = np.array(list(train_set["label"].values))
    train_hist_len = np.array(list(train_set["hist_list_len"].values))
    print("\n\n train_seq")
    print(type(train_seq))
    # Pad and truncate at the end of each sequence ('post').
    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post',
                                  value=0)
    train_model_input = {"mid": train_uid, "videoid": train_iid, "hist_video_id": train_seq_pad,
                         "hist_len": train_hist_len, "userid_org": train_uid_org}
    for key in ["mobileos", "userRatedVideo1", "userRatedVideo2", "userRatedVideo3", "userGenre1",
                "userGenre2", "userCity", "userRealplayCount"]:
        # "videoGenre1", "authorid", "videoRealPlayCount", "videoDuration"]:
        try:
            train_model_input[key] = user_profile.loc[train_model_input['mid']][key].values
        except Exception as ex:
            print("\n\n gen_model_input_user_emb exception ", ex)
            return None, None
    return train_model_input, train_label


def gen_model_input_sdm(train_set, user_profile, seq_short_len, seq_prefer_len):
    """Convert gen_data_set_sdm tuples into the SDM model's feature dict.

    Note: tuple index 7 (rating) is intentionally skipped; indices 8/9 are
    the genre sequences.

    Returns (train_model_input, train_label).
    """
    train_uid = np.array([line[0] for line in train_set])
    short_train_seq = [line[1] for line in train_set]
    prefer_train_seq = [line[2] for line in train_set]
    train_iid = np.array([line[3] for line in train_set])
    train_label = np.array([line[4] for line in train_set])
    train_short_len = np.array([line[5] for line in train_set])
    train_prefer_len = np.array([line[6] for line in train_set])
    short_train_seq_genres = np.array([line[8] for line in train_set])
    prefer_train_seq_genres = np.array([line[9] for line in train_set])

    train_short_item_pad = pad_sequences(short_train_seq, maxlen=seq_short_len, padding='post',
                                         truncating='post', value=0)
    train_prefer_item_pad = pad_sequences(prefer_train_seq, maxlen=seq_prefer_len, padding='post',
                                          truncating='post', value=0)
    train_short_genres_pad = pad_sequences(short_train_seq_genres, maxlen=seq_short_len,
                                           padding='post', truncating='post', value=0)
    train_prefer_genres_pad = pad_sequences(prefer_train_seq_genres, maxlen=seq_prefer_len,
                                            padding='post', truncating='post', value=0)

    train_model_input = {"user_id": train_uid, "movie_id": train_iid,
                         "short_movie_id": train_short_item_pad,
                         "prefer_movie_id": train_prefer_item_pad,
                         "prefer_sess_length": train_prefer_len,
                         "short_sess_length": train_short_len,
                         'short_genres': train_short_genres_pad,
                         'prefer_genres': train_prefer_genres_pad}
    for key in ["gender", "age", "occupation", "zip"]:
        train_model_input[key] = user_profile.loc[train_model_input['user_id']][key].values
    return train_model_input, train_label