# read_data_v2.py
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import sys
  6. import json
  7. import pymysql
  8. from tqdm import tqdm
  9. from concurrent.futures.thread import ThreadPoolExecutor
  10. sys.path.append(os.getcwd())
  11. from functions import PyODPS, generate_hourly_strings, generate_daily_strings
  12. from functions import MySQLClientSpider
  13. def insert_into_db(video_obj):
  14. """
  15. Insert video into MySQL database
  16. :param video_obj:
  17. :return:
  18. """
  19. video_id = video_obj['videoid']
  20. title = video_obj['title']
  21. recommend_status = video_obj['recommend_status']
  22. total_time = video_obj['total_time']
  23. width = video_obj['width']
  24. height = video_obj['height']
  25. app_type = video_obj['app_type'] if video_obj['app_type'] else ''
  26. descr = video_obj['descr'] if video_obj['descr'] else ''
  27. uid = video_obj['uid']
  28. type_ = video_obj['type'] if video_obj['type'] else ''
  29. channel = video_obj['channel'] if video_obj['channel'] else ''
  30. user_fans = video_obj['user_fans']
  31. user_view_30 = video_obj['user_view_cnt_30days']
  32. user_share_30 = video_obj['user_share_cnt_30days']
  33. user_return_30 = video_obj['user_return_cnt_30days']
  34. dt = video_obj['dt']
  35. sql = f"""
  36. INSERT INTO lightgbm
  37. (video_id, title, recommend_status, total_time, width, height, app_type, descr, uid, type, channel, user_fans, user_view_cnt_30days, user_share_cnt_30days, user_return_cnt_30days, dt)
  38. values
  39. ({video_id}, '{title}', '{recommend_status}', {total_time}, {width}, {height}, '{app_type}', '{descr}', '{uid}', '{type_}', '{channel}', {user_fans}, {user_view_30}, {user_share_30}, {user_return_30}, '{dt}');
  40. """
  41. # print(sql)
  42. connection = pymysql.connect(
  43. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
  44. port=3306, # 端口号
  45. user="crawler", # mysql用户名
  46. passwd="crawler123456@", # mysql用户登录密码
  47. db="piaoquan-crawler", # 数据库名
  48. charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8
  49. )
  50. cursor = connection.cursor()
  51. try:
  52. cursor.execute(sql)
  53. connection.commit()
  54. except Exception as e:
  55. print("{}插入失败, 失败原因是: {}".format(video_obj['video_id'], e))
  56. finally:
  57. connection.close()
  58. class ReadDataFromOdps(object):
  59. """
  60. read data from ODPS
  61. """
  62. def __init__(self):
  63. self.oo = PyODPS()
  64. self.spider_ = MySQLClientSpider()
  65. def read_data_with_multi_threads(self):
  66. """
  67. 多线程读取 odps,并且异步更新到 mysql 中,作为训练数据
  68. :return:
  69. """
  70. dt_list = generate_daily_strings(start_date="20240302", end_date="20240410")
  71. print(dt_list)
  72. for dt in tqdm(dt_list):
  73. daily_info = self.read_daily_data(dt)
  74. with ThreadPoolExecutor(max_workers=8) as Pool:
  75. Pool.map(insert_into_db, daily_info)
  76. def read_daily_data(self, dt):
  77. """
  78. 获取天级别数据
  79. :param dt:
  80. """
  81. sql = f"""select * from loghubods.content_quality_base_eachday where dt = '{dt}';"""
  82. daily_data = self.oo.select(sql)
  83. return daily_data
  84. if __name__ == '__main__':
  85. R = ReadDataFromOdps()
  86. R.read_data_with_multi_threads()