"""Minimal client for the Eureka service-registry REST API."""

import json

import requests

# Headers sent with every request so Eureka answers in JSON instead of XML.
header = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# Seconds to wait for Eureka before giving up; without a timeout `requests`
# blocks indefinitely when the server stops responding.
DEFAULT_TIMEOUT = 10.0


class EurekaClient(object):
    """Thin wrapper around the Eureka REST endpoints used for instance control."""

    # Class-level fallback so subclasses that skip ``__init__`` still get a timeout.
    timeout = DEFAULT_TIMEOUT

    def __init__(self, base_url: str, timeout: float = DEFAULT_TIMEOUT):
        """Create a client.

        Args:
            base_url: Root URL of the Eureka server; ``/eureka/apps/...`` is
                appended to it verbatim.
            timeout: Seconds to wait for each HTTP request.
        """
        self.base_url = base_url
        self.timeout = timeout

    def get_apps_info(self, app_id: str) -> dict:
        """Fetch the registry entry for ``app_id`` and return the decoded JSON body.

        Raises:
            requests.RequestException: If the HTTP request fails or times out.
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        url = f"{self.base_url}/eureka/apps/{app_id}"
        print(f"获取{app_id}下的实例信息: {url}")
        resp = requests.get(url, headers=header, timeout=self.timeout)
        return json.loads(resp.content)

    def app_instance_up(self, app_id: str, instance_id: str) -> None:
        """Set the instance's status to ``UP``."""
        self.app_update_status(app_id, instance_id, "UP")

    def app_instance_out(self, app_id: str, instance_id: str) -> None:
        """Set the instance's status to ``OUT_OF_SERVICE``."""
        self.app_update_status(app_id, instance_id, "OUT_OF_SERVICE")

    def app_update_status(self, app_id: str, instance_id: str, status: str) -> None:
        """PUT a new status for one instance and print the outcome.

        This is best-effort: any failure is printed rather than raised, so a
        caller iterating over many instances is not interrupted by one error.

        Args:
            app_id: Application name as registered in Eureka.
            instance_id: Identifier of the instance within that application.
            status: Value placed in the ``value`` query parameter.
        """
        url = f"{self.base_url}/eureka/apps/{app_id}/{instance_id}/status?value={status}"
        try:
            resp = requests.put(url, headers=header, timeout=self.timeout)
        except Exception as e:
            # Deliberately broad: report the failure and carry on.
            print(f"修改{app_id}下,实例{instance_id}的状态为{status}异常: {e}")
            return
        print(
            f"修改{app_id}下,实例{instance_id}的状态为{status}\n"
            f"\t 请求URL: {url} \n"
            f"\t 返回状态码: {resp.status_code}, 返回信息: {resp.content}"
        )