123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- """
- @author: luojunhui
- 每日将odps的回流前5000的数据存储迁移的数据库中
- """
- import time
- import asyncio
- import aiohttp
- from datetime import datetime, timedelta
- from applications import ODPSApi
- # 异步post请求
async def asyncPost(url, headers, payload):
    """
    Send an asynchronous POST request and return the decoded JSON response.

    The request is attempted up to ``retries`` times. Only timeouts are
    retried, with a 2 second pause between attempts; any other exception
    propagates immediately.

    :param url: target URL of the POST request
    :param headers: HTTP headers sent with the request
    :param payload: request body, forwarded to aiohttp through ``data=``
    :return: the value produced by ``response.json()``
    :raises asyncio.TimeoutError: when the final attempt also times out
    """
    retries = 3
    # Explicit ClientTimeout is equivalent to the bare number 10 (a total
    # timeout in seconds) but does not rely on the legacy float form.
    request_timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession() as session:
        # Drive the loop from ``retries`` so the attempt count and the
        # "last attempt" check below can never drift apart.
        for attempt in range(retries):
            try:
                # NOTE(review): aiohttp form-encodes a dict passed via
                # ``data=``; if the server expects a JSON body, callers must
                # pass a serialized string (or this should use ``json=``).
                # Confirm against the receiving service.
                async with session.post(
                    url,
                    headers=headers,
                    data=payload,
                    timeout=request_timeout,
                ) as response:
                    return await response.json()
            except asyncio.TimeoutError:
                if attempt == retries - 1:
                    # Out of attempts: surface the timeout to the caller.
                    raise
                # Back off briefly before the next attempt.
                await asyncio.sleep(2)
- # 获取昨天的日期的字符串
def getYesterdayStr():
    """Return yesterday's date (local clock) formatted as ``YYYYMMDD``."""
    one_day = timedelta(days=1)
    return (datetime.now() - one_day).strftime('%Y%m%d')
class updateFromOdps(object):
    """
    Read yesterday's video rows from ODPS and post them, one video per
    request, to the local ``insertVideo`` HTTP endpoint.
    """

    # Shared ODPS client, created once when the class body is executed.
    odps_server = ODPSApi()

    @classmethod
    def getVideoFromOdps(cls):
        """
        Query ``loghubods.lastday_return`` for yesterday's ``dt`` partition.

        :return: list of dicts with the keys ``video_id``, ``title``,
            ``last_day_return``, ``uid``, ``last_day_view``,
            ``last_day_share``, ``category`` and ``dt``
        """
        date_info = getYesterdayStr()
        sql = f"""
            select videoid, title, return_lastday, uid, lastday_return, share_total, 品类标签, dt
            from loghubods.lastday_return
            where dt = '{date_info}';
        """
        result = cls.odps_server.select(sql)
        # NOTE(review): ``last_day_view`` is filled from the column
        # ``lastday_return`` while ``last_day_return`` comes from
        # ``return_lastday`` -- confirm this mapping is intentional.
        response = [
            {
                "video_id": i['videoid'],
                "title": i['title'],
                "last_day_return": i['return_lastday'],
                "uid": i['uid'],
                "last_day_view": i['lastday_return'],
                "last_day_share": i['share_total'],
                "category": i['品类标签'],
                "dt": i['dt'],
            }
            for i in result
        ]
        return response

    @classmethod
    async def insertIntoDB(cls, data_list):
        """
        Post every video in ``data_list`` to the insert endpoint.

        Videos are processed in groups of 50: the requests of one group run
        concurrently and the next group starts only after the current one
        has finished.

        :param data_list: list of video dicts, as built by
            ``getVideoFromOdps``
        :return: None
        """

        def chunk_list(lst, chunk_size):
            """
            Split a list into consecutive chunks.

            :param lst: the list to split
            :param chunk_size: maximum number of items per chunk
            :return: generator yielding the chunks in order
            """
            for i in range(0, len(lst), chunk_size):
                yield lst[i:i + chunk_size]

        for tasks in chunk_list(data_list, chunk_size=50):
            task_ = [cls.insertSingleVideoToDB(params) for params in tasks]
            await asyncio.gather(*task_)

    @classmethod
    async def insertSingleVideoToDB(cls, video_obj):
        """
        Post a single video record to the local ``insertVideo`` endpoint.

        :param video_obj: dict describing one video
        :return: the decoded JSON response returned by ``asyncPost``
        """
        url = "http://localhost:8813/insertVideo"
        headers = {"Content-Type": "application/json"}
        response = await asyncPost(
            url=url,
            headers=headers,
            payload=video_obj
        )
        # ``asyncPost`` already returns the decoded JSON body; accessing
        # ``.json`` on it raised AttributeError after a successful insert.
        return response
|