updateDataFromOdpsDaily.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. """
  2. @author: luojunhui
  3. 每日将odps的回流前5000的数据存储迁移的数据库中
  4. """
  5. import asyncio
  6. import time
  7. import aiohttp
  8. from applications import ODPSApi
  9. # 异步post请求
  10. async def asyncPost(url, headers, payload):
  11. """
  12. :param url:
  13. :param headers:
  14. :param payload:
  15. :return:
  16. """
  17. retries = 3
  18. async with aiohttp.ClientSession() as session:
  19. for attempt in range(3):
  20. try:
  21. async with session.post(url, headers=headers, json=payload, timeout=10) as response:
  22. return await response.json()
  23. except asyncio.TimeoutError:
  24. if attempt < retries - 1:
  25. await asyncio.sleep(2) # 等待一段时间后重试
  26. else:
  27. raise
  28. class updateFromOdps(object):
  29. """
  30. 从odps更新数据
  31. """
  32. odps_server = ODPSApi()
  33. @classmethod
  34. def getVideoFromOdps(cls, date_info):
  35. """
  36. 从odps中获取视频list
  37. :return:
  38. """
  39. sql = f"""
  40. select videoid, title, 回流人数, uid, 总曝光, share_total, 品类标签, dt
  41. from loghubods.lastday_return
  42. where dt = '{date_info}';
  43. """
  44. result = cls.odps_server.select(sql)
  45. response = [
  46. {
  47. "video_id": i['videoid'],
  48. "title": i['title'],
  49. "last_day_return": i['回流人数'],
  50. "uid": i['uid'],
  51. "last_day_view": i['总曝光'],
  52. "last_day_share": i['share_total'],
  53. "category": i['品类标签'],
  54. "dt": i['dt']
  55. }
  56. for i in result
  57. ]
  58. return response
  59. @classmethod
  60. async def insertIntoDB(cls, data_list):
  61. """
  62. 插入mysql
  63. :return:
  64. """
  65. # 分组,每组分50个
  66. def chunk_list(lst, chunk_size):
  67. """
  68. 将列表分割成指定大小的chunks。
  69. :param lst: 要分割的列表。
  70. :param chunk_size: 每个chunk的大小。
  71. :return: 包含chunks的生成器。
  72. """
  73. for i in range(0, len(lst), chunk_size):
  74. yield lst[i:i + chunk_size]
  75. task_list = chunk_list(data_list, chunk_size=50)
  76. for tasks in task_list:
  77. task_ = [cls.insertSingleVideoToDB(params) for params in tasks]
  78. await asyncio.gather(*task_)
  79. @classmethod
  80. async def insertSingleVideoToDB(cls, video_obj):
  81. """
  82. 更新单个视频
  83. :param video_obj:
  84. :return:
  85. """
  86. # url = "http://localhost:8813/insertVideo"
  87. url = "http://47.99.132.47:8813/insertVideo"
  88. headers = {"Content-Type": "application/json"}
  89. a = time.time()
  90. response = await asyncPost(
  91. url=url,
  92. headers=headers,
  93. payload=video_obj
  94. )
  95. b = time.time()
  96. print(b - a)
  97. return response