import gc
import multiprocessing
import random
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED, as_completed

import numpy as np
import pandas as pd
import threadpool
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from tqdm import tqdm


def _append_user_samples(train_set, test_set, negsample, item_ids, reviewerID, hist, verbose=False):
    """Append one user's leave-one-out samples to ``train_set`` / ``test_set``.

    For every position ``i >= 1`` in the user's ``videoid`` list, the items
    before ``i`` (most recent first) form the history and item ``i`` is the
    positive target.  The last position goes to ``test_set``; every other
    position goes to ``train_set`` together with ``negsample`` negatives drawn
    (with replacement) from the items the user has not interacted with.

    Args:
        train_set: list that receives the training tuples (mutated in place).
        test_set: list that receives the test tuples (mutated in place).
        negsample: number of negative samples per positive training sample.
        item_ids: iterable of all candidate item ids.
        reviewerID: the user key of this group.
        hist: the user's rows; must provide ``videoid`` and ``rating`` columns.
        verbose: when true, print each history prefix while iterating.

    Positive tuples are
    ``(reviewerID, reversed_history, item, 1, history_len, rating)``.
    """
    pos_list = hist['videoid'].tolist()
    rating_list = hist['rating'].tolist()

    neg_list = []
    if negsample > 0:
        candidate_set = list(set(item_ids) - set(pos_list))
        neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True)

    last_idx = len(pos_list) - 1
    for i in range(1, len(pos_list)):
        prefix = pos_list[:i]
        if verbose:
            print("hist[] is: ")
            print(prefix)

        if i == last_idx:
            # The final interaction of each user is held out for evaluation.
            test_set.append((reviewerID, prefix[::-1], pos_list[i], 1, len(prefix), rating_list[i]))
            continue

        train_set.append((reviewerID, prefix[::-1], pos_list[i], 1, len(prefix), rating_list[i]))
        if verbose:
            print("hist[::-1] is: ")
            print(prefix[::-1])
        for negi in range(negsample):
            # NOTE(review): negative tuples carry 5 fields (no rating) while
            # positive tuples carry 6 -- kept as-is; confirm consumers only
            # read the first five positions.
            train_set.append((reviewerID, prefix[::-1], neg_list[i * negsample + negi], 0, len(prefix)))


def gen_data_set_bak(data, negsample=0):
    """Build shuffled train/test sample lists from an interaction DataFrame.

    ``data`` is sorted in place by ``logtimestamp``; only rows with
    ``label > 0`` are used to build samples, while negatives are drawn from
    every ``videoid`` present before that filter.

    Args:
        data: DataFrame with ``logtimestamp``, ``label``, ``videoid``, ``mid``
            and ``rating`` columns.
        negsample: number of negative samples per positive training sample.

    Returns:
        ``(train_set, test_set)`` lists of sample tuples.
    """
    data.sort_values("logtimestamp", inplace=True)
    item_ids = data['videoid'].unique()
    print(item_ids)
    user_ids = data['mid'].unique()
    print("\n\nuser size is: ", len(user_ids))
    print("********* pre data len is: " + str(len(data["label"])))
    print(data["label"].mean())

    data = data[data["label"] > 0].copy()
    train_set = []
    test_set = []
    print("pre data len is: " + str(len(data["label"])))

    for reviewerID, hist in tqdm(data.groupby('mid')):
        print("\n\nreviewerID : ", reviewerID)
        print("\n\nhist : \n", hist)
        _append_user_samples(train_set, test_set, negsample, item_ids, reviewerID, hist)

    random.shuffle(train_set)
    random.shuffle(test_set)

    # Indexing [0] on an empty split would raise IndexError, so the
    # record-width diagnostics are only printed when both splits have data.
    if train_set and test_set:
        print(len(train_set[0]), len(test_set[0]))
        print(len(train_set[0]))
    print(len(train_set), len(test_set))

    return train_set, test_set


all_task = list()
# pool = multiprocessing.Pool(processes=3)


def thread_func1(train_set, test_set, negsample, item_ids, reviewerID, hist):
    """Process one user group, appending into the caller-supplied lists.

    Prints the user id and the elapsed time, and returns the same
    ``train_set`` / ``test_set`` objects that were passed in.
    """
    begin_time = time.time()
    print("\n\nreviewerID : ", reviewerID)
    _append_user_samples(train_set, test_set, negsample, item_ids, reviewerID, hist)
    print(str(reviewerID) + " idx process func cost time: " + str(time.time() - begin_time))
    return train_set, test_set


def thread_func2(negsample, item_ids, reviewerID, hist):
    """Process one user group into fresh lists, with verbose history prints.

    Returns:
        ``(train_set, test_set)`` containing only this user's samples.
    """
    begin_time = time.time()
    train_set = []
    test_set = []
    print("\n\nreviewerID : ", reviewerID)
    _append_user_samples(train_set, test_set, negsample, item_ids, reviewerID, hist, verbose=True)
    print(str(reviewerID) + " idx process func cost time: " + str(time.time() - begin_time))
    return train_set, test_set


def gen_data_set(train_path="", test_path="", user_mid_uid=None, train_frac=0.6):
    """Load train/test CSVs, join them with the user mapping, subsample train.

    Training rows whose ``mid`` equals ``"unknown"`` are dropped before the
    join (the test rows are not filtered).  Both frames are inner-joined with
    ``user_mid_uid`` on ``mid`` == ``uidx``, so rows without a match are
    discarded.

    Args:
        train_path: path of the training CSV.
        test_path: path of the test CSV.
        user_mid_uid: DataFrame with a ``uidx`` column to join on; required
            in practice, since ``pd.merge`` rejects ``None``.
        train_frac: fraction of the joined training rows to keep.

    Returns:
        ``(train_pd, test_pd)`` DataFrames.
    """
    train_set2 = pd.read_csv(train_path)
    test_set2 = pd.read_csv(test_path)

    train_set2 = train_set2[train_set2["mid"] != "unknown"].copy()

    train_pd = pd.merge(left=train_set2,
                        right=user_mid_uid,
                        left_on="mid",
                        right_on="uidx",
                        how="inner")
    test_pd = pd.merge(left=test_set2,
                       right=user_mid_uid,
                       left_on="mid",
                       right_on="uidx",
                       how="inner")

    train_pd = train_pd.sample(frac=train_frac)
    print("after sample train set len is: ")
    print(len(train_pd), len(test_pd))

    return train_pd, test_pd


def gen_data_set_sdm(data, seq_short_len=5, seq_prefer_len=50):
    """Build shuffled train/test samples with short- and long-term histories.

    ``data`` is sorted in place by ``timestamp``.  For each user and each
    position ``i >= 1`` the reversed history is split into the first
    ``seq_short_len`` items (short-term) and the remainder (long-term).  While
    the history has at most ``seq_short_len`` items, the long-term slots are
    filled with ``[0] * seq_prefer_len`` placeholders.  The last position of
    every user goes to the test set, all others to the training set.

    Each record is
    ``(user, short_items, long_items, target, 1, short_len, long_len,
    rating, short_genres, long_genres)``.

    Returns:
        ``(train_set, test_set)`` lists of records.
    """
    data.sort_values("timestamp", inplace=True)
    train_set = []
    test_set = []

    for reviewerID, hist in tqdm(data.groupby('user_id')):
        pos_list = hist['movie_id'].tolist()
        genres_list = hist['genres'].tolist()
        rating_list = hist['rating'].tolist()
        last_idx = len(pos_list) - 1

        for i in range(1, len(pos_list)):
            recent_items = pos_list[:i][::-1]
            recent_genres = genres_list[:i][::-1]

            if i <= seq_short_len:
                # Whole history fits in the short-term window.
                record = (reviewerID, recent_items, [0] * seq_prefer_len, pos_list[i], 1,
                          len(recent_items), 0, rating_list[i],
                          recent_genres, [0] * seq_prefer_len)
            else:
                record = (reviewerID, recent_items[:seq_short_len], recent_items[seq_short_len:],
                          pos_list[i], 1, seq_short_len, len(recent_items) - seq_short_len,
                          rating_list[i],
                          recent_genres[:seq_short_len], recent_genres[seq_short_len:])

            target = test_set if i == last_idx else train_set
            target.append(record)

    random.shuffle(train_set)
    random.shuffle(test_set)

    # Indexing [0] on an empty split would raise IndexError; only report the
    # record widths when both splits contain at least one record.
    if train_set and test_set:
        print(len(train_set[0]), len(test_set[0]))

    return train_set, test_set


def split(x):
    """Parse a single-space separated string of integers into a list of ints."""
    return [int(token) for token in x.split(' ')]


def _build_model_input(train_set, user_profile, seq_max_len, profile_keys, error_prefix, verbose=False):
    """Shared implementation of ``gen_model_input`` and its user-emb variant.

    Reads ``userid``, ``hist_video_list``, ``pos_video_id``, ``label`` and
    ``hist_list_len`` from ``train_set``, pads/truncates the parsed history to
    ``seq_max_len`` and adds one array per entry of ``profile_keys`` looked up
    in ``user_profile`` by user id.

    Returns:
        ``(model_input_dict, labels)``, or ``(None, None)`` when the profile
        lookup fails (the exception is printed after ``error_prefix``).
    """
    train_uid = np.array(list(train_set["userid"].values))
    train_seq = list(map(split, train_set["hist_video_list"].values))
    train_iid = np.array(list(train_set["pos_video_id"].values))
    train_label = np.array(list(train_set["label"].values))
    train_hist_len = np.array(list(train_set["hist_list_len"].values))

    if verbose:
        print("\n\n train_seq")
        print(type(train_seq))

    # Pad and truncate: 'post' operates at the end of the sequence
    # (the default 'pre' would operate at the front).
    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)

    if verbose:
        print("\n\n train_seq_pad")
        print(type(train_seq_pad))

    train_model_input = {"mid": train_uid,
                         "videoid": train_iid,
                         "hist_video_id": train_seq_pad,
                         "hist_len": train_hist_len}

    # Deliberately broad: any lookup failure (missing profile, unknown user
    # id, missing column) is reported and signalled to callers as (None, None).
    try:
        # The row lookup does not depend on the key, so do it once.
        profile_rows = user_profile.loc[train_model_input['mid']]
        for key in profile_keys:
            train_model_input[key] = profile_rows[key].values
    except Exception as ex:
        print(error_prefix, ex)
        return None, None

    return train_model_input, train_label


def gen_model_input(train_set, user_profile=None, item_profile=None, seq_max_len=50):
    """Build the model input dict (user and video features) and label array.

    Args:
        train_set: DataFrame with ``userid``, ``hist_video_list``,
            ``pos_video_id``, ``label`` and ``hist_list_len`` columns.
        user_profile: DataFrame indexed by user id that supplies the extra
            feature columns.
        item_profile: unused; kept for interface compatibility.
        seq_max_len: length the history sequences are padded/truncated to.

    Returns:
        ``(train_model_input, train_label)`` or ``(None, None)`` on a failed
        profile lookup.
    """
    # NOTE(review): the video-side keys are also read from user_profile, as
    # in the original code -- confirm that frame really carries them.
    profile_keys = ["userRatedVideo1", "userGenre1", "userCity", "userRealplayCount",
                    "videoGenre1", "authorid", "videoRealPlayCount", "videoDuration"]
    return _build_model_input(train_set, user_profile, seq_max_len, profile_keys,
                              "\n\n gen_model_input exception ", verbose=True)


def gen_model_input_user_emb(train_set, user_profile=None, item_profile=None, seq_max_len=50):
    """Build the model input dict with user-side features only, plus labels.

    Same contract as ``gen_model_input`` but only the four user feature
    columns are attached and no type diagnostics are printed.

    Returns:
        ``(train_model_input, train_label)`` or ``(None, None)`` on a failed
        profile lookup.
    """
    profile_keys = ["userRatedVideo1", "userGenre1", "userCity", "userRealplayCount"]
    return _build_model_input(train_set, user_profile, seq_max_len, profile_keys,
                              "\n\n gen_model_input_user_emb exception ")