read_data_from_odps.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. """
  2. Created on Mon Mar 18, 2024
  3. @author: luojunhui
  4. Read data from odps and save to json file in local files
  5. """
  6. import os
  7. import sys
  8. import json
  9. from tqdm import tqdm
  10. sys.path.append(os.getcwd())
  11. from functions import PyODPS, generate_hourly_strings, generate_daily_strings
  12. class VideoDataGenerator(object):
  13. """
  14. 生成训练数据,测试数据
  15. """
  16. def __init__(self):
  17. self.oo = PyODPS()
  18. def get_hour_data(self, dt):
  19. """
  20. 获取小时级的新视频
  21. :param dt: 小时参数
  22. :return:
  23. """
  24. sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
  25. hour_data = self.oo.select(sql)
  26. result = []
  27. for line in hour_data:
  28. obj = {
  29. "uid": line['uid'],
  30. "video_id": line['videoid'],
  31. "type": line['type'],
  32. "channel": line['channel'],
  33. "fst": line['flowpool_start_type'],
  34. "fsl": line['flowpool_start_level'],
  35. "fet": line['flowpool_end_type'],
  36. "fel": line['flowpool_end_level'],
  37. "f_view": line['flowpool_distribute_view_times'],
  38. "f_share": line['flowpool_share_times'],
  39. "f_return": line['flowpool_return_users'],
  40. "f3_view": line['flowpool_3days_distribute_view_times'],
  41. "f3_share": line['flowpool_3days_share_times'],
  42. "f3_return": line['flowpool_3days_return_users'],
  43. "ros_dms": line['ros_dms'],
  44. "rov_dms": line['rov_dms'],
  45. "ros_sls": line['ros_sls'],
  46. "rov_sls": line['rov_sls'],
  47. "fans": line['fans'],
  48. "view_count_user_30days": line['view_cnt_user_30days'],
  49. "share_count_user_30days": line['share_cnt_user_30days'],
  50. "return_count_user_30days": line['return_cnt_user_30days'],
  51. "rov_user": line['rov_user'],
  52. "str_user": line['str_user'], # share / view
  53. "out_user_id": line['out_user_id'],
  54. "mode": line['strategy'],
  55. "out_play_cnt": line['out_play_cnt'],
  56. "out_like_cnt": line['out_like_cnt'],
  57. "out_share_cnt": line['out_share_cnt'],
  58. "out_collection_cnt": line['out_collection_cnt'],
  59. "up_level_time_hour": line['up_level_time_hour'],
  60. "dt": line['dt']
  61. }
  62. result.append(obj)
  63. return result
  64. def get_daily_data(self, dt):
  65. """
  66. 天级表里面存储了视频的表现 label, 通过小时级的 video_id 去获取视频的表现
  67. :param dt: 20240101
  68. :return: data_list
  69. """
  70. sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
  71. data = self.oo.select(sql)
  72. result = [
  73. {
  74. "video_id": item['videoid'],
  75. "total_view": item['flowpool_distribute_view_times'],
  76. "total_share": item['flowpool_share_times'],
  77. "total_return": item['flowpool_return_users'],
  78. "3day_view": item['flowpool_3days_distribute_view_times'],
  79. "3day_share": item['flowpool_3days_share_times'],
  80. "3day_return": item['flowpool_3days_return_users'],
  81. "dt": item['dt']
  82. } for item in data
  83. ]
  84. return result
  85. def save_daily_data(start_date, end_date, save_path):
  86. """
  87. 获取日期范围内数据,并且保存到指定路径
  88. :param start_date:
  89. :param end_date:
  90. :param save_path:
  91. :return:
  92. """
  93. date_list = generate_daily_strings(start_date, end_date)
  94. V = VideoDataGenerator()
  95. L = {}
  96. for date_str in tqdm(date_list):
  97. L[date_str] = {}
  98. data_list = V.get_daily_data(date_str)
  99. for obj in tqdm(data_list):
  100. video_id = obj['video_id']
  101. L[date_str][video_id] = obj
  102. with open(save_path, "w") as f:
  103. f.write(json.dumps(L, ensure_ascii=False))
  104. def save_hourly_data(start_date, end_date, save_path):
  105. """
  106. save hourly data
  107. :param start_date:
  108. :param end_date:
  109. :param save_path:
  110. :return:
  111. """
  112. date_list = generate_hourly_strings(start_date, end_date)
  113. V = VideoDataGenerator()
  114. L = []
  115. for date_str in tqdm(date_list):
  116. data_list = V.get_hour_data(date_str)
  117. for obj in tqdm(data_list):
  118. L.append(obj)
  119. with open(save_path, "w") as f:
  120. f.write(json.dumps(L, ensure_ascii=False))
  121. if __name__ == '__main__':
  122. flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n"))
  123. if flag == 1:
  124. start = str(input("请输入开始字符串, 格式为 yyyymmddhh:\n"))
  125. end = str(input("请输入结束字符串, 格式为 yymmddhh: \n"))
  126. save_p = "data/hourly-train-{}-{}.json".format(start, end)
  127. if len(start) == 10 and len(end) == 10:
  128. save_hourly_data(start, end, save_p)
  129. else:
  130. print("Time format is not ok")
  131. elif flag == 2:
  132. start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n"))
  133. end = str(input("请输入结束字符串, 格式为 yymmdd: \n"))
  134. save_p = "data/daily-label-{}-{}.json".format(start, end)
  135. if len(start) == 8 and len(end) == 8:
  136. save_daily_data(start, end, save_p)
  137. else:
  138. print("Time format is not ok")
  139. else:
  140. print("Input Error ! Make sure your input is 1 or 2!!")