KVStoreService.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from datetime import datetime
  2. from typing import Optional
  3. import pytz
  4. from alibabacloud_r_kvstore20150101.client import Client as KVStoreClient
  5. from alibabacloud_r_kvstore20150101.models import *
  6. from alibabacloud_tea_openapi import models
  7. from resource.enums.region import Region
  8. class KVStoreService(object):
  9. def __init__(self):
  10. self.region_client_map: Dict[Region, KVStoreClient] = {}
  11. self.access_key_id = "LTAI5tRwjztCCwQNBB6nW1dY"
  12. self.access_key_secret = "NaTnMxrGEJh64tLly7Kb5tr166Xpos"
  13. def create_client(self, region: Region) -> KVStoreClient:
  14. config = models.Config(
  15. access_key_id=self.access_key_id,
  16. access_key_secret=self.access_key_secret,
  17. endpoint=region.kv_store_endpoint,
  18. )
  19. return KVStoreClient(config)
  20. def get_kvstore_client(self, region: Region) -> KVStoreClient:
  21. if region in self.region_client_map:
  22. return self.region_client_map[region]
  23. client = self.create_client(region)
  24. self.region_client_map[region] = client
  25. return client
  26. def get_all_instance(self, region: Region) -> List[DescribeInstancesResponseBodyInstancesKVStoreInstance]:
  27. page_number = 1
  28. result: List[DescribeInstancesResponseBodyInstancesKVStoreInstance] = []
  29. while True:
  30. response = self.describe_instances(region, page_number)
  31. if not response or not response.kvstore_instance:
  32. break
  33. result.extend(response.kvstore_instance)
  34. page_number += 1
  35. return result
  36. def describe_instances(self, region: Region, page_number: int = 1, page_size: int = 50) -> Optional[DescribeInstancesResponseBodyInstances]:
  37. request = DescribeInstancesRequest(region_id=region.region, page_number=page_number, page_size=page_size)
  38. client = self.get_kvstore_client(region)
  39. return client.describe_instances(request).body.instances
  40. def describe_instance_attribute(self, instance_id: str, region: Region) -> Optional[List[DescribeInstanceAttributeResponseBodyInstancesDBInstanceAttribute]]:
  41. request = DescribeInstanceAttributeRequest(instance_id=instance_id)
  42. response = self.get_kvstore_client(region).describe_instance_attribute(request)
  43. return response.body.instances.dbinstance_attribute
  44. def describe_history_monitor_values(self, instance_id: str, region: Region, start_time: datetime, end_time: datetime, monitor_keys: str) -> Optional[DescribeHistoryMonitorValuesResponseBody]:
  45. request = DescribeHistoryMonitorValuesRequest(
  46. instance_id=instance_id,
  47. start_time=start_time.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ'),
  48. end_time=end_time.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%SZ'),
  49. monitor_keys=monitor_keys,
  50. interval_for_history="01m"
  51. )
  52. response = self.get_kvstore_client(region).describe_history_monitor_values(request)
  53. return response.body