updateDataFromOdpsDaily.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
"""
@author: luojunhui
Daily job: migrate the top 5000 ODPS records by return traffic (回流) into the database.
"""
  5. import time
  6. import asyncio
  7. import aiohttp
  8. from datetime import datetime, timedelta
  9. from applications import ODPSApi
  10. # 异步post请求
  11. async def asyncPost(url, headers, payload):
  12. """
  13. :param url:
  14. :param headers:
  15. :param payload:
  16. :return:
  17. """
  18. retries = 3
  19. async with aiohttp.ClientSession() as session:
  20. for attempt in range(3):
  21. try:
  22. async with session.post(url, headers=headers, data=payload, timeout=10) as response:
  23. return await response.json()
  24. except asyncio.TimeoutError:
  25. if attempt < retries - 1:
  26. await asyncio.sleep(2) # 等待一段时间后重试
  27. else:
  28. raise
  29. # 获取昨天的日期的字符串
  30. def getYesterdayStr():
  31. """获取昨天的日期字符"""
  32. today = datetime.now()
  33. # 计算昨天的日期
  34. yesterday = today - timedelta(days=1)
  35. return yesterday.strftime('%Y%m%d')
  36. class updateFromOdps(object):
  37. """
  38. 从odps更新数据
  39. """
  40. odps_server = ODPSApi()
  41. @classmethod
  42. def getVideoFromOdps(cls):
  43. """
  44. 从odps中获取视频list
  45. :return:
  46. """
  47. date_info = getYesterdayStr()
  48. sql = f"""
  49. select videoid, title, return_lastday, uid, lastday_return, share_total, 品类标签, dt
  50. from loghubods.lastday_return
  51. where dt = '{date_info}';
  52. """
  53. result = cls.odps_server.select(sql)
  54. response = [
  55. {
  56. "video_id": i['videoid'],
  57. "title": i['title'],
  58. "last_day_return": i['return_lastday'],
  59. "uid": i['uid'],
  60. "last_day_view": i['lastday_return'],
  61. "last_day_share": i['share_total'],
  62. "category": i['品类标签'],
  63. "dt": i['dt']
  64. }
  65. for i in result
  66. ]
  67. return response
  68. @classmethod
  69. async def insertIntoDB(cls, data_list):
  70. """
  71. 插入mysql
  72. :return:
  73. """
  74. # 分组,每组分50个
  75. def chunk_list(lst, chunk_size):
  76. """
  77. 将列表分割成指定大小的chunks。
  78. :param lst: 要分割的列表。
  79. :param chunk_size: 每个chunk的大小。
  80. :return: 包含chunks的生成器。
  81. """
  82. for i in range(0, len(lst), chunk_size):
  83. yield lst[i:i + chunk_size]
  84. task_list = chunk_list(data_list, chunk_size=50)
  85. for tasks in task_list:
  86. task_ = [cls.insertSingleVideoToDB(params) for params in tasks]
  87. await asyncio.gather(*task_)
  88. @classmethod
  89. async def insertSingleVideoToDB(cls, video_obj):
  90. """
  91. 更新单个视频
  92. :param video_obj:
  93. :return:
  94. """
  95. url = "http://localhost:8813/insertVideo"
  96. headers = {"Content-Type": "application/json"}
  97. response = await asyncPost(
  98. url=url,
  99. headers=headers,
  100. payload=video_obj
  101. )
  102. return response.json