# coding: utf-8

import time
import gc

import pandas as pd

begin_time = time.time()

# Earlier runs read older snapshots of the same user-action export:
#df = pd.read_csv("./datas/item2vecVideo2.csv")
#df = pd.read_csv("./datas/item2vecTzld1106.csv")
#df = pd.read_csv("/root/xielixun/item2vec_app_20201126.csv")
#df = pd.read_csv("/root/xielixun/item2vec_app_20201217.csv")
#df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210221.csv")
#df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210406.csv")
df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210419.csv")

df.head()

df["rating"].mean()

# Keep only rows rated above the mean, treating them as each user's "liked" list
df = df[df["rating"] > df["rating"].mean()].copy()
df.head(10)

# Aggregate per userId into a space-separated videoId list,
# most recent action first
df_group = df.sort_values(by='timestamp', ascending=False) \
    .groupby(['userId'])['videoId'] \
    .apply(lambda x: ' '.join([str(m) for m in x])) \
    .reset_index()
df_group.head()

#df_group.to_csv("./datas/tzld_uid_videoids.csv", index=False)
#df_group.to_csv("./datas/tzld_uid_videoids_app_20201126.csv", index=False)
#df_group.to_csv("./datas/tzld_uid_videoids_app_20201202.csv", index=False)
#df_group.to_csv("./datas/tzld_uid_videoids_app_20201217.csv", index=False)
#df_group.to_csv("./datas/tzld_uid_videoids_app_20210222.csv", index=False)
#df_group.to_csv("./datas/tzld_uid_videoids_app_20210406.csv", index=False)
df_group.to_csv("./datas/tzld_uid_videoids_app_20210419.csv", index=False)

del df_group
gc.collect()

# ### 3. Train item2vec with PySpark

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.setMaster("local").setAppName("item2Vec-lixun") \
    .set("spark.submit.deployMode", "client") \
    .set("spark.executor.memory", "19g") \
    .set("spark.driver.memory", "19g") \
    .set("spark.executor.cores", 10) \
    .set("spark.driver.cores", 10) \
    .set("spark.executor.instances", 10)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

#df = spark.read.csv("./datas/tzld_uid_videoids.csv", header=True)
#df = spark.read.csv("./datas/tzld_uid_videoids_app_20201126.csv", header=True)
#df = spark.read.csv("./datas/tzld_uid_videoids_app_20201202.csv", header=True)
#df = spark.read.csv("./datas/tzld_uid_videoids_app_20201217.csv", header=True)
#df = spark.read.csv("./datas/tzld_uid_videoids_app_20210222.csv", header=True)
#df = spark.read.csv("./datas/tzld_uid_videoids_app_20210406.csv", header=True)
df = spark.read.csv("./datas/tzld_uid_videoids_app_20210419.csv", header=True)
df.show(15)

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Split the long space-separated videoId string into a list column
df = df.withColumn('video_ids', F.split(df.videoId, " "))

# #### Train word2vec and apply it

from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(
    vectorSize=64,
    windowSize=5,
    maxIter=20,
    minCount=0,
    inputCol="video_ids",
    outputCol="video_2vec")
model = word2Vec.fit(df)

del df
gc.collect()

# Compute item embeddings here, not one embedding per user
model.getVectors().show(3, truncate=False)

# Earlier runs exported older embedding snapshots:
#model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1106-sort.csv', index=False)
#model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1126-sort.csv', index=False)
#model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1202-sort.csv', index=False)
#model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1217-sort.csv', index=False)
#model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-210222-sort.csv', index=False)
#model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210406-sort.csv', index=False)
model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210419-sort.csv', index=False)

print("train cost time is: " + str(time.time() - begin_time))
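
# #### Optional sanity check (not part of the original run)
# A minimal sketch: eyeball the embedding space by querying the nearest
# neighbours of a single item. findSynonyms(word, num) is the stock
# pyspark.ml.feature.Word2VecModel API; the probe id below is simply the
# first vocabulary entry, chosen so the lookup cannot miss.
probe_id = model.getVectors().first()["word"]
model.findSynonyms(probe_id, 10).show(truncate=False)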
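
# #### Reading the export back (a sketch, with assumptions)
# to_csv() writes the "vector" column as the DenseVector string form,
# e.g. "[0.1,0.2,...]", so a downstream consumer (such as the item2vec-java
# side hinted at by the export path) has to parse it back into floats.
# A round-trip check against the 20210419 export written above:
import numpy as np

emb = pd.read_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210419-sort.csv')
emb["vec"] = emb["vector"].apply(
    lambda s: np.array([float(x) for x in s.strip("[]").split(",")]))
print(emb["vec"].iloc[0].shape)  # expect (64,), matching vectorSize above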