1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- # coding: utf-8
- import pandas as pd
- import time
- import gc
- begin_time = time.time()
- #df = pd.read_csv("./datas/item2vecVideo2.csv")
- #df = pd.read_csv("./datas/item2vecTzld1106.csv")
- #df = pd.read_csv("/root/xielixun/item2vec_app_20201126.csv")
- #df = pd.read_csv("/root/xielixun/item2vec_app_20201217.csv")
- #df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210221.csv")
- #df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210406.csv")
- df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210419.csv")
- df.head()
- df["rating"].mean()
- # 只取平均分以上的数据,作为喜欢的列表
- df = df[df["rating"] > df["rating"].mean()].copy()
- df.head(10)
- # 聚合得到userId,videoId列表
- df_group = df.sort_values(by='timestamp', ascending=False).groupby(['userId'])['videoId'].apply(lambda x: ' '.join([str(m) for m in x])).reset_index()
- df_group.head()
- #df_group.to_csv("./datas/tzld_uid_videoids.csv", index=False)
- #df_group.to_csv("./datas/tzld_uid_videoids_app_20201126.csv", index=False)
- #df_group.to_csv("./datas/tzld_uid_videoids_app_20201202.csv", index=False)
- #df_group.to_csv("./datas/tzld_uid_videoids_app_20201217.csv", index=False)
- #df_group.to_csv("./datas/tzld_uid_videoids_app_20210222.csv", index=False)
- #df_group.to_csv("./datas/tzld_uid_videoids_app_20210406.csv", index=False)
- df_group.to_csv("./datas/tzld_uid_videoids_app_20210419.csv", index=False)
- del df_group
- gc.collect()
- # ### 3. 使用Pyspark训练item2vec
- import findspark
- findspark.init()
- from pyspark.sql import SparkSession
- from pyspark import SparkConf
- conf = SparkConf()
- conf.setMaster("local").setAppName("item2Vec-lixun").\
- set("spark.submit.deployMode", "client").\
- set("spark.executor.memory", "19g").\
- set("spark.driver.memory", "19g").\
- set("spark.executor.cores", 10).\
- set("spark.driver.cores", 10).\
- set("spark.executor.instances", 10)
- spark = SparkSession.builder.config(conf=conf).getOrCreate()
- sc = spark.sparkContext
- #df = spark.read.csv("./datas/tzld_uid_videoids.csv", header=True)
- #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201126.csv", header=True)
- #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201202.csv", header=True)
- #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201217.csv", header=True)
- #df = spark.read.csv("./datas/tzld_uid_videoids_app_20210222.csv", header=True)
- #df = spark.read.csv("./datas/tzld_uid_videoids_app_20210406.csv", header=True)
- df = spark.read.csv("./datas/tzld_uid_videoids_app_20210419.csv", header=True)
- df.show(15)
- from pyspark.sql import functions as F
- from pyspark.sql import types as T
- # 把非常的字符串格式变成LIST形式
- df = df.withColumn('video_ids', F.split(df.videoId, " "))
- # #### 实现word2vec的训练与转换
- from pyspark.ml.feature import Word2Vec
- word2Vec = Word2Vec(
- vectorSize=64,
- windowSize=5,
- maxIter=20,
- minCount=0,
- inputCol="video_ids",
- outputCol="video_2vec")
- model = word2Vec.fit(df)
- del df
- gc.collect()
- # 不计算每个user的embedding,而是计算item的embedding
- model.getVectors().show(3, truncate=False)
- # In[12]:
- #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1106-sort.csv', index=False)
- #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1126-sort.csv', index=False)
- #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1202-sort.csv', index=False)
- #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1217-sort.csv', index=False)
- #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-210222-sort.csv', index=False)
- #model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210406-sort.csv', index=False)
- model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210419-sort.csv', index=False)
- print("train cost time is: " + str(time.time() - begin_time))
|