#!/usr/bin/env python
# coding: utf-8
"""Train item2vec video embeddings with PySpark (spark-item2Vec-tzld.py).

Pipeline:

1. Load the raw user-action CSV with pandas and keep only the rows whose
   ``rating`` is strictly above the mean rating (treated as "liked" items).
2. Group the liked rows by ``userId`` into one space-separated string of
   ``videoId`` values, ordered by ``timestamp`` descending, and write that
   to an intermediate CSV.
3. Read the intermediate CSV with Spark, fit a Word2Vec model on the
   per-user video sequences and export the per-video vectors to CSV.
"""
import gc
import os
import time

import pandas as pd

# Snapshot date of the input data.  Every path below is derived from it so a
# new run only needs this one value changed (earlier runs used other
# date-stamped snapshots of the same files).
RUN_DATE = "20210419"

# Raw user actions; the code reads the columns userId, videoId, rating and
# timestamp from it.
RAW_ACTIONS_CSV = (
    "/work/xielixun/user_action_feature_item2vec_app" + RUN_DATE + ".csv"
)
# Intermediate file: one row per user with a space-separated videoId string.
USER_SEQUENCES_CSV = "./datas/tzld_uid_videoids_app_" + RUN_DATE + ".csv"
# Final output: one row per video with its embedding vector.  The file name
# uses the short (yymmdd) form of the date.
EMBEDDINGS_CSV = (
    "/work/xielixun/item2vec-java/tzld_video_embedding-"
    + RUN_DATE[2:]
    + "-sort.csv"
)

SPARK_MASTER = "local"
SPARK_APP_NAME = "item2Vec-lixun"
# NOTE(review): per the Spark docs, master "local" runs with a single worker
# thread and the executor.* settings have no effect in local mode.  If the
# intent is to use 10 cores, the master would need to be "local[10]" --
# left unchanged here because it alters resource usage; confirm with owner.
SPARK_SETTINGS = (
    ("spark.submit.deployMode", "client"),
    ("spark.executor.memory", "19g"),
    ("spark.driver.memory", "19g"),
    ("spark.executor.cores", 10),
    ("spark.driver.cores", 10),
    ("spark.executor.instances", 10),
)


def _ensure_parent_dir(path):
    """Create the directory that will contain *path* if it is missing."""
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        os.makedirs(parent)


def build_user_sequences(ratings):
    """Return one row per user holding that user's liked video ids.

    Args:
        ratings: pandas DataFrame with the columns ``userId``, ``videoId``,
            ``rating`` and ``timestamp``.

    Returns:
        A DataFrame with the columns ``userId`` and ``videoId``, where
        ``videoId`` is a single string of the user's video ids joined by
        spaces, most recent ``timestamp`` first.  Only rows whose rating is
        strictly greater than the mean rating of *ratings* contribute.
    """
    # Keep only above-average ratings; these form the "liked" list.
    liked = ratings[ratings["rating"] > ratings["rating"].mean()]
    newest_first = liked.sort_values(by="timestamp", ascending=False)
    # Aggregate into (userId, "vid vid vid ...") rows.
    return (
        newest_first.groupby(["userId"])["videoId"]
        .apply(lambda video_ids: " ".join([str(v) for v in video_ids]))
        .reset_index()
    )


def export_user_sequences(actions_csv, sequences_csv):
    """Read *actions_csv*, build per-user sequences, write *sequences_csv*."""
    sequences = build_user_sequences(pd.read_csv(actions_csv))
    _ensure_parent_dir(sequences_csv)
    sequences.to_csv(sequences_csv, index=False)


def train_item_embeddings(sequences_csv, embeddings_csv):
    """Fit Word2Vec on the per-user sequences and export the item vectors.

    Args:
        sequences_csv: CSV with a header and a ``videoId`` column holding
            space-separated video ids (as written by
            ``export_user_sequences``).
        embeddings_csv: destination CSV; receives the ``word`` and
            ``vector`` columns of the fitted model's vectors.
    """
    # Spark is imported here rather than at module level so the pandas
    # phase does not depend on it; findspark.init() is called before the
    # pyspark imports, matching the original statement order.
    import findspark

    findspark.init()
    from pyspark import SparkConf
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    # Fail before the expensive training step if the output directory
    # cannot be created.
    _ensure_parent_dir(embeddings_csv)

    conf = SparkConf().setMaster(SPARK_MASTER).setAppName(SPARK_APP_NAME)
    for key, value in SPARK_SETTINGS:
        conf.set(key, value)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    try:
        sequences = spark.read.csv(sequences_csv, header=True)
        sequences.show(15)

        # Turn the space-separated string into an array column, the input
        # format Word2Vec is configured to read.
        sequences = sequences.withColumn(
            "video_ids", F.split(sequences.videoId, " ")
        )

        word2vec = Word2Vec(
            vectorSize=64,
            windowSize=5,
            maxIter=20,
            minCount=0,
            inputCol="video_ids",
            outputCol="video_2vec",
        )
        model = word2vec.fit(sequences)

        # Export the per-item (video) embeddings rather than computing a
        # per-user embedding.
        vectors = model.getVectors()
        vectors.show(3, truncate=False)
        vectors.select("word", "vector").toPandas().to_csv(
            embeddings_csv, index=False
        )
    finally:
        # Always release the Spark session, even if training fails.
        spark.stop()


def main():
    """Run the full pipeline and print the elapsed wall-clock time."""
    begin_time = time.time()

    export_user_sequences(RAW_ACTIONS_CSV, USER_SEQUENCES_CSV)
    # The pandas frames are out of scope now; reclaim their memory before
    # the Spark phase starts.
    gc.collect()

    train_item_embeddings(USER_SEQUENCES_CSV, EMBEDDINGS_CSV)
    print("train cost time is: " + str(time.time() - begin_time))


if __name__ == "__main__":
    main()