updateDataFromOdpsDaily.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. """
  2. @author: luojunhui
  3. 每日将odps的回流前5000的数据存储迁移的数据库中
  4. """
  5. import asyncio
  6. import aiohttp
  7. from datetime import datetime, timedelta
  8. from applications import ODPSApi
  # Asynchronous JSON POST with retry on timeout
  async def asyncPost(url, headers, payload, retries=3, timeout=10, retry_delay=2):
      """
      Send ``payload`` as a JSON POST request and return the decoded JSON reply.

      Only timeouts are retried; any other exception propagates immediately.

      :param url: target URL
      :param headers: dict of HTTP headers to send
      :param payload: JSON-serialisable request body (passed via ``json=``)
      :param retries: total number of attempts made when requests time out
      :param timeout: total time budget for one attempt, in seconds
      :param retry_delay: seconds to wait between two attempts
      :return: the response body parsed as JSON
      :raises asyncio.TimeoutError: when the final attempt also times out
      """
      # Always make at least one attempt, even if a caller passes retries <= 0.
      attempts = max(1, retries)
      # aiohttp deprecates bare numbers for ``timeout``; ClientTimeout(total=...)
      # is the documented equivalent.
      request_timeout = aiohttp.ClientTimeout(total=timeout)
      async with aiohttp.ClientSession() as session:
          for attempt in range(attempts):
              try:
                  async with session.post(
                      url, headers=headers, json=payload, timeout=request_timeout
                  ) as response:
                      return await response.json()
              except asyncio.TimeoutError:
                  if attempt == attempts - 1:
                      # Out of attempts: surface the timeout to the caller.
                      raise
                  # Wait a little before trying again.
                  await asyncio.sleep(retry_delay)
  # Build yesterday's date as a compact string
  def getYesterdayStr():
      """Return yesterday's date (local clock) formatted as ``YYYYMMDD``."""
      return (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
  class updateFromOdps(object):
      """
      Read yesterday's video rows from ODPS and forward them to the insert service.
      """
      # ODPSApi instance shared by the classmethods; created when the class body runs.
      odps_server = ODPSApi()

      @classmethod
      def getVideoFromOdps(cls):
          """
          Query ``loghubods.lastday_return`` for yesterday's ``dt`` partition.

          :return: list with one dict per result row, keyed by ``video_id``,
                   ``title``, ``last_day_return``, ``uid``, ``last_day_view``,
                   ``last_day_share``, ``category`` and ``dt``
          """
          # Output key -> source column, listed in the order the keys are emitted.
          column_map = {
              "video_id": 'videoid',
              "title": 'title',
              "last_day_return": '回流人数',
              "uid": 'uid',
              "last_day_view": '总曝光',
              "last_day_share": 'share_total',
              "category": '品类标签',
              "dt": 'dt',
          }
          date_info = getYesterdayStr()
          sql = f"""
          select videoid, title, 回流人数, uid, 总曝光, share_total, 品类标签, dt
          from loghubods.lastday_return
          where dt = '{date_info}';
          """
          rows = cls.odps_server.select(sql)
          return [
              {out_key: row[column] for out_key, column in column_map.items()}
              for row in rows
          ]

      @classmethod
      async def insertIntoDB(cls, data_list):
          """
          Send every record in ``data_list`` to the insert endpoint.

          Records are processed in consecutive batches of 50; the requests of one
          batch run concurrently and the next batch starts once they have all
          finished.

          :param data_list: list of video dicts, as built by ``getVideoFromOdps``
          :return: None
          """
          batch_size = 50
          for start in range(0, len(data_list), batch_size):
              batch = data_list[start:start + batch_size]
              pending = [cls.insertSingleVideoToDB(video) for video in batch]
              await asyncio.gather(*pending)

      @classmethod
      async def insertSingleVideoToDB(cls, video_obj):
          """
          POST a single video record to the local ``insertVideo`` endpoint.

          :param video_obj: dict describing one video; sent as the JSON body
          :return: whatever ``asyncPost`` returns for the request
          """
          return await asyncPost(
              url="http://localhost:8813/insertVideo",
              headers={"Content-Type": "application/json"},
              payload=video_obj,
          )