123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- """
- @author: luojunhui
- 每日将odps的回流前5000的数据存储迁移的数据库中
- """
- import asyncio
- import time
- import aiohttp
- from applications import ODPSApi
# Asynchronous POST request with retries on transient failures.
async def asyncPost(url, headers, payload, retries=3, timeout=10, retry_delay=2):
    """
    Send ``payload`` as a JSON POST body to ``url`` and return the decoded JSON reply.

    The request is retried when it times out or the connection fails. After the
    final failed attempt the last exception is re-raised.

    :param url: target URL.
    :param headers: request headers.
    :param payload: JSON-serializable request body.
    :param retries: total number of attempts; must be >= 1.
    :param timeout: total timeout, in seconds, for each individual attempt.
    :param retry_delay: seconds to wait before the next attempt.
    :return: the response body as parsed by ``ClientResponse.json()``.
    :raises ValueError: if ``retries`` is smaller than 1.
    :raises asyncio.TimeoutError: if the last attempt timed out.
    :raises aiohttp.ClientConnectionError: if the last attempt failed to connect.
    """
    if retries < 1:
        # Without this guard the loop body never runs and the caller silently gets None.
        raise ValueError("retries must be >= 1")
    # aiohttp expects a ClientTimeout object; a bare number is a legacy shortcut.
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession() as session:
        for attempt in range(retries):
            try:
                async with session.post(
                    url, headers=headers, json=payload, timeout=client_timeout
                ) as response:
                    return await response.json()
            except (asyncio.TimeoutError, aiohttp.ClientConnectionError):
                if attempt == retries - 1:
                    raise
                # Back off briefly before retrying a transient failure.
                await asyncio.sleep(retry_delay)
class updateFromOdps(object):
    """
    Read video return-traffic rows from ODPS and push each row to the insert service.
    """

    # ODPS client shared by all calls; created once, when the class body executes.
    odps_server = ODPSApi()

    # Endpoint that stores a single video record (POSTed one record per request).
    insert_video_url = "http://47.99.132.47:8813/insertVideo"

    @classmethod
    def getVideoFromOdps(cls, date_info):
        """
        Fetch every row of ``loghubods.lastday_return`` whose ``dt`` equals ``date_info``.

        NOTE(review): ``date_info`` is interpolated directly into the SQL text, so it
        must come from a trusted source (e.g. a locally computed date string).

        :param date_info: value of the ``dt`` column to select.
        :return: list of dicts with keys ``video_id``, ``title``, ``last_day_return``,
            ``uid``, ``last_day_view``, ``last_day_share``, ``category`` and ``dt``.
        """
        sql = f"""
        select videoid, title, 回流人数, uid, 总曝光, share_total, 品类标签, dt
        from loghubods.lastday_return
        where dt = '{date_info}';
        """
        result = cls.odps_server.select(sql)
        # Rename the ODPS columns to the field names expected by the insert service.
        return [
            {
                "video_id": row['videoid'],
                "title": row['title'],
                "last_day_return": row['回流人数'],
                "uid": row['uid'],
                "last_day_view": row['总曝光'],
                "last_day_share": row['share_total'],
                "category": row['品类标签'],
                "dt": row['dt'],
            }
            for row in result
        ]

    @classmethod
    async def insertIntoDB(cls, data_list, chunk_size=50):
        """
        Send every record in ``data_list`` to the insert service, ``chunk_size`` at a time.

        Requests inside a chunk run concurrently via ``asyncio.gather``; the next chunk
        starts only after the current one completes. If a request raises, ``gather``
        propagates that exception and later chunks are not sent.

        :param data_list: indexable sequence of video dicts (e.g. from ``getVideoFromOdps``).
        :param chunk_size: maximum number of concurrent requests per batch.
        :return: None
        :raises ValueError: if ``chunk_size`` is not a positive integer.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        for start in range(0, len(data_list), chunk_size):
            chunk = data_list[start:start + chunk_size]
            await asyncio.gather(*(cls.insertSingleVideoToDB(video) for video in chunk))

    @classmethod
    async def insertSingleVideoToDB(cls, video_obj):
        """
        POST one video record to ``cls.insert_video_url`` and print the request latency.

        :param video_obj: JSON-serializable video dict.
        :return: decoded JSON response returned by ``asyncPost``.
        """
        headers = {"Content-Type": "application/json"}
        # perf_counter is monotonic, so the measured interval cannot jump with clock changes.
        started = time.perf_counter()
        response = await asyncPost(
            url=cls.insert_video_url,
            headers=headers,
            payload=video_obj
        )
        print(time.perf_counter() - started)
        return response
|