| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- from typing import Optional
- from alibabacloud_r_kvstore20150101.client import Client as KVStoreClient
- from alibabacloud_r_kvstore20150101.models import *
- from alibabacloud_tea_openapi import models
- from resource.enums.region import Region
class KVStoreService(object):
    """Wrapper around the Alibaba Cloud KVStore SDK client.

    One SDK client is created per region on first use and cached in
    ``region_client_map`` for later calls.

    Credentials come from the constructor arguments or, when those are
    omitted, from the ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and
    ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variables.
    """

    # Environment variables consulted when credentials are not passed in.
    ACCESS_KEY_ID_ENV = "ALIBABA_CLOUD_ACCESS_KEY_ID"
    ACCESS_KEY_SECRET_ENV = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

    def __init__(
        self,
        access_key_id: Optional[str] = None,
        access_key_secret: Optional[str] = None,
    ):
        """Store credentials and start with an empty per-region client cache.

        Args:
            access_key_id: AccessKey ID; defaults to the value of the
                ``ALIBABA_CLOUD_ACCESS_KEY_ID`` environment variable.
            access_key_secret: AccessKey secret; defaults to the value of the
                ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variable.
        """
        import os

        # SECURITY: an AccessKey pair used to be hard-coded here. Treat that
        # pair as leaked and rotate it; secrets must be supplied by the caller
        # or the environment, never committed to source control.
        if access_key_id is None:
            access_key_id = os.environ.get(self.ACCESS_KEY_ID_ENV)
        if access_key_secret is None:
            access_key_secret = os.environ.get(self.ACCESS_KEY_SECRET_ENV)

        self.region_client_map: Dict[Region, KVStoreClient] = {}
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret

    def create_client(self, region: Region) -> KVStoreClient:
        """Build a new SDK client bound to ``region.endpoint``.

        Raises:
            ValueError: if no AccessKey ID or secret has been configured.
        """
        # Validate here rather than in __init__ so that merely constructing
        # the service never fails; the error surfaces on first real use.
        if not self.access_key_id or not self.access_key_secret:
            raise ValueError(
                "Alibaba Cloud credentials are not configured; pass "
                "access_key_id/access_key_secret or set "
                f"{self.ACCESS_KEY_ID_ENV} and {self.ACCESS_KEY_SECRET_ENV}"
            )
        config = models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
            endpoint=region.endpoint,
        )
        return KVStoreClient(config)

    def get_kvstore_client(self, region: Region) -> KVStoreClient:
        """Return the cached client for ``region``, creating it on first use."""
        client = self.region_client_map.get(region)
        if client is None:
            client = self.create_client(region)
            self.region_client_map[region] = client
        return client

    def get_all_instance(
        self, region: Region
    ) -> List[DescribeInstancesResponseBodyInstancesKVStoreInstance]:
        """Collect every instance in ``region`` by walking all result pages.

        Pages are requested starting at 1 until a page comes back empty (or
        without an instance container).
        """
        result: List[DescribeInstancesResponseBodyInstancesKVStoreInstance] = []
        page_number = 1
        while True:
            page = self.describe_instances(region, page_number)
            if not page or not page.kvstore_instance:
                break
            result.extend(page.kvstore_instance)
            page_number += 1
        return result

    def describe_instances(
        self, region: Region, page_number: int = 1, page_size: int = 50
    ) -> Optional[DescribeInstancesResponseBodyInstances]:
        """Fetch one page of instances for ``region``.

        Returns:
            The ``instances`` container of the response body, or ``None`` when
            the response carries no body.
        """
        request = DescribeInstancesRequest(
            region_id=region.region,
            page_number=page_number,
            page_size=page_size,
        )
        body = self.get_kvstore_client(region).describe_instances(request).body
        return body.instances if body is not None else None

    def describe_instance_attribute(
        self, instance_id: str, region: Region
    ) -> Optional[DescribeInstanceAttributeResponseBodyInstancesDBInstanceAttribute]:
        """Fetch the attribute data of a single instance.

        Returns:
            ``response.body.instances.dbinstance_attribute``, or ``None`` when
            the response has no body or no ``instances`` container.
        """
        # NOTE(review): the SDK model appears to declare dbinstance_attribute
        # as a list of attribute objects rather than a single one; confirm
        # and adjust the return annotation (or the callers) accordingly.
        request = DescribeInstanceAttributeRequest(instance_id=instance_id)
        client = self.get_kvstore_client(region)
        body = client.describe_instance_attribute(request).body
        if body is None or body.instances is None:
            return None
        return body.instances.dbinstance_attribute
|