migrate.py

  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import pymysql
  6. import requests
  7. import pandas as pd
  8. from concurrent.futures.thread import ThreadPoolExecutor
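
# Reads a "2022 top 10000" CSV export, resolves each video's playback URL via
# the piaoquan long-video API, then inserts one row per video into MySQL.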


def request_for_info(video_id):
    """
    Fetch video info from the piaoquan long-video API.
    :param video_id: id of the video to look up
    :return: parsed JSON response body
    """
    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
    data = {
        "videoIdList": [video_id]
    }
    header = {
        "Content-Type": "application/json",
    }
    # A timeout keeps a stalled request from hanging a worker thread forever.
    response = requests.post(url, headers=header, data=json.dumps(data), timeout=30)
    return response.json()
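
# The response is expected to carry records under "data"; process() below
# reads response["data"][0]["videoPath"] for the playback URL.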


def migrate_data_to_mysql(video_id, title, view_, return_, video_url):
    """
    Migrate one data_works row into the top_return_daily table.
    :param video_id: video id
    :param title: video title
    :param view_: view count
    :param return_: return count
    :param video_url: playback URL resolved via request_for_info
    :return: None
    """
    # Return-over-view ratio; guard against division by a zero view count.
    rov = int(return_) / int(view_) if int(view_) > 0 else 0
    insert_sql = """
        INSERT INTO top_return_daily
        (video_id, title, view_, return_, video_url, dt, rov)
        VALUES
        (%s, %s, %s, %s, %s, %s, %s);
    """
    connection = pymysql.connect(
        host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
        port=3306,
        user='crawler',
        password='crawler123456@',
        db='piaoquan-crawler',
        charset='utf8mb4'
    )
    try:
        cursor = connection.cursor()
        cursor.execute(
            insert_sql,
            (video_id, title, view_, return_, video_url, "20240715", rov)
        )
        connection.commit()
    finally:
        connection.close()
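
# Each call above opens and closes its own connection. That is heavier than a
# shared connection, but safe here: pymysql connections are not thread-safe,
# and migrate_data_to_mysql runs concurrently in the pool below.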


def process(line):
    """Handle one CSV row: resolve the playback URL, then insert the row."""
    title = line[0]
    video_id = line[1].replace('"', '')  # the id column is quoted in the export
    view = int(line[3])
    return_count = int(line[4])
    video_url = request_for_info(video_id)['data'][0]['videoPath']
    migrate_data_to_mysql(video_id, title, view, return_count, video_url)
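
# Each kept CSV row is: [0] title, [1] quoted video_id, [2] unused here,
# [3] view count, [4] return count.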

if __name__ == "__main__":
    path = "/Users/luojunhui/Downloads/2022-top10000.csv"
    with open(path, encoding="gbk", errors='ignore') as f:
        data = f.readlines()

    # Keep only well-formed five-column rows.
    rows = []
    for line in data:
        temp = line.replace("\n", "").split(",")
        if len(temp) == 5:
            rows.append(temp)

    # Migrate the rows concurrently; each worker does one API call and one insert.
    with ThreadPoolExecutor(max_workers=10) as pool:
        pool.map(process, rows)