process_data_for_lightgbm.py

import datetime
import sys
import os
import argparse

import numpy as np
from tqdm import tqdm
import jieba.analyse
import pandas as pd

sys.path.append(os.getcwd())
from functions import MySQLClientSpider


class SpiderProcess(object):
    """
    Process crawler (spider) data into training / prediction inputs for LightGBM
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.spider_features = [
            "channel",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt"
        ]
    def spider_lop(self, video_id):
        """
        Look up a video's lop (like / play ratio) and duration
        :param video_id:
        :return: (lop, duration), or (0, 0) if the lookup fails
        """
        sql = f"""SELECT like_cnt, play_cnt, duration FROM crawler_video WHERE video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # 700 and 18000 act as additive-smoothing priors, keeping the
            # ratio stable for videos with very few plays
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0
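
    # Illustrative effect of the smoothing above (the numbers here are
    # hypothetical): a video with 10 likes and 100 plays yields
    # (10 + 700) / (100 + 18000) ≈ 0.0392, close to the prior rate
    # 700 / 18000 ≈ 0.0389, instead of the noisy raw ratio 10 / 100 = 0.10;
    # heavily played videos are barely affected by the priors.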

    def spider_data_produce(self, flag, dt_time):
        """
        Read spider data from the database, convert it to a DataFrame, and
        save it locally for training or prediction
        :param flag: "train" or "predict"
        :param dt_time: base date string, formatted "%Y%m%d"
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # labels presumably need a few days to settle, so shift the cutoff
        cutoff_date = dt_time + datetime.timedelta(days=4)
        temp_time = cutoff_date.strftime("%Y%m%d")
        if flag == "train":
            select_sql = f"""SELECT video_id, video_title, label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str <= '{temp_time}' ORDER BY daily_dt_str;"""
            des_path = "/root/luojunhui/alg/data/train_data/spider_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            select_sql = f"""SELECT video_id, video_title, label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        data_list = self.client_spider.select(select_sql)
        rows = []
        for line in tqdm(data_list):
            try:
                temp = list(line)
                video_id = line[0]
                title = line[1]
                lop, duration = self.spider_lop(video_id)
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                temp.append(lop)
                temp.append(duration)
                # pad the extracted keywords to exactly three tag columns
                temp.extend((title_tags + [None, None, None])[:3])
                rows.append(temp)
            except Exception as e:
                print(line, "\t", e)
                continue
        df = pd.DataFrame(
            rows,
            columns=['video_id', 'video_title', 'label', 'channel', 'out_user_id',
                     'mode', 'out_play_cnt', 'out_like_cnt', 'out_share_cnt',
                     'lop', 'duration', 'tag1', 'tag2', 'tag3'])
        df.to_json(des_path, orient='records')
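

# A minimal sketch of consuming the exported training JSON with LightGBM
# (assumes the lightgbm package is available; the path, feature subset, and
# binary objective below are illustrative assumptions, not fixed by this
# script):
#
#   import lightgbm as lgb
#   train_df = pd.read_json("/root/luojunhui/alg/data/train_data/spider_train_20240101.json")
#   train_set = lgb.Dataset(
#       train_df[['out_play_cnt', 'out_like_cnt', 'out_share_cnt', 'lop', 'duration']],
#       label=train_df['label'])
#   booster = lgb.train({"objective": "binary"}, train_set, num_boost_round=100)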


class UserProcess(object):
    """
    Process user-upload data into training / prediction inputs for LightGBM
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.user_features = [
            "label",
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address",
            "tag1",
            "tag2",
            "tag3"
        ]
    def generate_user_data(self, flag, dt_time):
        """
        Generate user training or prediction data
        :param flag: "train" or "predict"
        :param dt_time: base date string, formatted "%Y%m%d"
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # labels presumably need a few days to settle, so shift the cutoff
        cutoff_date = dt_time + datetime.timedelta(days=4)
        temp_time = cutoff_date.strftime("%Y%m%d")
        if flag == "train":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str <= '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/train_data/user_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/user_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        rows = []
        for line in tqdm(dt_list):
            title = line[0]
            temp = list(line)
            title_tags = list(jieba.analyse.textrank(title, topK=3))
            # pad the extracted keywords to exactly three tag columns
            temp.extend((title_tags + [None, None, None])[:3])
            # drop video_title so the row lines up with self.user_features
            rows.append(temp[1:])
        df = pd.DataFrame(rows, columns=self.user_features)
        # return-on-share and return-on-view ratios over 30-day and 3-day
        # windows; each ratio is guarded against a zero in its own denominator
        df['ros_30'] = np.where(df['user_share_30'] != 0, df['user_return_30'] / df['user_share_30'], np.nan)
        df['rov_30'] = np.where(df['user_view_30'] != 0, df['user_return_30'] / df['user_view_30'], np.nan)
        df['ros_3'] = np.where(df['user_share_3'] != 0, df['user_return_3'] / df['user_share_3'], np.nan)
        df['rov_3'] = np.where(df['user_view_3'] != 0, df['user_return_3'] / df['user_view_3'], np.nan)
        df.to_json(des_path, orient='records')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()  # build the command-line parser
    parser.add_argument("--cate")
    parser.add_argument("--flag")
    parser.add_argument("--dt")
    args = parser.parse_args()
    cate = args.cate
    flag = args.flag
    dt = args.dt
    match cate:  # structural pattern matching requires Python 3.10+
        case "spider":
            S = SpiderProcess()
            S.spider_data_produce(flag=flag, dt_time=dt)
        case "user_info":
            U = UserProcess()
            U.generate_user_data(flag=flag, dt_time=dt)
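
# Example invocations (the dates are illustrative; --cate selects the data
# source, --flag selects train vs. predict mode, --dt is the base date):
#   python process_data_for_lightgbm.py --cate spider --flag train --dt 20240101
#   python process_data_for_lightgbm.py --cate user_info --flag predict --dt 20240101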