import json
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util


class ODPSClient(object):
    """Client wrapper for Alibaba MaxCompute (ODPS) table and partition access."""

    def __init__(self, project="loghubods"):
        # SECURITY NOTE(review): access key id/secret are hard-coded and
        # committed to source control — rotate these credentials and load them
        # from environment variables or a secrets manager instead.
        self.accessId = "LTAIWYUujJAm7CbH"
        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )

    def get_all_record(self, table: str, dt: str) -> list:
        """Download every record of partition ``dt=<dt>`` of ``table``.

        :param table: ODPS table name.
        :param dt: value of the ``dt`` partition column.
        :return: list of tunnel record objects.
        """
        # Reuse the shared session factory instead of duplicating tunnel setup.
        download_session = self.get_download_session(table, dt)
        count = download_session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        result = []
        with download_session.open_record_reader(0, count) as reader:
            result.extend(reader)
        return result

    def get_download_session(self, table: str, dt: str):
        """Create a tunnel download session for partition ``dt=<dt>`` of ``table``."""
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(table, partition_spec=f"dt={dt}")

    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download partition ``dt=<dt>`` of ``table`` as Arrow record batches.

        :param table: ODPS table name.
        :param dt: value of the ``dt`` partition column.
        :return: list of Arrow batches.
        """
        download_session = self.get_download_session(table, dt)
        count = download_session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        result = []
        with download_session.open_arrow_reader(0, count) as reader:
            result.extend(reader)
        return result

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30,
                               project="loghubods") -> list[dict]:
        """List partition metadata for ``table_name`` via the DataWorks BFF API.

        :param table_name: ODPS table whose partitions are listed.
        :param page_num: 1-based result page to fetch.
        :param page_size: number of partitions per page.
        :param project: ODPS project that owns the table.
        :return: one dict per partition with name, record count, size and timestamps.
        """
        # SECURITY NOTE(review): this is a hard-coded, user-bound session cookie.
        # It will expire and it leaks account identifiers — inject it via
        # configuration instead of keeping it in source control.
        header = {
            "cookie": 'yunpk=1894469520484605; t=35f3266f9f98d4a391e691a92b49fae6; cna=8+1iHgu92GsCAT0whRrtEI4u; login_aliyunid_pk=1894469520484605; sca=2176fa73; bd=s0ouCmI%3D; copilot-session-id-dw-copilot=6e694b9e-aa2e-46fd-9ef5-77d4680755f1; help_csrf=doehZiDyB3oB1Z%2Fn3cPDTOjfQgZgfK3SmTSveK6mkcuU30ul8euzz4E%2BkHqlvnYgQ6A2GgsUAGTPFblrZ8s7xJmv7zwWDzSffe4ceCSGnoEo0tIfCoPvPcutqc2iScScUmvCqxLY9dxJnl9Dag0adw%3D%3D; cr_token=83b9653a-28d7-4981-af91-45f5828cd63b; _samesite_flag_=true; cookie2=1e2ab9af438ed18a85b4e68fbf4956d5; _tb_token_=e3f046f7b7bfe; aliyun_lang=zh; bs_n_lang=zh_CN; channel=8wCMqSyvya9kL6%2FhVYk8I9%2FA3WeWi2hokbeOVqhVRITWwr5a0UkPdlwrDTBdzn6RkBRhZo10My6Uzbw2JHtgUw%3D%3D; mini_login_aliyunid_ticket=3R5H3e3HY2c8gwLZuY5GmS7J.111MM3XnjoksUfqy6JmzYnFupvBR3MadyoMA6m77LnmZsTtAvFwmgEhCpSMbYYF3gFcaWV5S8zyAnxVRurGVyx1x9MRsdPDS63u3uacY7qpmaCcPoEFEkjQZMo6obYTYwHQYTuCzpMLXevXEyCPkr2aZutSbHfR9XmamQjn398bzuMsrwf8GdYUo3daQV23FmzguJNpg65o6DjGK2vjBt5Sk6ZbwuMDUnnzme5FWsa6bgkr8jChZayUZYQtjDQ4uDFctg737uxujFVg4SX81HBSfiyo8hkA7.1QDta7XUwaQYeHd6D7JAtTVDf2MGk1kTRPDCTnxuQC6kRXuxfzQhZYKSFoYtch5WW; login_current_pk=208811208914639646; activeRegionId=cn-hangzhou; dcd-v2-theme-guide=true; aliyun_site=CN; an=zhaohaipeng; lg=true; sg=g26; atpsida=136ca56e4531af814cf15d3b_1726713657_1; cnaui=%2522zhaohaipeng%2520%2540%25201894469520484605%2522; aui=%2522zhaohaipeng%2520%2540%25201894469520484605%2522; partitioned_cookie_flag=addPartitioned; login_aliyunid_csrf=_csrf_tk_1929015259855532; login_aliyunid="zhaohaipeng @ 1894469520484605"; login_aliyunid_ticket=3R5H3e3HY2c8q5WiJ2aXp2hv.1118PE7F75kkWBaGPUHxqUaKscHHhnLAQAwyE2aWXpFEXmYWBnMVZEY8QbdBwpSfgPTNnwGmpe6ue6dfb2xiYMsRV61QKswPE49AnW91XQhEpFLREhNQLb2QXDdnexjMQH4TaG6Kz2JLEEfphwf8FPGPit1VUxtBpu7wt9BkdjW57PdWnAX.1QDta7czWSQjDrrwgVSiZ3wJJLu4Vwb68JTMjjxps7a1b6FUS2xzfksHeetvHhsGc; login_aliyunid_sc=3R5H3e3HY2c8q5WiJ2aXp2hw.1113icixdWa5mrCSurJyEzCxSFLmki8uEvFHoRsb1rS3a5CJUPFANa1EWZw4K7gXeGK2vp.2mWNaj2VCa1FGk8Yu4nucsBgDw4d1XUAk1paqTuk9mMgZNrDWCwurTXCRqDSYcqZww; c_token=af32009647b42b0446119173fb306120; ck2=d5768ba1c6b7fc15b200ac30967777e5; dw_bff=208811208914639646.%E8%B5%B5%E6%B5%B7%E9%B9%8F.42901.1726805274350.1894469520484605.2067139635.26842.OFFICIAL.0.1726978075908.zhaohaipeng.5.09f4b70be2928a5243a5b78bc88c6b06d4ff13add; currentRegionId=cn-hangzhou; tfstk=fvFS4QAaPkF2oH6PCeQqCygAvVGIvk1N9egLSydyJbh-RXagA3orLuoLhlqBTa4dEw1QzPMEpgczOoZL0YEFEy-CpuqDU0I3zDZbDlneaznEOvZ0f0nPxX8unlz9a7WoTBGotXINb15wrzcn9OxSZa-kk2u5TQ3-euzJyVINb15VeEhCVGyUuHM2l2mxy2n-eZgxSVhKJ4hdlm3ISpK8vkUAl23KJBnKJEKx52hKvkhdlrLonegVjm45oI8r7jfBmynX9CLizYC_lpRWNSgSLWHocqu4G4MSXxoGfFN_WzFiRWb9dDaUeuu0A1sjM7Z7wxGCOIcz5-ESh8sBGx2YSWHbnG9gSzE7BAFRGZqQDP2iw7IHa02LkWn4wwRIXRziGqVFmQnQHJFrEXxRVqN75WwC4pAZfvUWdEMMOqiNlZ9HKtiLqiLGXHoKeq0VUZ_XQvM-oqGhlZ9HKY3muw_fldkC.; isg=BJ-fkvFoBbvh2gKR3u8xm_VoLvUpBPOmiEehEDHums6mwLZCOtew90LWg1C-2Mse'
        }
        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        print(f"请求的阿里云接口为: {url}")
        response = requests.get(url, headers=header)
        # requests decodes JSON directly; json.loads(response.text) was redundant.
        resp_json = response.json()

        result = []
        # Matches hour-level partition names like "dt=20240101/hh=12".
        # An unused minute-level pattern (dt/hh/mm) was removed as dead code.
        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
        for datum in resp_json["data"]["data"]:
            partition = datum['name']
            match = dt_hh.search(partition)
            if match:
                # Flatten "dt=YYYYMMDD/hh=HH" into "YYYYMMDDHH".
                partition = f"{match.group(1)}{match.group(2)}"
            result.append({
                "表名": table_name,
                "name": datum["name"],
                "分区": partition,
                "数据量": datum["recordCount"],
                "数据大小": convert_util.byte_convert(datum['dataSize']),
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
                "更新时间": date_util.ts_cover_str(datum['gmtModified'])
            })
        return result