client.py 21 KB


  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. #
  12. #
  13. # Unless required by applicable law or agreed to in writing,
  14. # software distributed under the License is distributed on an
  15. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. # KIND, either express or implied. See the License for the
  17. # specific language governing permissions and limitations
  18. # under the License.
  19. # coding=utf-8
  20. import time
  21. import warnings
  22. import json
  23. import logging
  24. import jmespath
  25. import copy
  26. import platform
  27. import sys
  28. import aliyunsdkcore
  29. from aliyunsdkcore.vendored.six.moves.urllib.parse import urlencode
  30. from aliyunsdkcore.vendored.requests import codes
  31. from aliyunsdkcore.acs_exception.exceptions import ClientException
  32. from aliyunsdkcore.acs_exception.exceptions import ServerException
  33. from aliyunsdkcore.acs_exception import error_code, error_msg
  34. from aliyunsdkcore.http.http_response import HttpResponse
  35. from aliyunsdkcore.request import AcsRequest
  36. from aliyunsdkcore.http import format_type
  37. from aliyunsdkcore.auth.signers.signer_factory import SignerFactory
  38. from aliyunsdkcore.request import CommonRequest
  39. from aliyunsdkcore.endpoint.resolver_endpoint_request import ResolveEndpointRequest
  40. from aliyunsdkcore.endpoint.default_endpoint_resolver import DefaultEndpointResolver
  41. import aliyunsdkcore.retry.retry_policy as retry_policy
  42. from aliyunsdkcore.retry.retry_condition import RetryCondition
  43. from aliyunsdkcore.retry.retry_policy_context import RetryPolicyContext
  44. import aliyunsdkcore.utils
  45. import aliyunsdkcore.utils.parameter_helper
  46. import aliyunsdkcore.utils.validation
  47. from aliyunsdkcore.vendored.requests.structures import CaseInsensitiveDict
  48. from aliyunsdkcore.vendored.requests.structures import OrderedDict
  49. from aliyunsdkcore import compat
  50. from aliyunsdkcore.vendored.requests import Session
  51. from aliyunsdkcore.vendored.requests.adapters import HTTPAdapter
  52. """
  53. Acs default client module.
  54. """
  55. DEFAULT_READ_TIMEOUT = 10
  56. DEFAULT_CONNECTION_TIMEOUT = 5
  57. # number of keep-alive connections
  58. DEFAULT_POOL_CONNECTIONS = 10
  59. # TODO: replace it with TimeoutHandler
  60. _api_timeout_config_data = aliyunsdkcore.utils._load_json_from_data_dir("timeout_config.json")
  61. logger = logging.getLogger(__name__)
  62. class AcsClient:
  63. LOG_FORMAT = '%(thread)d %(asctime)s %(name)s %(levelname)s %(message)s'
  64. def __init__(
  65. self,
  66. ak=None,
  67. secret=None,
  68. region_id="cn-hangzhou",
  69. auto_retry=True,
  70. max_retry_time=None,
  71. user_agent=None,
  72. port=80,
  73. connect_timeout=None,
  74. timeout=None,
  75. public_key_id=None,
  76. private_key=None,
  77. session_period=3600,
  78. credential=None,
  79. debug=False,
  80. verify=None,
  81. pool_size=10,
  82. proxy=None
  83. ):
  84. """
  85. constructor for AcsClient
  86. :param ak: String, access key id
  87. :param secret: String, access key secret
  88. :param region_id: String, region id
  89. :param auto_retry: Boolean
  90. :param max_retry_time: Number
  91. :param pool_size:
  92. In a multithreaded environment,
  93. you should set the maxsize of the pool to a higher number,
  94. such as the number of threads.
  95. :return:
  96. """
  97. self._max_retry_num = max_retry_time
  98. self._auto_retry = auto_retry
  99. self._ak = ak
  100. self._secret = secret
  101. self._region_id = region_id
  102. self._user_agent = user_agent
  103. self._port = port
  104. self._connect_timeout = connect_timeout
  105. self._read_timeout = timeout
  106. self._extra_user_agent = {}
  107. self._verify = verify
  108. credential = {
  109. 'ak': ak,
  110. 'secret': secret,
  111. 'public_key_id': public_key_id,
  112. 'private_key': private_key,
  113. 'session_period': session_period,
  114. 'credential': credential,
  115. }
  116. self._signer = SignerFactory.get_signer(
  117. credential, region_id, self._implementation_of_do_action, debug)
  118. self._endpoint_resolver = DefaultEndpointResolver(self)
  119. self.session = Session()
  120. self.session.mount('https://', HTTPAdapter(DEFAULT_POOL_CONNECTIONS, pool_size))
  121. self.session.mount('http://', HTTPAdapter(DEFAULT_POOL_CONNECTIONS, pool_size))
  122. if self._auto_retry:
  123. self._retry_policy = retry_policy.get_default_retry_policy(
  124. max_retry_times=self._max_retry_num)
  125. else:
  126. self._retry_policy = retry_policy.NO_RETRY_POLICY
  127. self.proxy = proxy
  128. def get_region_id(self):
  129. return self._region_id
  130. def get_access_key(self):
  131. return self._ak
  132. def get_access_secret(self):
  133. return self._secret
  134. def is_auto_retry(self):
  135. return self._auto_retry
  136. def get_max_retry_num(self):
  137. return self._max_retry_num
  138. def get_user_agent(self):
  139. return self._user_agent
  140. def get_verify(self):
  141. return self._verify
  142. def set_region_id(self, region):
  143. self._region_id = region
  144. def set_max_retry_num(self, num):
  145. self._max_retry_num = num
  146. def set_auto_retry(self, flag):
  147. """
  148. set whether or not the client perform auto-retry
  149. :param flag: Booleans
  150. :return: None
  151. """
  152. self._auto_retry = flag
  153. def set_user_agent(self, agent):
  154. """
  155. User agent set to client will overwrite the request setting.
  156. :param agent:
  157. :return:
  158. """
  159. self._user_agent = agent
  160. def set_verify(self, verify):
  161. self._verify = verify
  162. def append_user_agent(self, key, value):
  163. self._extra_user_agent.update({key: value})
  164. @staticmethod
  165. def user_agent_header():
  166. base = '%s (%s %s;%s)' \
  167. % ('AlibabaCloud',
  168. platform.system(),
  169. platform.release(),
  170. platform.machine()
  171. )
  172. return base
  173. @staticmethod
  174. def default_user_agent():
  175. default_agent = OrderedDict()
  176. default_agent['Python'] = platform.python_version()
  177. default_agent['Core'] = __import__('aliyunsdkcore').__version__
  178. default_agent['python-requests'] = __import__(
  179. 'aliyunsdkcore.vendored.requests.__version__', globals(), locals(),
  180. ['vendored', 'requests', '__version__'], 0).__version__
  181. return CaseInsensitiveDict(default_agent)
  182. def client_user_agent(self):
  183. client_user_agent = {}
  184. if self.get_user_agent() is not None:
  185. client_user_agent.update({'client': self.get_user_agent()})
  186. else:
  187. client_user_agent.update(self._extra_user_agent)
  188. return CaseInsensitiveDict(client_user_agent)
  189. def get_port(self):
  190. return self._port
  191. def get_location_service(self):
  192. return None
  193. @staticmethod
  194. def merge_user_agent(default_agent, extra_agent):
  195. if default_agent is None:
  196. return extra_agent
  197. if extra_agent is None:
  198. return default_agent
  199. user_agent = default_agent.copy()
  200. for key, value in extra_agent.items():
  201. if key not in default_agent:
  202. user_agent[key] = value
  203. return user_agent
  204. def __del__(self):
  205. if self.session:
  206. self.session.close()
  207. def handle_extra_agent(self, request):
  208. client_agent = self.client_user_agent()
  209. request_agent = request.request_user_agent()
  210. if client_agent is None:
  211. return request_agent
  212. if request_agent is None:
  213. return client_agent
  214. for key in request_agent:
  215. if key in client_agent:
  216. client_agent.pop(key)
  217. client_agent.update(request_agent)
  218. return client_agent
  219. def _make_http_response(self, endpoint, request, read_timeout, connect_timeout,
  220. specific_signer=None):
  221. body_params = request.get_body_params()
  222. if body_params:
  223. content_type = request.get_headers().get('Content-Type')
  224. if content_type and format_type.APPLICATION_JSON in content_type:
  225. body = json.dumps(body_params)
  226. request.set_content(body)
  227. elif content_type and format_type.APPLICATION_XML in content_type:
  228. body = aliyunsdkcore.utils.parameter_helper.to_xml(body_params)
  229. request.set_content(body)
  230. else:
  231. body = urlencode(body_params)
  232. request.set_content(body)
  233. request.set_content_type(format_type.APPLICATION_FORM)
  234. elif request.get_content() and "Content-Type" not in request.get_headers():
  235. request.set_content_type(format_type.APPLICATION_OCTET_STREAM)
  236. method = request.get_method()
  237. if isinstance(request, CommonRequest):
  238. request.trans_to_acs_request()
  239. signer = self._signer if specific_signer is None else specific_signer
  240. header, url = signer.sign(self._region_id, request)
  241. base = self.user_agent_header()
  242. extra_agent = self.handle_extra_agent(request)
  243. default_agent = self.default_user_agent()
  244. user_agent = self.merge_user_agent(default_agent, extra_agent)
  245. for key, value in user_agent.items():
  246. base += ' %s/%s' % (key, value)
  247. header['User-Agent'] = base
  248. header['x-sdk-client'] = 'python/2.0.0'
  249. protocol = request.get_protocol_type()
  250. response = HttpResponse(
  251. endpoint,
  252. url,
  253. method,
  254. header,
  255. protocol,
  256. request.get_content(),
  257. self._port,
  258. read_timeout=read_timeout,
  259. connect_timeout=connect_timeout,
  260. verify=self.get_verify(),
  261. session=self.session,
  262. proxy=self.proxy
  263. )
  264. if body_params:
  265. response.set_content(body, "utf-8", request.get_headers().get('Content-Type'))
  266. return response
  267. def _implementation_of_do_action(self, request, signer=None):
  268. if not isinstance(request, AcsRequest):
  269. raise ClientException(
  270. error_code.SDK_INVALID_REQUEST,
  271. error_msg.get_msg('SDK_INVALID_REQUEST'))
  272. # modify Accept-Encoding
  273. request.add_header('Accept-Encoding', 'identity')
  274. if request.endpoint:
  275. endpoint = request.endpoint
  276. else:
  277. endpoint = self._resolve_endpoint(request)
  278. return self._handle_retry_and_timeout(endpoint, request, signer)
  279. def implementation_of_do_action(self, request, signer=None):
  280. # keep compatible
  281. warnings.warn(
  282. "implementation_of_do_action() method is deprecated",
  283. DeprecationWarning)
  284. status, headers, body, exception = self._implementation_of_do_action(request, signer)
  285. return status, headers, body
  286. def _add_request_client_token(self, request):
  287. if hasattr(request, "set_ClientToken") and hasattr(request, "get_ClientToken"):
  288. client_token = request.get_ClientToken()
  289. if not client_token:
  290. # ClientToken has not been set
  291. client_token = aliyunsdkcore.utils.parameter_helper.get_uuid() # up to 60 chars
  292. request.set_ClientToken(client_token)
  293. def _get_request_read_timeout(self, request):
  294. # TODO: replace it with a timeout_handler
  295. if request._request_read_timeout:
  296. return request._request_read_timeout
  297. # if self._timeout:
  298. # return self._timeout
  299. if self._read_timeout:
  300. return self._read_timeout
  301. if request.get_product() is None:
  302. return DEFAULT_READ_TIMEOUT
  303. path = '"{0}"."{1}"."{2}"'.format(request.get_product().lower(), request.get_version(),
  304. request.get_action_name())
  305. timeout = jmespath.search(path, _api_timeout_config_data)
  306. if timeout is None:
  307. return DEFAULT_READ_TIMEOUT
  308. else:
  309. aliyunsdkcore.utils.validation.assert_integer_positive(timeout, "timeout")
  310. return max(timeout, DEFAULT_READ_TIMEOUT)
  311. def _get_request_connect_timeout(self, request):
  312. if request._request_connect_timeout:
  313. return request._request_connect_timeout
  314. if self._connect_timeout:
  315. return self._connect_timeout
  316. return DEFAULT_CONNECTION_TIMEOUT
  317. def _handle_retry_and_timeout(self, endpoint, request, signer):
  318. # TODO: replace it with a retry_handler
  319. # it's a temporary implementation. the long-term plan will be a group a normalized handlers
  320. # which contains retry_handler and timeout_handler
  321. # decide whether we should initialize a ClientToken for the request
  322. retry_policy_context = RetryPolicyContext(request, None, 0, None)
  323. if self._retry_policy.should_retry(retry_policy_context) & \
  324. RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN:
  325. self._add_request_client_token(request)
  326. request_read_timeout = self._get_request_read_timeout(request)
  327. request_connect_timeout = self._get_request_connect_timeout(request)
  328. retries = 0
  329. while True:
  330. status, headers, body, exception = self._handle_single_request(endpoint,
  331. request,
  332. request_read_timeout,
  333. request_connect_timeout,
  334. signer)
  335. retry_policy_context = RetryPolicyContext(request, exception, retries, status)
  336. retryable = self._retry_policy.should_retry(retry_policy_context)
  337. if retryable & RetryCondition.NO_RETRY:
  338. break
  339. logger.debug("Retry needed. Request:%s Retries :%d",
  340. request.get_action_name(), retries)
  341. retry_policy_context.retryable = retryable
  342. time_to_sleep = self._retry_policy.compute_delay_before_next_retry(retry_policy_context)
  343. time.sleep(time_to_sleep / 1000.0)
  344. retries += 1
  345. if isinstance(exception, ClientException):
  346. raise exception
  347. return status, headers, body, exception
  348. def _handle_single_request(self, endpoint, request, read_timeout, connect_timeout, signer):
  349. http_response = self._make_http_response(endpoint, request, read_timeout, connect_timeout,
  350. signer)
  351. params = copy.deepcopy(request.get_query_params())
  352. params.pop('AccessKeyId', None)
  353. logger.debug('Request received. Product:%s Endpoint:%s Params: %s',
  354. request.get_product(), endpoint, params)
  355. # Do the actual network thing
  356. try:
  357. status, headers, body = http_response.get_response_object()
  358. except IOError as e:
  359. exception = ClientException(error_code.SDK_HTTP_ERROR, compat.ensure_string('%s' % e))
  360. msg = "HttpError occurred. Host:%s SDK-Version:%s ClientException:%s" % (
  361. endpoint, aliyunsdkcore.__version__, exception)
  362. logger.error(compat.ensure_string(msg))
  363. return None, None, None, exception
  364. exception = self._get_server_exception(status, body, endpoint, request.string_to_sign)
  365. return status, headers, body, exception
  366. @staticmethod
  367. def _parse_error_info_from_response_body(response_body):
  368. error_code_to_return = error_code.SDK_UNKNOWN_SERVER_ERROR
  369. # TODO handle if response_body is too big
  370. error_message_to_return = compat.ensure_string("ServerResponseBody: %s" % (response_body,))
  371. try:
  372. body_obj = json.loads(response_body)
  373. if 'Code' in body_obj:
  374. error_code_to_return = body_obj['Code']
  375. if 'Message' in body_obj:
  376. error_message_to_return = body_obj['Message']
  377. except ValueError:
  378. # failed to parse body as json format
  379. logger.warning('Failed to parse response as json format. Response:%s', response_body)
  380. return error_code_to_return, error_message_to_return
  381. def _get_server_exception(self, http_status, response_body, endpoint, string_to_sign):
  382. request_id = None
  383. try:
  384. body_obj = json.loads(response_body.decode('utf-8'))
  385. request_id = body_obj.get('RequestId')
  386. except (ValueError, TypeError, AttributeError):
  387. # in case the response body is not a json string, return the raw
  388. # data instead
  389. logger.warning('Failed to parse response as json format. Response:%s', response_body)
  390. if http_status < codes.OK or http_status >= codes.MULTIPLE_CHOICES:
  391. server_error_code, server_error_message = self._parse_error_info_from_response_body(
  392. response_body.decode('utf-8'))
  393. if http_status == codes.BAD_REQUEST and server_error_code == 'SignatureDoesNotMatch':
  394. if string_to_sign == server_error_message.split(':')[1]:
  395. server_error_code = 'InvalidAccessKeySecret'
  396. server_error_message = 'The AccessKeySecret is incorrect. ' \
  397. 'Please check your AccessKeyId and AccessKeySecret.'
  398. exception = ServerException(
  399. server_error_code,
  400. server_error_message,
  401. http_status=http_status,
  402. request_id=request_id)
  403. msg = "ServerException occurred. Host:%s SDK-Version:%s ServerException:%s" % (
  404. endpoint, aliyunsdkcore.__version__, exception)
  405. logger.error(compat.ensure_string(msg))
  406. return exception
  407. def do_action_with_exception(self, acs_request):
  408. # set server response format as json, because this function will
  409. # parse the response so which format doesn't matter
  410. acs_request.set_accept_format('JSON')
  411. status, headers, body, exception = self._implementation_of_do_action(acs_request)
  412. if exception:
  413. raise exception
  414. logger.debug('Response received. Product:%s Response-body: %s',
  415. acs_request.get_product(), body)
  416. return body
  417. def _resolve_endpoint(self, request):
  418. if self._region_id:
  419. aliyunsdkcore.utils.validation.validate_pattern(
  420. self._region_id, 'region_id', '^[a-zA-Z0-9_-]+$'
  421. )
  422. if request.product_suffix:
  423. aliyunsdkcore.utils.validation.validate_pattern(
  424. request.product_suffix, 'suffix', '^[a-zA-Z0-9_-]+$'
  425. )
  426. if request.request_network:
  427. aliyunsdkcore.utils.validation.validate_pattern(
  428. request.request_network, 'network', '^[a-zA-Z0-9_-]+$'
  429. )
  430. resolve_request = ResolveEndpointRequest(
  431. self._region_id,
  432. request.get_product(),
  433. request.get_location_service_code(),
  434. request.get_location_endpoint_type(),
  435. )
  436. resolve_request.request_network = request.request_network
  437. resolve_request.product_suffix = request.product_suffix
  438. resolve_request.endpoint_map = request.endpoint_map
  439. resolve_request.endpoint_regional = request.endpoint_regional
  440. return self._endpoint_resolver.resolve(resolve_request)
  441. def do_action(self, acs_request):
  442. warnings.warn(
  443. "do_action() method is deprecated, please use do_action_with_exception() instead.",
  444. DeprecationWarning)
  445. status, headers, body, exception = self._implementation_of_do_action(acs_request)
  446. return body
  447. def get_response(self, acs_request):
  448. return self.implementation_of_do_action(acs_request)
  449. def add_endpoint(self, region_id, product_code, endpoint):
  450. self._endpoint_resolver.put_endpoint_entry(
  451. region_id, product_code, endpoint)
  452. def set_stream_logger(self, log_level=logging.DEBUG, logger_name='aliyunsdkcore', stream=None,
  453. format_string=None):
  454. log = logging.getLogger(logger_name)
  455. log.setLevel(log_level)
  456. ch = logging.StreamHandler(stream)
  457. ch.setLevel(log_level)
  458. if format_string is None:
  459. format_string = self.LOG_FORMAT
  460. formatter = logging.Formatter(format_string)
  461. ch.setFormatter(formatter)
  462. log.addHandler(ch)
  463. def set_file_logger(self, path, log_level=logging.DEBUG, logger_name='aliyunsdkcore'):
  464. log = logging.getLogger(logger_name)
  465. log.setLevel(log_level)
  466. fh = logging.FileHandler(path)
  467. fh.setLevel(log_level)
  468. formatter = logging.Formatter(self.LOG_FORMAT)
  469. fh.setFormatter(formatter)
  470. log.addHandler(fh)