# read_data_from_odps.py

  1. """
  2. Created on Mon Mar 18, 2024
  3. @author: luojunhui
  4. Read data from odps and save to json file in local files
  5. """
  6. import os
  7. import sys
  8. import json
  9. from tqdm import tqdm
  10. sys.path.append(os.getcwd())
  11. from functions import PyODPS, generate_hourly_strings, generate_daily_strings
  12. from functions import MySQLClientSpider
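
# Illustrative sketch (assumption): generate_daily_strings / generate_hourly_strings
# from `functions` are assumed to expand a start/end string into every "yyyymmdd"
# day (or "yyyymmddhh" hour) in the range. The helper below is a hypothetical local
# stand-in for the daily case, not the project's implementation.
from datetime import datetime, timedelta


def _daily_strings_sketch(start_date, end_date):
    """Return every yyyymmdd string from start_date to end_date, inclusive."""
    cursor = datetime.strptime(start_date, "%Y%m%d")
    stop = datetime.strptime(end_date, "%Y%m%d")
    days = []
    while cursor <= stop:
        days.append(cursor.strftime("%Y%m%d"))
        cursor += timedelta(days=1)
    return days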


class VideoDataGenerator(object):
    """
    Generate training data and test data.
    """

    def __init__(self):
        self.oo = PyODPS()
        # self.v_mysql = MysqlClient()
        self.spider_mysql = MySQLClientSpider()

    def insert_into_database(self, dt):
        """
        Insert the hourly data for `dt` into the MySQL database
        (a parameterized variant is sketched after the class definition).
        :param dt: hour string, e.g. yyyymmddhh
        :return:
        """
        data_list = self.get_hour_data(dt)
        for obj in tqdm(data_list):
            insert_sql = f"""
                INSERT INTO lightgbm_data
                (video_id, user_id, type, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt, hour_dt_str)
                values
                ({obj['video_id']}, '{obj['uid']}', '{obj['type']}', '{obj['channel']}', {obj['fans']}, {obj['view_count_user_30days']}, {obj['share_count_user_30days']}, {obj['return_count_user_30days']}, {obj['rov_user']}, {obj['str_user']}, '{obj['out_user_id']}', '{obj['mode']}', {obj['out_play_cnt']}, {obj['out_like_cnt']}, {obj['out_share_cnt']}, '{obj['dt']}');
            """
            self.spider_mysql.update(insert_sql)

    def get_hour_data(self, dt):
        """
        Fetch the newly published videos for one hour.
        :param dt: hour string, e.g. yyyymmddhh
        :return:
        """
        sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        hour_data = self.oo.select(sql)
        result = []
        for line in hour_data:
            obj = {
                "uid": line["uid"] if line["uid"] else "",
                "video_id": line["videoid"] if line["videoid"] else "",
                "type": line["type"] if line["type"] else "",
                "channel": line["channel"] if line["channel"] else "",
                "fst": line["flowpool_start_type"],
                "fsl": line["flowpool_start_level"],
                "fet": line["flowpool_end_type"],
                "fel": line["flowpool_end_level"],
                "f_view": line["flowpool_distribute_view_times"],
                "f_share": line["flowpool_share_times"],
                "f_return": line["flowpool_return_users"],
                "f3_view": line["flowpool_3days_distribute_view_times"],
                "f3_share": line["flowpool_3days_share_times"],
                "f3_return": line["flowpool_3days_return_users"],
                "ros_dms": line["ros_dms"],
                "rov_dms": line["rov_dms"],
                "ros_sls": line["ros_sls"],
                "rov_sls": line["rov_sls"],
                "fans": line["fans"] if line["fans"] else 0,
                "view_count_user_30days": line["view_cnt_user_30days"] if line["view_cnt_user_30days"] else 0,
                "share_count_user_30days": line["share_cnt_user_30days"] if line["share_cnt_user_30days"] else 0,
                "return_count_user_30days": line["return_cnt_user_30days"] if line["return_cnt_user_30days"] else 0,
                "rov_user": line["rov_user"] if line["rov_user"] else 0,
                "str_user": line["str_user"] if line["str_user"] else 0,
                "out_user_id": line["out_user_id"] if line["out_user_id"] else "",
                "mode": line["strategy"] if line["strategy"] else "",
                "out_play_cnt": line["out_play_cnt"] if line["out_play_cnt"] else 0,
                "out_like_cnt": line["out_like_cnt"] if line["out_like_cnt"] else 0,
                "out_share_cnt": line["out_share_cnt"] if line["out_share_cnt"] else 0,
                "out_collection_cnt": line["out_collection_cnt"],
                "up_level_time_hour": line["up_level_time_hour"],
                "dt": line["dt"],
            }
            result.append(obj)
        return result

    def get_daily_data(self, dt):
        """
        The daily table stores each video's performance labels; the hourly
        video_id is used to look up that video's performance here
        (a small join sketch appears after download_hour_video_data).
        :param dt: 20240101
        :return: data_list
        """
        sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        data = self.oo.select(sql)
        result = [
            {
                "video_id": item["videoid"],
                "total_view": item["flowpool_distribute_view_times"],
                "total_share": item["flowpool_share_times"],
                "total_return": item["flowpool_return_users"],
                "3day_view": item["flowpool_3days_distribute_view_times"],
                "3day_share": item["flowpool_3days_share_times"],
                "3day_return": item["flowpool_3days_return_users"],
                "3day_up_level": item["up_level_3_days"],
                "uplevel": item["uplevel"],
                "dt": item["dt"],
            }
            for item in data
        ]
        return result

    def get_daily_user_info(self, dt):
        """
        Fetch user information and features.
        :param dt: yyyymmdd day string
        """
        sql = f"""select * from loghubods.conten_quality_base_day_new where dt = '{dt}';"""
        data = self.oo.select(sql)
        result = [
            {
                "uid": item["uid"],
                "video_id": item["videoid"],
                "address": item["city"],
                "return_3days": item["return_count_user_3days"],
                "view_3days": item["view_count_user_3days"],
                "share_3days": item["share_count_3days"],
                "3day_return_500_videos": item["return_count_user_3days_videocnt"],
                "30day_return_2000_videos": item["return_count_user_30days_videocnt"],
                "dt": item["dt"],
            }
            for item in data
        ]
        return result
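

# Illustrative sketch (assumption): insert_into_database above interpolates values
# straight into the SQL text. The helper below shows the same kind of write done
# with placeholder binding through a standard DB-API connection (e.g. pymysql);
# both the helper and the `conn` argument are hypothetical and are not part of
# the project's MySQLClientSpider API.
def _insert_hour_row_sketch(conn, obj):
    """Insert a subset of one hourly record using bound parameters instead of f-strings."""
    sql = (
        "INSERT INTO lightgbm_data "
        "(video_id, user_id, type, channel, hour_dt_str) "
        "VALUES (%s, %s, %s, %s, %s);"
    )
    with conn.cursor() as cursor:
        cursor.execute(
            sql,
            (obj["video_id"], obj["uid"], obj["type"], obj["channel"], obj["dt"]),
        )
    conn.commit()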


def save_daily_data(start_date, end_date, save_path):
    """
    Fetch the daily data for every day in the range and save it to save_path.
    :param start_date:
    :param end_date:
    :param save_path:
    :return:
    """
    date_list = generate_daily_strings(start_date, end_date)
    V = VideoDataGenerator()
    L = {}
    for date_str in tqdm(date_list):
        L[date_str] = {}
        data_list = V.get_daily_data(date_str)
        for obj in tqdm(data_list):
            video_id = obj["video_id"]
            L[date_str][video_id] = obj
    with open(save_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))


def download_hour_video_data(date_str):
    """
    Download the hourly video data for one hour string and dump it to a temp JSON file.
    :param date_str:
    :return:
    """
    V = VideoDataGenerator()
    data_list = V.get_hour_data(date_str)
    L = []
    for obj in tqdm(data_list):
        L.append(obj)
    temp_path = "data/temp_data/hour_{}.json".format(date_str)
    with open(temp_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))
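

# Illustrative sketch (assumption): the hourly file written by download_hour_video_data
# holds a list of feature records carrying "video_id", while save_daily_data writes a
# {date: {video_id: label_record}} dict. The helper below shows one way those two files
# could be joined into (features, labels) pairs; the paths, the chosen label date, and
# the helper itself are hypothetical and not part of this module's pipeline.
def _join_hour_features_with_daily_labels_sketch(hour_json_path, daily_json_path, label_date):
    """Pair each hourly record with the daily label record that shares its video_id."""
    with open(hour_json_path) as f:
        hour_records = json.load(f)
    with open(daily_json_path) as f:
        daily_labels = json.load(f).get(label_date, {})
    pairs = []
    for record in hour_records:
        label = daily_labels.get(str(record["video_id"]))
        if label is not None:
            pairs.append((record, label))
    return pairs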


def save_hourly_data(start_date, end_date):
    """
    Save hourly data by inserting every hour in the range into MySQL.
    :param start_date:
    :param end_date:
    :return:
    """
    V = VideoDataGenerator()
    date_list = generate_hourly_strings(start_date, end_date)
    for obj in tqdm(date_list):
        V.insert_into_database(obj)
    # with ThreadPoolExecutor(max_workers=5) as Pool:
    #     Pool.map(V.insert_into_database, date_list)


def save_daily_user_info(start_date, end_date, save_path):
    """
    Save daily user info for every day in the range to save_path.
    :param start_date:
    :param end_date:
    :param save_path:
    :return:
    """
    date_list = generate_daily_strings(start_date, end_date)
    V = VideoDataGenerator()
    L = {}
    for date_str in tqdm(date_list):
        L[date_str] = {}
        data_list = V.get_daily_user_info(date_str)
        for obj in tqdm(data_list):
            video_id = obj["video_id"]
            L[date_str][video_id] = obj
    with open(save_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))


if __name__ == "__main__":
    flag = int(
        input(
            "Enter a flag: 1 to fetch hourly data, 2 to fetch daily data, 3 to fetch user-info data:\n"
        )
    )
    if flag == 1:
        start = str(input("Enter the start string in yyyymmddhh format:\n"))
        end = str(input("Enter the end string in yyyymmddhh format:\n"))
        if len(start) == 10 and len(end) == 10:
            save_hourly_data(start, end)
        else:
            print("Time format is not ok")
    elif flag == 2:
        start = str(input("Enter the start string in yyyymmdd format:\n"))
        end = str(input("Enter the end string in yyyymmdd format:\n"))
        save_p = "data/train_data/daily-label-{}-{}.json".format(start, end)
        if len(start) == 8 and len(end) == 8:
            save_daily_data(start, end, save_p)
        else:
            print("Time format is not ok")
    elif flag == 3:
        start = str(input("Enter the start string in yyyymmdd format:\n"))
        end = str(input("Enter the end string in yyyymmdd format:\n"))
        save_p = "data/train_data/daily-user-info-{}-{}.json".format(start, end)
        if len(start) == 8 and len(end) == 8:
            save_daily_user_info(start, end, save_p)
        else:
            print("Time format is not ok")
    else:
        print("Input Error! Make sure your input is 1, 2, or 3!")
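

# Example programmatic invocation (illustrative only; the dates and ranges below are
# placeholders, not values used by the project):
#     save_hourly_data("2024031800", "2024031823")
#     save_daily_data("20240301", "20240310",
#                     "data/train_data/daily-label-20240301-20240310.json")
#     save_daily_user_info("20240301", "20240310",
#                          "data/train_data/daily-user-info-20240301-20240310.json")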