read_data_from_odps.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """
  2. Created on Mon Mar 18, 2024
  3. @author: luojunhui
Read data from ODPS and save it to local JSON files.
  5. """
  6. import os
  7. import sys
  8. import json
  9. from tqdm import tqdm
  10. from concurrent.futures import ThreadPoolExecutor
  11. sys.path.append(os.getcwd())
  12. from functions import PyODPS, generate_hourly_strings, generate_daily_strings
  13. class VideoDataGenerator(object):
  14. """
  15. 生成训练数据,测试数据
  16. """
  17. def __init__(self):
  18. self.oo = PyODPS()
  19. def get_hour_data(self, dt):
  20. """
  21. 获取小时级的新视频
  22. :param dt: 小时参数
  23. :return:
  24. """
  25. sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
  26. hour_data = self.oo.select(sql)
  27. result = []
  28. for line in hour_data:
  29. obj = {
  30. "uid": line["uid"],
  31. "video_id": line["videoid"],
  32. "type": line["type"],
  33. "channel": line["channel"],
  34. "fst": line["flowpool_start_type"],
  35. "fsl": line["flowpool_start_level"],
  36. "fet": line["flowpool_end_type"],
  37. "fel": line["flowpool_end_level"],
  38. "f_view": line["flowpool_distribute_view_times"],
  39. "f_share": line["flowpool_share_times"],
  40. "f_return": line["flowpool_return_users"],
  41. "f3_view": line["flowpool_3days_distribute_view_times"],
  42. "f3_share": line["flowpool_3days_share_times"],
  43. "f3_return": line["flowpool_3days_return_users"],
  44. "ros_dms": line["ros_dms"],
  45. "rov_dms": line["rov_dms"],
  46. "ros_sls": line["ros_sls"],
  47. "rov_sls": line["rov_sls"],
  48. "fans": line["fans"],
  49. "view_count_user_30days": line["view_cnt_user_30days"],
  50. "share_count_user_30days": line["share_cnt_user_30days"],
  51. "return_count_user_30days": line["return_cnt_user_30days"],
  52. "rov_user": line["rov_user"],
  53. "str_user": line["str_user"], # share / view
  54. "out_user_id": line["out_user_id"],
  55. "mode": line["strategy"],
  56. "out_play_cnt": line["out_play_cnt"],
  57. "out_like_cnt": line["out_like_cnt"],
  58. "out_share_cnt": line["out_share_cnt"],
  59. "out_collection_cnt": line["out_collection_cnt"],
  60. "up_level_time_hour": line["up_level_time_hour"],
  61. "dt": line["dt"],
  62. }
  63. result.append(obj)
  64. return result
  65. def get_daily_data(self, dt):
  66. """
  67. 天级表里面存储了视频的表现 label, 通过小时级的 video_id 去获取视频的表现
  68. :param dt: 20240101
  69. :return: data_list
  70. """
  71. sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
  72. data = self.oo.select(sql)
  73. result = [
  74. {
  75. "video_id": item["videoid"],
  76. "total_view": item["flowpool_distribute_view_times"],
  77. "total_share": item["flowpool_share_times"],
  78. "total_return": item["flowpool_return_users"],
  79. "3day_view": item["flowpool_3days_distribute_view_times"],
  80. "3day_share": item["flowpool_3days_share_times"],
  81. "3day_return": item["flowpool_3days_return_users"],
  82. "3day_up_level": item["up_level_3_days"],
  83. "dt": item["dt"],
  84. }
  85. for item in data
  86. ]
  87. return result
  88. def save_daily_data(start_date, end_date, save_path):
  89. """
  90. 获取日期范围内数据,并且保存到指定路径
  91. :param start_date:
  92. :param end_date:
  93. :param save_path:
  94. :return:
  95. """
  96. date_list = generate_daily_strings(start_date, end_date)
  97. V = VideoDataGenerator()
  98. L = {}
  99. for date_str in tqdm(date_list):
  100. L[date_str] = {}
  101. data_list = V.get_daily_data(date_str)
  102. for obj in tqdm(data_list):
  103. video_id = obj["video_id"]
  104. L[date_str][video_id] = obj
  105. with open(save_path, "w") as f:
  106. f.write(json.dumps(L, ensure_ascii=False))
  107. def download_hour_video_data(date_str):
  108. """
  109. 获取日期参数
  110. :param date_str:
  111. :return:
  112. """
  113. V = VideoDataGenerator()
  114. data_list = V.get_hour_data(date_str)
  115. L = []
  116. for obj in data_list:
  117. L.append(obj)
  118. temp_path = "data/temp_data/hour_{}.json".format(date_str)
  119. with open(temp_path, "w") as f:
  120. f.write(json.dumps(L, ensure_ascii=False))
  121. def save_hourly_data(start_date, end_date, save_path):
  122. """
  123. save hourly data
  124. :param start_date:
  125. :param end_date:
  126. :param save_path:
  127. :return:
  128. """
  129. print(save_path)
  130. date_list = generate_hourly_strings(start_date, end_date)
  131. with ThreadPoolExecutor(max_workers=10) as Pool:
  132. Pool.map(download_hour_video_data, date_list)
  133. # for date_str in tqdm(date_list):
  134. # data_list = V.get_hour_data(date_str)
  135. # for obj in tqdm(data_list):
  136. # L.append(obj)
  137. # with open(save_path, "w") as f:
  138. # f.write(json.dumps(L, ensure_ascii=False))
  139. if __name__ == "__main__":
  140. flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n"))
  141. if flag == 1:
  142. start = str(input("请输入开始字符串, 格式为 yyyymmddhh:\n"))
  143. end = str(input("请输入结束字符串, 格式为 yymmddhh: \n"))
  144. save_p = "data/hourly-train-{}-{}.json".format(start, end)
  145. if len(start) == 10 and len(end) == 10:
  146. save_hourly_data(start, end, save_p)
  147. else:
  148. print("Time format is not ok")
  149. elif flag == 2:
  150. start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n"))
  151. end = str(input("请输入结束字符串, 格式为 yymmdd: \n"))
  152. save_p = "data/train_data/daily-label-{}-{}.json".format(start, end)
  153. if len(start) == 8 and len(end) == 8:
  154. save_daily_data(start, end, save_p)
  155. else:
  156. print("Time format is not ok")
  157. else:
  158. print("Input Error ! Make sure your input is 1 or 2!!")