# spark-item2Vec-tzld.py (4.0 KB)
  1. # coding: utf-8
  2. import pandas as pd
  3. import time
  4. import gc
  5. begin_time = time.time()
  6. #df = pd.read_csv("./datas/item2vecVideo2.csv")
  7. #df = pd.read_csv("./datas/item2vecTzld1106.csv")
  8. #df = pd.read_csv("/root/xielixun/item2vec_app_20201126.csv")
  9. #df = pd.read_csv("/root/xielixun/item2vec_app_20201217.csv")
  10. #df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210221.csv")
  11. #df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210406.csv")
  12. df = pd.read_csv("/work/xielixun/user_action_feature_item2vec_app20210419.csv")
  13. df.head()
  14. df["rating"].mean()
  15. # 只取平均分以上的数据,作为喜欢的列表
  16. df = df[df["rating"] > df["rating"].mean()].copy()
  17. df.head(10)
  18. # 聚合得到userId,videoId列表
  19. df_group = df.sort_values(by='timestamp', ascending=False).groupby(['userId'])['videoId'].apply(lambda x: ' '.join([str(m) for m in x])).reset_index()
  20. df_group.head()
  21. #df_group.to_csv("./datas/tzld_uid_videoids.csv", index=False)
  22. #df_group.to_csv("./datas/tzld_uid_videoids_app_20201126.csv", index=False)
  23. #df_group.to_csv("./datas/tzld_uid_videoids_app_20201202.csv", index=False)
  24. #df_group.to_csv("./datas/tzld_uid_videoids_app_20201217.csv", index=False)
  25. #df_group.to_csv("./datas/tzld_uid_videoids_app_20210222.csv", index=False)
  26. #df_group.to_csv("./datas/tzld_uid_videoids_app_20210406.csv", index=False)
  27. df_group.to_csv("./datas/tzld_uid_videoids_app_20210419.csv", index=False)
  28. del df_group
  29. gc.collect()
  30. # ### 3. 使用Pyspark训练item2vec
  31. import findspark
  32. findspark.init()
  33. from pyspark.sql import SparkSession
  34. from pyspark import SparkConf
  35. conf = SparkConf()
  36. conf.setMaster("local").setAppName("item2Vec-lixun").\
  37. set("spark.submit.deployMode", "client").\
  38. set("spark.executor.memory", "19g").\
  39. set("spark.driver.memory", "19g").\
  40. set("spark.executor.cores", 10).\
  41. set("spark.driver.cores", 10).\
  42. set("spark.executor.instances", 10)
  43. spark = SparkSession.builder.config(conf=conf).getOrCreate()
  44. sc = spark.sparkContext
  45. #df = spark.read.csv("./datas/tzld_uid_videoids.csv", header=True)
  46. #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201126.csv", header=True)
  47. #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201202.csv", header=True)
  48. #df = spark.read.csv("./datas/tzld_uid_videoids_app_20201217.csv", header=True)
  49. #df = spark.read.csv("./datas/tzld_uid_videoids_app_20210222.csv", header=True)
  50. #df = spark.read.csv("./datas/tzld_uid_videoids_app_20210406.csv", header=True)
  51. df = spark.read.csv("./datas/tzld_uid_videoids_app_20210419.csv", header=True)
  52. df.show(15)
  53. from pyspark.sql import functions as F
  54. from pyspark.sql import types as T
  55. # 把非常的字符串格式变成LIST形式
  56. df = df.withColumn('video_ids', F.split(df.videoId, " "))
  57. # #### 实现word2vec的训练与转换
  58. from pyspark.ml.feature import Word2Vec
  59. word2Vec = Word2Vec(
  60. vectorSize=64,
  61. windowSize=5,
  62. maxIter=20,
  63. minCount=0,
  64. inputCol="video_ids",
  65. outputCol="video_2vec")
  66. model = word2Vec.fit(df)
  67. del df
  68. gc.collect()
  69. # 不计算每个user的embedding,而是计算item的embedding
  70. model.getVectors().show(3, truncate=False)
  71. # In[12]:
  72. #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1106-sort.csv', index=False)
  73. #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1126-sort.csv', index=False)
  74. #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1202-sort.csv', index=False)
  75. #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-1217-sort.csv', index=False)
  76. #model.getVectors().select("word", "vector").toPandas().to_csv('./datas/tzld_video_embedding-210222-sort.csv', index=False)
  77. #model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210406-sort.csv', index=False)
  78. model.getVectors().select("word", "vector").toPandas().to_csv('/work/xielixun/item2vec-java/tzld_video_embedding-210419-sort.csv', index=False)
  79. print("train cost time is: " + str(time.time() - begin_time))