| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- from datetime import datetime
- from typing import Optional
- import pytz
- from alibabacloud_r_kvstore20150101.client import Client as KVStoreClient
- from alibabacloud_r_kvstore20150101.models import *
- from alibabacloud_tea_openapi import models
- from resource.enums.region import Region
class KVStoreService(object):
    """Convenience wrapper around the Alibaba Cloud R-KVStore OpenAPI client.

    One SDK client is created per region on first use and cached in
    ``region_client_map`` for subsequent calls.

    Credentials are never embedded in the source. They are taken from the
    constructor arguments or, when those are omitted, from the environment
    variables named by ``ACCESS_KEY_ID_ENV`` / ``ACCESS_KEY_SECRET_ENV``.
    """

    # Environment variables consulted when credentials are not passed explicitly.
    ACCESS_KEY_ID_ENV = "ALIBABA_CLOUD_ACCESS_KEY_ID"
    ACCESS_KEY_SECRET_ENV = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

    # Timestamp layout used for the start/end time of monitoring queries.
    _UTC_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(self, access_key_id: Optional[str] = None, access_key_secret: Optional[str] = None):
        """Initialise the per-region client cache and resolve credentials.

        Args:
            access_key_id: AccessKey ID; falls back to the environment when
                omitted or empty.
            access_key_secret: AccessKey secret; falls back to the environment
                when omitted or empty.
        """
        # Imported here so this class does not depend on a change to the
        # file-level import block.
        import os

        self.region_client_map: Dict[Region, KVStoreClient] = {}
        self.access_key_id = access_key_id or os.environ.get(self.ACCESS_KEY_ID_ENV)
        self.access_key_secret = access_key_secret or os.environ.get(self.ACCESS_KEY_SECRET_ENV)

    @classmethod
    def _to_utc_string(cls, moment: datetime) -> str:
        """Convert *moment* to UTC and render it with ``_UTC_TIME_FORMAT``.

        ``datetime.astimezone`` interprets a naive datetime as local time.
        """
        return moment.astimezone(pytz.UTC).strftime(cls._UTC_TIME_FORMAT)

    def create_client(self, region: Region) -> KVStoreClient:
        """Build a new, uncached SDK client bound to *region*'s endpoint.

        Raises:
            ValueError: if no AccessKey ID or secret has been configured.
        """
        if not self.access_key_id or not self.access_key_secret:
            raise ValueError(
                "Alibaba Cloud credentials are not configured: pass access_key_id/"
                "access_key_secret to KVStoreService or set "
                f"{self.ACCESS_KEY_ID_ENV} and {self.ACCESS_KEY_SECRET_ENV}"
            )
        config = models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
            endpoint=region.kv_store_endpoint,
        )
        return KVStoreClient(config)

    def get_kvstore_client(self, region: Region) -> KVStoreClient:
        """Return the cached client for *region*, creating it on first use."""
        client = self.region_client_map.get(region)
        if client is None:
            client = self.create_client(region)
            self.region_client_map[region] = client
        return client

    def get_all_instance(self, region: Region) -> List[DescribeInstancesResponseBodyInstancesKVStoreInstance]:
        """Collect every instance in *region* by walking all result pages.

        Pages are requested starting at 1 until a page comes back empty.
        """
        collected: List[DescribeInstancesResponseBodyInstancesKVStoreInstance] = []
        page_number = 1
        while True:
            page = self.describe_instances(region, page_number)
            # A missing container or an empty list both mean "no more pages".
            if not page or not page.kvstore_instance:
                return collected
            collected.extend(page.kvstore_instance)
            page_number += 1

    def describe_instances(self, region: Region, page_number: int = 1, page_size: int = 50) -> Optional[DescribeInstancesResponseBodyInstances]:
        """Fetch a single page of instances in *region*.

        Returns:
            The ``instances`` container of the response body, or ``None`` when
            the response carries no body.
        """
        request = DescribeInstancesRequest(region_id=region.region, page_number=page_number, page_size=page_size)
        body = self.get_kvstore_client(region).describe_instances(request).body
        return body.instances if body is not None else None

    def describe_instance_attribute(self, instance_id: str, region: Region) -> Optional[List[DescribeInstanceAttributeResponseBodyInstancesDBInstanceAttribute]]:
        """Fetch the attribute records of the instance *instance_id*.

        Returns:
            The ``dbinstance_attribute`` list from the response, or ``None``
            when the response has no body or no ``instances`` container.
        """
        request = DescribeInstanceAttributeRequest(instance_id=instance_id)
        body = self.get_kvstore_client(region).describe_instance_attribute(request).body
        instances = body.instances if body is not None else None
        return instances.dbinstance_attribute if instances is not None else None

    def describe_history_monitor_values(self, instance_id: str, region: Region, start_time: datetime, end_time: datetime, monitor_keys: str) -> Optional[DescribeHistoryMonitorValuesResponseBody]:
        """Query historical monitoring data for *instance_id*.

        Args:
            instance_id: ID of the instance to query.
            region: Region whose client is used for the call.
            start_time: Start of the window; converted to UTC before sending.
            end_time: End of the window; converted to UTC before sending.
            monitor_keys: Passed through unchanged as the request's
                ``monitor_keys`` (presumably a comma-separated list of metric
                names -- confirm against the API reference).

        Returns:
            The response body as returned by the SDK.
        """
        request = DescribeHistoryMonitorValuesRequest(
            instance_id=instance_id,
            start_time=self._to_utc_string(start_time),
            end_time=self._to_utc_string(end_time),
            monitor_keys=monitor_keys,
            # NOTE(review): "01m" looks like a one-minute sampling interval -- confirm.
            interval_for_history="01m",
        )
        response = self.get_kvstore_client(region).describe_history_monitor_values(request)
        return response.body
|