EurekaClient.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import json
  2. import requests
  3. header = {
  4. "Context-Type": "application/json",
  5. "Accept": "application/json"
  6. }
  7. class EurekaClient(object):
  8. def __init__(self, base_url):
  9. self.base_url = base_url
  10. def get_apps_info(self, app_id: str) -> dict:
  11. url = f"{self.base_url}/eureka/apps/{app_id}"
  12. print(f"获取{app_id}下的实例信息: {url}")
  13. return json.loads(requests.get(url, headers=header).content)
  14. def app_instance_up(self, app_id: str, instance_id: str):
  15. self.app_update_status(app_id, instance_id, "UP")
  16. def app_instance_out(self, app_id: str, instance_id: str):
  17. self.app_update_status(app_id, instance_id, "OUT_OF_SERVICE")
  18. def app_update_status(self, app_id: str, instance_id: str, status: str):
  19. try:
  20. url = f"{self.base_url}/eureka/apps/{app_id}/{instance_id}/status?value={status}"
  21. resp = requests.put(url, headers=header)
  22. print(
  23. f"修改{app_id}下,实例{instance_id}的状态为{status}\n"
  24. f"\t 请求URL: {url} \n"
  25. f"\t 返回状态码: {resp.status_code}, 返回信息: {resp.content}"
  26. )
  27. except Exception as e:
  28. print(f"修改{app_id}下,实例{instance_id}的状态为{status}异常: {e}")