# process_data_for_lightgbm.py

import argparse
import datetime
import os
import sys

import numpy as np
import pandas as pd
import jieba.analyse
from tqdm import tqdm

sys.path.append(os.getcwd())
from functions import MySQLClientSpider


class SpiderProcess(object):
    """
    Process crawled (spider) video data into training / prediction sets for LightGBM.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.spider_features = [
            "channel",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt",
        ]

    def spider_lop(self, video_id):
        """
        Smoothed lop (= like / play) ratio and duration for one video.
        :param video_id:
        :return: (lop, duration), or (0, 0) if the lookup fails
        """
        sql = f"""SELECT like_cnt, play_cnt, duration FROM crawler_video WHERE video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # Additive smoothing: the constants act as pseudo-counts so that
            # low-play videos do not get extreme ratios.
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0
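
    # Worked example of the smoothing above: a brand-new video with 0 likes and
    # 0 plays gets lop = 700 / 18000 ≈ 0.0389 (the prior), while one with 1,000
    # likes and 20,000 plays gets (1000 + 700) / (20000 + 18000) ≈ 0.0447,
    # close to its raw 5% like rate.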

    def spider_data_produce(self, flag, dt_time):
        """
        Read crawler rows from the database, convert them to a DataFrame, and
        save the result locally for training or prediction.
        :param flag: "train" or "predict"
        :param dt_time: date string in "%Y%m%d" format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Cutoff date string: 4 days after dt_time.
        cutoff_time = (dt_time + datetime.timedelta(days=4)).strftime("%Y%m%d")
        if flag == "train":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str <= '{cutoff_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/spider_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str = '{cutoff_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/predict_data/predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        data_list = self.client_spider.select(select_sql)
        df = []
        for line in tqdm(data_list):
            try:
                temp = list(line)
                video_id, title = line[0], line[1]
                lop, duration = self.spider_lop(video_id)
                temp.append(lop)
                temp.append(duration)
                # Top-3 TextRank keywords from the title, padded to exactly three.
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                title_tags += [None] * (3 - len(title_tags))
                temp.extend(title_tags[:3])
                df.append(temp)
            except Exception:
                continue
        df = pd.DataFrame(
            df,
            columns=['video_id', 'video_title', 'label', 'channel', 'out_user_id', 'mode',
                     'out_play_cnt', 'out_like_cnt', 'out_share_cnt', 'lop', 'duration',
                     'tag1', 'tag2', 'tag3'])
        df.to_json(des_path, orient='records')
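
# A minimal downstream sketch (an assumption, not part of this script): the
# JSON files written above can be loaded back with pandas and fed to LightGBM.
#
#   import lightgbm as lgb
#   df = pd.read_json(des_path, orient='records')
#   features = df.drop(columns=['video_id', 'video_title', 'label'])
#   for col in ['channel', 'out_user_id', 'mode', 'tag1', 'tag2', 'tag3']:
#       features[col] = features[col].astype('category')  # LightGBM accepts pandas categoricals
#   train_set = lgb.Dataset(features, label=df['label'])
#   model = lgb.train({'objective': 'regression'}, train_set)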


class UserProcess(object):
    """
    Process user-uploaded video data for LightGBM.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.user_features = [
            "label",
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address",
            "tag1",
            "tag2",
            "tag3",
        ]

    def generate_user_data(self, flag, dt_time):
        """
        Generate user training / prediction data.
        :param flag: "train" or "predict"
        :param dt_time: date string in "%Y%m%d" format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Cutoff date string: 4 days after dt_time.
        cutoff_time = (dt_time + datetime.timedelta(days=4)).strftime("%Y%m%d")
        if flag == "train":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str <= '{cutoff_time}';"""
            des_path = "/root/luojunhui/alg/data/train_data/user_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str = '{cutoff_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/user_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        df = []
        for line in tqdm(dt_list):
            title = line[0]
            temp = list(line)
            # Top-3 TextRank keywords from the title, padded to exactly three.
            title_tags = list(jieba.analyse.textrank(title, topK=3))
            title_tags += [None] * (3 - len(title_tags))
            temp.extend(title_tags[:3])
            df.append(temp[1:])  # drop video_title; the rest lines up with self.user_features
        df = pd.DataFrame(df, columns=self.user_features)
        # Derived ratios: return-on-share (ros) and return-on-view (rov) over the
        # 30-day and 3-day windows, each guarded against its own zero denominator.
        df['ros_30'] = np.where(df['user_share_30'] != 0, df['user_return_30'] / df['user_share_30'], np.nan)
        df['rov_30'] = np.where(df['user_view_30'] != 0, df['user_return_30'] / df['user_view_30'], np.nan)
        df['ros_3'] = np.where(df['user_share_3'] != 0, df['user_return_3'] / df['user_share_3'], np.nan)
        df['rov_3'] = np.where(df['user_view_3'] != 0, df['user_return_3'] / df['user_view_3'], np.nan)
        df.to_json(des_path, orient='records')


class AllProcess(object):
    """
    Process the combined data set (all types).
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.all_features = [
            # "video_title",
            "rov_label",
            "channel",
            "type",
            # "out_play_cnt",
            # "out_like_cnt",
            # "out_share_cnt",
            "tag1",
            "tag2",
            "tag3",
        ]

    def read_all_data(self, flag, dt_time):
        """
        Read the combined data set (spider and user upload together).
        :param flag: "train" or "predict"
        :param dt_time: date string in "%Y%m%d" format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Cutoff date string: 4 days after dt_time.
        cutoff_time = (dt_time + datetime.timedelta(days=4)).strftime("%Y%m%d")
        if flag == "train":
            sql = f"""SELECT video_title, rov_label, channel, type FROM lightgbm_data WHERE daily_dt_str <= '{cutoff_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/all_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""SELECT video_title, rov_label, channel, type FROM lightgbm_data WHERE daily_dt_str = '{cutoff_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/all_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        df = []
        for line in tqdm(dt_list):
            title = line[0]
            try:
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                if not title_tags:
                    continue  # skip rows whose titles yield no keywords
                temp = list(line)
                title_tags += [None] * (3 - len(title_tags))
                temp.extend(title_tags[:3])
                df.append(temp[1:])  # drop video_title; the rest lines up with self.all_features
            except Exception as e:
                print("title is empty\t", e)
        df = pd.DataFrame(df, columns=self.all_features)
        df.to_json(des_path, orient='records')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()  # build the CLI argument parser
    parser.add_argument("--cate")  # data category: "spider", "user_info", or "all"
    parser.add_argument("--flag")  # "train" or "predict"
    parser.add_argument("--dt")    # date string in "%Y%m%d" format
    args = parser.parse_args()
    cate = args.cate
    flag = args.flag
    dt = args.dt
    match cate:
        case "spider":
            S = SpiderProcess()
            S.spider_data_produce(flag=flag, dt_time=dt)
        case "user_info":
            U = UserProcess()
            U.generate_user_data(flag=flag, dt_time=dt)
        case "all":
            A = AllProcess()
            A.read_all_data(flag=flag, dt_time=dt)
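
# Usage sketch (dates are illustrative; run from the repo root so that
# `functions` is importable, and note that `match` requires Python 3.10+):
#   python process_data_for_lightgbm.py --cate spider --flag train --dt 20240101
#   python process_data_for_lightgbm.py --cate user_info --flag predict --dt 20240101
#   python process_data_for_lightgbm.py --cate all --flag train --dt 20240101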