# KVStoreService.py (2.2 KB)
  import os
  from typing import Dict, List, Optional

  from alibabacloud_r_kvstore20150101.client import Client as KVStoreClient
  from alibabacloud_r_kvstore20150101.models import *
  from alibabacloud_tea_openapi import models

  from resource.enums.region import Region
  class KVStoreService(object):
      """Wrapper around the Alibaba Cloud R-KVStore client, with one client per region.

      Clients are created lazily and cached in ``region_client_map`` so that
      repeated calls for the same region reuse a single client instance.

      Credentials are never embedded in source. They are taken from the
      constructor arguments or, when those are omitted, from the
      ``ALIBABA_CLOUD_ACCESS_KEY_ID`` / ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
      environment variables.
      """

      # Environment variables consulted when credentials are not passed in.
      ACCESS_KEY_ID_ENV = "ALIBABA_CLOUD_ACCESS_KEY_ID"
      ACCESS_KEY_SECRET_ENV = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

      def __init__(
          self,
          access_key_id: Optional[str] = None,
          access_key_secret: Optional[str] = None,
      ):
          """Store credentials and initialise the empty per-region client cache.

          Args:
              access_key_id: Access key id; falls back to the environment
                  variable named by ``ACCESS_KEY_ID_ENV`` when not given.
              access_key_secret: Access key secret; falls back to the
                  environment variable named by ``ACCESS_KEY_SECRET_ENV``
                  when not given.
          """
          self.region_client_map: Dict[Region, KVStoreClient] = {}
          self.access_key_id = access_key_id or os.environ.get(self.ACCESS_KEY_ID_ENV)
          self.access_key_secret = access_key_secret or os.environ.get(
              self.ACCESS_KEY_SECRET_ENV
          )

      def create_client(self, region: Region) -> KVStoreClient:
          """Build a new (uncached) client bound to ``region.endpoint``.

          Raises:
              ValueError: If no access key id or secret is configured.
          """
          # Fail here with a clear message rather than letting the SDK send
          # unsigned requests and fail later with an opaque error.
          if not self.access_key_id or not self.access_key_secret:
              raise ValueError(
                  "Missing Alibaba Cloud credentials: pass access_key_id and "
                  "access_key_secret to KVStoreService(), or set "
                  f"{self.ACCESS_KEY_ID_ENV} and {self.ACCESS_KEY_SECRET_ENV}."
              )
          config = models.Config(
              access_key_id=self.access_key_id,
              access_key_secret=self.access_key_secret,
              endpoint=region.endpoint,
          )
          return KVStoreClient(config)

      def get_kvstore_client(self, region: Region) -> KVStoreClient:
          """Return the cached client for ``region``, creating it on first use."""
          client = self.region_client_map.get(region)
          if client is None:
              client = self.create_client(region)
              self.region_client_map[region] = client
          return client

      def get_all_instance(
          self, region: Region
      ) -> List[DescribeInstancesResponseBodyInstancesKVStoreInstance]:
          """Collect the instances of ``region`` across all result pages.

          Pages are requested starting at page 1 until a page comes back
          missing or without any ``kvstore_instance`` entries.
          """
          result: List[DescribeInstancesResponseBodyInstancesKVStoreInstance] = []
          page_number = 1
          while True:
              response = self.describe_instances(region, page_number)
              if not response or not response.kvstore_instance:
                  break
              result.extend(response.kvstore_instance)
              page_number += 1
          return result

      def describe_instances(
          self, region: Region, page_number: int = 1, page_size: int = 50
      ) -> Optional[DescribeInstancesResponseBodyInstances]:
          """Fetch one page of instances for ``region``.

          Args:
              region: Region whose ``region`` attribute is sent as the region
                  id and whose client is used for the call.
              page_number: Page to request.
              page_size: Number of entries requested per page.

          Returns:
              The ``instances`` member of the response body.
          """
          request = DescribeInstancesRequest(
              region_id=region.region,
              page_number=page_number,
              page_size=page_size,
          )
          client = self.get_kvstore_client(region)
          return client.describe_instances(request).body.instances

      def describe_instance_attribute(
          self, instance_id: str, region: Region
      ) -> Optional[DescribeInstanceAttributeResponseBodyInstancesDBInstanceAttribute]:
          """Fetch the attributes of ``instance_id`` using the client for ``region``.

          Returns:
              The ``dbinstance_attribute`` member of the response body's
              ``instances``.
          """
          request = DescribeInstanceAttributeRequest(instance_id=instance_id)
          response = self.get_kvstore_client(region).describe_instance_attribute(request)
          return response.body.instances.dbinstance_attribute