"""
process the data to satisfy the lightgbm
"""
import sys
import os
import json
from tqdm import tqdm
import jieba.analyse
sys.path.append(os.getcwd())
from functions import generate_label_date, MysqlClient, MySQLClientSpider
  11. class DataProcessor(object):
  12. """
  13. Process the data to satisfy the lightGBM
  14. """
  15. def __init__(self, flag, c="useful"):
  16. self.client = MysqlClient()
  17. self.client_spider = MySQLClientSpider()
  18. self.flag = flag
  19. self.c = c
  20. def generate_train_label(self, item, y_ori_data, cate):
  21. """
  22. 生成训练数据,用 np.array矩阵的方式返回,
  23. :return: x_train, 训练数据, y_train, 训练 label
  24. """
  25. video_id = item["video_id"]
  26. dt = item["dt"]
  27. useful_features = [
  28. "uid",
  29. "type",
  30. "channel",
  31. "fans",
  32. "view_count_user_30days",
  33. "share_count_user_30days",
  34. "return_count_user_30days",
  35. "rov_user",
  36. "str_user",
  37. "out_user_id",
  38. "mode",
  39. "out_play_cnt",
  40. "out_like_cnt",
  41. "out_share_cnt",
  42. "out_collection_cnt",
  43. ]
  44. spider_features = [
  45. "channel",
  46. "out_user_id",
  47. "mode",
  48. "out_play_cnt",
  49. "out_like_cnt",
  50. "out_share_cnt"
  51. ]
  52. user_features = [
  53. "uid",
  54. "channel",
  55. "fans",
  56. "view_count_user_30days",
  57. "share_count_user_30days",
  58. "return_count_user_30days",
  59. "rov_user",
  60. "str_user"
  61. ]
  62. match self.c:
  63. case "useful":
  64. item_features = [item[i] for i in useful_features]
  65. case "user":
  66. if item['type'] == "userupload":
  67. item_features = [item[i] for i in user_features]
  68. else:
  69. return None, None
  70. case "spider":
  71. if item['type'] == "spider":
  72. item_features = [item[i] for i in spider_features]
  73. lop, duration = self.cal_lop(video_id)
  74. item_features.append(lop)
  75. item_features.append(duration)
  76. else:
  77. return None, None
  78. keywords_textrank = self.title_processor(video_id)
  79. if keywords_textrank:
  80. for i in range(3):
  81. try:
  82. item_features.append(keywords_textrank[i])
  83. except:
  84. item_features.append(None)
  85. else:
  86. item_features.append(None)
  87. item_features.append(None)
  88. item_features.append(None)
  89. label_dt = generate_label_date(dt)
  90. label_obj = y_ori_data.get(label_dt, {}).get(video_id)
  91. if label_obj:
  92. label = int(label_obj[cate]) if label_obj[cate] else 0
  93. else:
  94. label = 0
  95. return label, item_features
  96. def title_processor(self, video_id):
  97. """
  98. 通过 video_id 去获取title, 然后通过 title 再分词,把关键词作为 feature
  99. :param video_id: the video id
  100. :return: tag_list [tag, tag, tag, tag......]
  101. """
  102. sql = f"""SELECT title from wx_video where id = {video_id};"""
  103. try:
  104. title = self.client.select(sql)[0][0]
  105. keywords_textrank = jieba.analyse.textrank(title, topK=3)
  106. return list(keywords_textrank)
  107. except Exception as e:
  108. print(video_id, "\t", e)
  109. return []
  110. def cal_lop(self, video_id):
  111. """
  112. 通过视频 id 去爬虫表读取播放和点赞,并且求出like / play的值,要注意平滑,要注意分母为 0 的情况
  113. :param video_id:
  114. :return: lop
  115. """
  116. sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
  117. try:
  118. like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
  119. lop = (like_cnt + 700) / (play_cnt + 18000)
  120. return lop, duration
  121. except Exception as e:
  122. print(video_id, "\t", e)
  123. return 0, 0
  124. def producer(self, dt):
  125. """
  126. 生成数据
  127. :return:none
  128. """
  129. if self.flag == "train":
  130. x_path = "data/train_data/train_2024010100_2024031523.json"
  131. y_path = "data/train_data/daily-label-20240101-20240325.json"
  132. elif self.flag == "predict":
  133. x_path = "data/pred_data/pred_202403{}00_202403{}23.json".format(dt, dt)
  134. y_path = "data/train_data/daily-label-20240101-20240325.json"
  135. else:
  136. return
  137. with open(x_path) as f:
  138. x_data = json.loads(f.read())
  139. with open(y_path) as f:
  140. y_data = json.loads(f.read())
  141. cate_list = ["total_return"]
  142. for c in cate_list:
  143. x_list = []
  144. y_list = []
  145. for video_obj in tqdm(x_data):
  146. our_label, features = self.generate_train_label(video_obj, y_data, c)
  147. if features:
  148. x_list.append(features)
  149. y_list.append(our_label)
  150. with open("data/produce_data/x_data_{}_{}_{}_{}.json".format(c, self.flag, dt, self.c), "w") as f1:
  151. f1.write(json.dumps(x_list, ensure_ascii=False))
  152. with open("data/produce_data/y_data_{}_{}_{}_{}.json".format(c, self.flag, dt, self.c), "w") as f2:
  153. f2.write(json.dumps(y_list, ensure_ascii=False))
  154. if __name__ == "__main__":
  155. flag = int(input("please input method train or predict:\n "))
  156. if flag == 1:
  157. t = "train"
  158. D = DataProcessor(flag=t, c="spider")
  159. D.producer(dt="whole")
  160. else:
  161. t = "predict"
  162. D = DataProcessor(flag=t, c="spider")
  163. for d in range(16, 22):
  164. D.producer(d)