|
@@ -32,12 +32,14 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
|
|
|
|
|
|
|
|
def get_partition_df(table, dt):
|
|
def get_partition_df(table, dt):
|
|
|
logger.info(f"开始下载: {table} -- {dt} 的数据")
|
|
logger.info(f"开始下载: {table} -- {dt} 的数据")
|
|
|
-
|
|
|
|
|
- download_session = odps_client.get_download_session(table, dt)
|
|
|
|
|
- logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
|
|
|
|
|
- with download_session.open_arrow_reader(0, download_session.count) as reader:
|
|
|
|
|
- # 将所有数据加载到 DataFrame 中
|
|
|
|
|
- df = pd.concat([batch.to_pandas() for batch in reader])
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ download_session = odps_client.get_download_session(table, dt)
|
|
|
|
|
+ logger.info(f"表: {table} 中的分区 {dt}, 共有 {download_session.count} 条数据")
|
|
|
|
|
+ with download_session.open_arrow_reader(0, download_session.count) as reader:
|
|
|
|
|
+ # 将所有数据加载到 DataFrame 中
|
|
|
|
|
+ df = pd.concat([batch.to_pandas() for batch in reader])
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"下载 {table} -- {dt} 的数据异常: ", e)
|
|
|
|
|
|
|
|
logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
|
|
logger.info(f"下载结束: {table} -- {dt} 的数据, 共计 {df.shape[0]} 条数据")
|
|
|
return df
|
|
return df
|