"""
@author: luojunhui
Daily job: store the top-5000 "return-flow" rows from ODPS into the database.
"""
import asyncio
import aiohttp
from datetime import datetime, timedelta

from applications import ODPSApi


async def asyncPost(url, headers, payload, *, retries=3, timeout=10, retry_delay=2):
    """
    Send an asynchronous JSON POST request, retrying when it times out.

    Only ``asyncio.TimeoutError`` triggers a retry; any other exception
    propagates immediately.

    :param url: target URL
    :param headers: HTTP headers to send with the request
    :param payload: object serialised as the JSON request body
    :param retries: maximum number of attempts (must be >= 1)
    :param timeout: total timeout of a single attempt, in seconds
    :param retry_delay: seconds to wait between two attempts
    :return: the decoded JSON body of the response
    :raises ValueError: if ``retries`` is smaller than 1
    :raises asyncio.TimeoutError: if the last attempt also times out
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")

    # aiohttp expects a ClientTimeout object; passing a bare number is deprecated.
    request_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, retries + 1):
            try:
                async with session.post(
                    url,
                    headers=headers,
                    json=payload,
                    timeout=request_timeout,
                ) as response:
                    return await response.json()
            except asyncio.TimeoutError:
                if attempt >= retries:
                    # Out of attempts: surface the timeout to the caller.
                    raise
                # Wait a moment before trying again.
                await asyncio.sleep(retry_delay)


def getYesterdayStr():
    """Return yesterday's date (local time) formatted as ``YYYYMMDD``."""
    yesterday = datetime.now() - timedelta(days=1)
    return yesterday.strftime('%Y%m%d')


class updateFromOdps(object):
    """
    Update data from ODPS.
    """
    # Shared ODPS client, created once when the class body is executed.
    odps_server = ODPSApi()

    @classmethod
    def getVideoFromOdps(cls):
        """
        Fetch the video list from ODPS.

        Selects the rows of ``loghubods.lastday_return`` whose ``dt`` equals
        yesterday's date and maps each row to a dict with English keys.

        :return: list of dicts with the keys ``video_id``, ``title``,
            ``last_day_return``, ``uid``, ``last_day_view``,
            ``last_day_share``, ``category`` and ``dt``
        """
        date_info = getYesterdayStr()
        sql = f"""
            select videoid, title, 回流人数, uid, 总曝光, share_total, 品类标签, dt
            from loghubods.lastday_return
            where dt = '{date_info}';
        """
        rows = cls.odps_server.select(sql)
        # NOTE(review): assumes every row supports lookup by column name.
        return [
            {
                "video_id": row['videoid'],
                "title": row['title'],
                "last_day_return": row['回流人数'],
                "uid": row['uid'],
                "last_day_view": row['总曝光'],
                "last_day_share": row['share_total'],
                "category": row['品类标签'],
                "dt": row['dt'],
            }
            for row in rows
        ]

    @classmethod
    async def insertIntoDB(cls, data_list):
        """
        Insert the videos into MySQL through the insert service.

        The list is processed in groups of 50: the requests of one group run
        concurrently, and the next group starts only after the previous one
        has finished. An exception raised by any request propagates to the
        caller.

        :param data_list: list of video dicts to insert
        :return: None
        """
        chunk_size = 50
        for start in range(0, len(data_list), chunk_size):
            batch = data_list[start:start + chunk_size]
            await asyncio.gather(
                *(cls.insertSingleVideoToDB(video) for video in batch)
            )

    @classmethod
    async def insertSingleVideoToDB(cls, video_obj):
        """
        Insert a single video by POSTing it to the local insert service.

        :param video_obj: video dict sent as the JSON request body
        :return: the decoded JSON response of the service
        """
        url = "http://localhost:8813/insertVideo"
        headers = {"Content-Type": "application/json"}
        return await asyncPost(url=url, headers=headers, payload=video_obj)