12345678910111213141516171819202122232425262728293031323334353637 |
- import json
- import requests
# HTTP headers shared by the requests issued from this module: both the
# request body type and the accepted response type are JSON.
# NOTE: the key was previously misspelled "Context-Type", which is not a
# standard HTTP header, so the intended Content-Type was never sent.
header = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}
class EurekaClient(object):
    """Small client for the ``/eureka/apps`` REST endpoints of a Eureka server.

    It can fetch the registration data of an application and override the
    status of a single instance (``UP`` / ``OUT_OF_SERVICE``).
    """

    def __init__(self, base_url, timeout: float = 10.0):
        """Create a client.

        Args:
            base_url: Root URL of the Eureka server; a trailing slash is
                tolerated when request URLs are built.
            timeout: Seconds passed as ``timeout`` to every HTTP call so a
                stalled server cannot block the caller indefinitely.
        """
        self.base_url = base_url
        self.timeout = timeout

    def _apps_url(self, *parts: str) -> str:
        """Return ``<base_url>/eureka/apps`` followed by the given path parts."""
        # Strip a trailing slash so "http://host/" does not yield "//eureka".
        root = str(self.base_url).rstrip("/")
        return "/".join([f"{root}/eureka/apps", *parts])

    def get_apps_info(self, app_id: str) -> dict:
        """Fetch and JSON-decode the data returned for application ``app_id``.

        Returns:
            The decoded JSON body of the GET response.

        Raises:
            Whatever ``requests.get`` raises (including timeouts), and the
            ``json.loads`` decoding error when the body is not valid JSON.
        """
        url = self._apps_url(app_id)
        print(f"获取{app_id}下的实例信息: {url}")
        resp = requests.get(url, headers=header, timeout=self.timeout)
        return json.loads(resp.content)

    def app_instance_up(self, app_id: str, instance_id: str):
        """Set the status of ``instance_id`` in ``app_id`` to ``UP``."""
        self.app_update_status(app_id, instance_id, "UP")

    def app_instance_out(self, app_id: str, instance_id: str):
        """Set the status of ``instance_id`` in ``app_id`` to ``OUT_OF_SERVICE``."""
        self.app_update_status(app_id, instance_id, "OUT_OF_SERVICE")

    def app_update_status(self, app_id: str, instance_id: str, status: str):
        """PUT a new status value for one instance and print the outcome.

        This is best-effort: a failure of the HTTP call is printed, not
        raised, and the method returns ``None`` either way.

        Args:
            app_id: Application name used in the URL path.
            instance_id: Instance identifier used in the URL path.
            status: Value sent as the ``value`` query parameter.
        """
        url = f"{self._apps_url(app_id, instance_id)}/status?value={status}"
        try:
            # Only the network call is guarded; the broad catch is kept on
            # purpose so a failed update never aborts the caller.
            resp = requests.put(url, headers=header, timeout=self.timeout)
        except Exception as e:
            print(f"修改{app_id}下,实例{instance_id}的状态为{status}异常: {e}")
            return
        print(
            f"修改{app_id}下,实例{instance_id}的状态为{status}\n"
            f"\t 请求URL: {url} \n"
            f"\t 返回状态码: {resp.status_code}, 返回信息: {resp.content}"
        )
|