1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
import json
import os
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util
class ODPSClient:
    """Client for downloading MaxCompute (ODPS) table data and listing partitions.

    An instance wraps an ``odps.ODPS`` object bound to a single project; the
    download helpers go through ``odps.tunnel.TableTunnel``.

    Credentials are resolved from the environment first:

    * ``ODPS_ACCESS_ID`` / ``ODPS_ACCESS_SECRET`` for the access key pair.
    * ``ALIYUN_DATAWORKS_COOKIE`` for the console session cookie used by
      :meth:`get_all_partition_info`.

    Attributes:
        accessId: Access key id handed to ``ODPS``.
        accessSecret: Access key secret handed to ``ODPS``.
        endpoint: ODPS service endpoint URL.
        tunnelUrl: Tunnel URL; stored only, nothing in this class reads it.
        odps: The ``ODPS`` object created in ``__init__``.
    """

    # SECURITY: the three ``_LEGACY_*`` values below are secrets that were
    # committed to source control. They are kept only so existing callers keep
    # working when no environment override is set. Rotate them and delete these
    # fallbacks as soon as deployments provide the environment variables.
    _LEGACY_ACCESS_ID = "LTAIWYUujJAm7CbH"
    _LEGACY_ACCESS_SECRET = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
    # One cookie per line; adjacent literals concatenate into the exact header
    # value the original code sent. This is a browser session cookie and will
    # stop working once the session expires.
    _LEGACY_COOKIE = (
        'yunpk=1894469520484605; '
        't=35f3266f9f98d4a391e691a92b49fae6; '
        'cna=8+1iHgu92GsCAT0whRrtEI4u; '
        'login_aliyunid_pk=1894469520484605; '
        'sca=2176fa73; '
        'bd=s0ouCmI%3D; '
        'copilot-session-id-dw-copilot=6e694b9e-aa2e-46fd-9ef5-77d4680755f1; '
        'help_csrf=doehZiDyB3oB1Z%2Fn3cPDTOjfQgZgfK3SmTSveK6mkcuU30ul8euzz4E%2BkHqlvnYgQ6A2GgsUAGTPFblrZ8s7xJmv7zwWDzSffe4ceCSGnoEo0tIfCoPvPcutqc2iScScUmvCqxLY9dxJnl9Dag0adw%3D%3D; '
        'cr_token=83b9653a-28d7-4981-af91-45f5828cd63b; '
        '_samesite_flag_=true; '
        'cookie2=1e2ab9af438ed18a85b4e68fbf4956d5; '
        '_tb_token_=e3f046f7b7bfe; '
        'aliyun_lang=zh; '
        'bs_n_lang=zh_CN; '
        'channel=8wCMqSyvya9kL6%2FhVYk8I9%2FA3WeWi2hokbeOVqhVRITWwr5a0UkPdlwrDTBdzn6RkBRhZo10My6Uzbw2JHtgUw%3D%3D; '
        'mini_login_aliyunid_ticket=3R5H3e3HY2c8gwLZuY5GmS7J.111MM3XnjoksUfqy6JmzYnFupvBR3MadyoMA6m77LnmZsTtAvFwmgEhCpSMbYYF3gFcaWV5S8zyAnxVRurGVyx1x9MRsdPDS63u3uacY7qpmaCcPoEFEkjQZMo6obYTYwHQYTuCzpMLXevXEyCPkr2aZutSbHfR9XmamQjn398bzuMsrwf8GdYUo3daQV23FmzguJNpg65o6DjGK2vjBt5Sk6ZbwuMDUnnzme5FWsa6bgkr8jChZayUZYQtjDQ4uDFctg737uxujFVg4SX81HBSfiyo8hkA7.1QDta7XUwaQYeHd6D7JAtTVDf2MGk1kTRPDCTnxuQC6kRXuxfzQhZYKSFoYtch5WW; '
        'login_current_pk=208811208914639646; '
        'activeRegionId=cn-hangzhou; '
        'dcd-v2-theme-guide=true; '
        'aliyun_site=CN; '
        'an=zhaohaipeng; '
        'lg=true; '
        'sg=g26; '
        'atpsida=136ca56e4531af814cf15d3b_1726713657_1; '
        'cnaui=%2522zhaohaipeng%2520%2540%25201894469520484605%2522; '
        'aui=%2522zhaohaipeng%2520%2540%25201894469520484605%2522; '
        'partitioned_cookie_flag=addPartitioned; '
        'login_aliyunid_csrf=_csrf_tk_1929015259855532; '
        'login_aliyunid="zhaohaipeng @ 1894469520484605"; '
        'login_aliyunid_ticket=3R5H3e3HY2c8q5WiJ2aXp2hv.1118PE7F75kkWBaGPUHxqUaKscHHhnLAQAwyE2aWXpFEXmYWBnMVZEY8QbdBwpSfgPTNnwGmpe6ue6dfb2xiYMsRV61QKswPE49AnW91XQhEpFLREhNQLb2QXDdnexjMQH4TaG6Kz2JLEEfphwf8FPGPit1VUxtBpu7wt9BkdjW57PdWnAX.1QDta7czWSQjDrrwgVSiZ3wJJLu4Vwb68JTMjjxps7a1b6FUS2xzfksHeetvHhsGc; '
        'login_aliyunid_sc=3R5H3e3HY2c8q5WiJ2aXp2hw.1113icixdWa5mrCSurJyEzCxSFLmki8uEvFHoRsb1rS3a5CJUPFANa1EWZw4K7gXeGK2vp.2mWNaj2VCa1FGk8Yu4nucsBgDw4d1XUAk1paqTuk9mMgZNrDWCwurTXCRqDSYcqZww; '
        'c_token=af32009647b42b0446119173fb306120; '
        'ck2=d5768ba1c6b7fc15b200ac30967777e5; '
        'dw_bff=208811208914639646.%E8%B5%B5%E6%B5%B7%E9%B9%8F.42901.1726805274350.1894469520484605.2067139635.26842.OFFICIAL.0.1726978075908.zhaohaipeng.5.09f4b70be2928a5243a5b78bc88c6b06d4ff13add; '
        'currentRegionId=cn-hangzhou; '
        'tfstk=fvFS4QAaPkF2oH6PCeQqCygAvVGIvk1N9egLSydyJbh-RXagA3orLuoLhlqBTa4dEw1QzPMEpgczOoZL0YEFEy-CpuqDU0I3zDZbDlneaznEOvZ0f0nPxX8unlz9a7WoTBGotXINb15wrzcn9OxSZa-kk2u5TQ3-euzJyVINb15VeEhCVGyUuHM2l2mxy2n-eZgxSVhKJ4hdlm3ISpK8vkUAl23KJBnKJEKx52hKvkhdlrLonegVjm45oI8r7jfBmynX9CLizYC_lpRWNSgSLWHocqu4G4MSXxoGfFN_WzFiRWb9dDaUeuu0A1sjM7Z7wxGCOIcz5-ESh8sBGx2YSWHbnG9gSzE7BAFRGZqQDP2iw7IHa02LkWn4wwRIXRziGqVFmQnQHJFrEXxRVqN75WwC4pAZfvUWdEMMOqiNlZ9HKtiLqiLGXHoKeq0VUZ_XQvM-oqGhlZ9HKY3muw_fldkC.; '
        'isg=BJ-fkvFoBbvh2gKR3u8xm_VoLvUpBPOmiEehEDHums6mwLZCOtew90LWg1C-2Mse'
    )

    # Matches the ``dt=YYYYMMDD/hh=HH`` prefix of a partition name.
    _DT_HH_PATTERN = re.compile(r'dt=(\d{8})/hh=(\d{2})')

    def __init__(self, project="loghubods"):
        """Create the underlying ``ODPS`` object for *project*.

        Args:
            project: Name of the ODPS project to bind to.
        """
        # Environment overrides win; an unset or empty variable falls back to
        # the legacy literal so existing deployments are unaffected.
        self.accessId = os.environ.get("ODPS_ACCESS_ID") or self._LEGACY_ACCESS_ID
        self.accessSecret = os.environ.get("ODPS_ACCESS_SECRET") or self._LEGACY_ACCESS_SECRET
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint,
        )

    def get_download_session(self, table: str, dt: str):
        """Open a tunnel download session for partition ``dt=<dt>`` of *table*.

        Args:
            table: Table name.
            dt: Value of the ``dt`` partition column.

        Returns:
            Whatever ``TableTunnel.create_download_session`` returns.
        """
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(str(table), partition_spec=f"dt={dt}")

    def get_all_record(self, table: str, dt: str) -> list:
        """Download every record of partition ``dt=<dt>`` of *table*.

        The whole partition is materialised in memory. The record count is
        printed before the download starts.

        Args:
            table: Table name.
            dt: Value of the ``dt`` partition column.

        Returns:
            A list with one entry per record yielded by the record reader.
        """
        session = self.get_download_session(table, dt)
        count = session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        with session.open_record_reader(0, count) as reader:
            return list(reader)

    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download partition ``dt=<dt>`` of *table* through the Arrow reader.

        Same as :meth:`get_all_record`, but reads via ``open_arrow_reader`` and
        collects the batches it yields instead of individual records.

        Args:
            table: Table name.
            dt: Value of the ``dt`` partition column.

        Returns:
            A list of the batches yielded by the Arrow reader.
        """
        session = self.get_download_session(table, dt)
        count = session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        with session.open_arrow_reader(0, count) as reader:
            return list(reader)

    @classmethod
    def _resolve_cookie(cls, cookie=None) -> str:
        """Pick the cookie header: explicit argument, then env var, then legacy literal."""
        return cookie or os.environ.get("ALIYUN_DATAWORKS_COOKIE") or cls._LEGACY_COOKIE

    @classmethod
    def _partition_label(cls, name: str) -> str:
        """Return ``YYYYMMDDHH`` for a ``dt=.../hh=..`` partition name, else *name* unchanged.

        NOTE(review): names that also carry ``/mm=..`` collapse to the hour
        label, exactly as before; minute-level handling was never enabled.
        """
        match = cls._DT_HH_PATTERN.search(name)
        if match:
            return f"{match.group(1)}{match.group(2)}"
        return name

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, project="loghubods",
                               cookie=None, timeout=30) -> list[dict]:
        """List one page of partition metadata for a table via the Aliyun console API.

        Args:
            table_name: Table whose partitions are listed.
            page_num: Page number sent as ``pageNum``.
            page_size: Page size sent as ``pageSize``.
            project: ODPS project that owns the table.
            cookie: Optional console session cookie. When omitted, the
                ``ALIYUN_DATAWORKS_COOKIE`` environment variable is used, then
                the legacy embedded cookie.
            timeout: Seconds passed to ``requests.get`` so an unresponsive
                server cannot block the caller indefinitely.

        Returns:
            One dict per partition. The size and timestamp fields are passed
            through ``convert_util.byte_convert`` and ``date_util.ts_cover_str``
            (presumably producing display strings; confirm against ``util``).

        Raises:
            requests.RequestException: On connection failure or timeout.
            KeyError: If the response JSON lacks the expected fields.
        """
        header = {"cookie": cls._resolve_cookie(cookie)}
        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        print(f"请求的阿里云接口为: {url}")
        response = requests.get(url, headers=header, timeout=timeout)
        resp_json = json.loads(response.text)
        return [
            {
                "表名": table_name,
                "name": datum["name"],
                "分区": cls._partition_label(datum["name"]),
                "数据量": datum["recordCount"],
                "数据大小": convert_util.byte_convert(datum["dataSize"]),
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
                "更新时间": date_util.ts_cover_str(datum["gmtModified"]),
            }
            for datum in resp_json["data"]["data"]
        ]
|