From 315a7b604c3feee9463545d085bf1adeedd9c3b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maksymczuk?= Date: Tue, 13 Dec 2022 10:40:17 +0100 Subject: [PATCH 1/5] Rewrite CloudifyClient & HTTPClient The aim is: - a better subclass structure, which will allow useful subclassing in the future (in the future commit) for the async client - handle clustering right here - shorten the tracebacks :) To do this: - split HTTPClient into HTTPClientBase & HTTPClient. Now the Base has all kinds of utils, it's mostly ad-hoc because it was also ad-hoc before, and HTTPClient itself has only a .do_request A future async client would also implement .do_request - copy over all the clustering stuff from cloudify/cluster.py - rewrite the exception throwing :) Note that the .do_request and varius .get etc, now also take a `wrapper` argument, and if that is present, they wrap the response data in it. This will become very important soon! --- cloudify_rest_client/client.py | 592 +++++++++++++++++------------ cloudify_rest_client/exceptions.py | 63 ++- 2 files changed, 407 insertions(+), 248 deletions(-) diff --git a/cloudify_rest_client/client.py b/cloudify_rest_client/client.py index 368d4add2..9a5ba3953 100644 --- a/cloudify_rest_client/client.py +++ b/cloudify_rest_client/client.py @@ -1,6 +1,10 @@ +import itertools import json import logging import numbers +import random +import re +import types import requests from base64 import b64encode @@ -58,7 +62,6 @@ from cloudify_rest_client.workflows import WorkflowsClient from cloudify_rest_client.audit_log import AuditLogClient from cloudify_rest_client.community_contacts import CommunityContactsClient -from cloudify_async_client.audit_log import AuditLogAsyncClient try: from requests_kerberos import HTTPKerberosAuth @@ -80,15 +83,32 @@ urllib3.disable_warnings(urllib3.exceptions.InsecurePlatformWarning) -class HTTPClient(object): +class HTTPClientBase: + def __init__( + self, + host, + port=DEFAULT_PORT, + protocol=DEFAULT_PROTOCOL, + api_version=DEFAULT_API_VERSION, + headers=None, + query_params=None, + cert=None, + trust_all=False, + username=None, + password=None, + token=None, + tenant=None, + kerberos_env=None, + timeout=None, + retries=None, + ): + hosts = list(host) if isinstance(host, list) else [host] + hosts = [ipv6_url_compat(h) for h in hosts] + random.shuffle(hosts) + self.hosts = itertools.cycle(hosts) + self.retries = retries or len(hosts) - def __init__(self, host, port=DEFAULT_PORT, - protocol=DEFAULT_PROTOCOL, api_version=DEFAULT_API_VERSION, - headers=None, query_params=None, cert=None, trust_all=False, - username=None, password=None, token=None, tenant=None, - kerberos_env=None, timeout=None, session=None): self.port = port - self.host = ipv6_url_compat(host) self.protocol = protocol self.api_version = api_version self.kerberos_env = kerberos_env @@ -106,19 +126,169 @@ def __init__(self, host, port=DEFAULT_PORT, log_value=False) self._set_header(constants.CLOUDIFY_TOKEN_AUTHENTICATION_HEADER, token) self._set_header(CLOUDIFY_TENANT_HEADER, tenant) - if session is None: - session = requests.Session() - self._session = session + self._has_kerberos = None - @property - def url(self): - return '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host, - self.port, self.api_version) - - def has_kerberos(self): if self.kerberos_env is not None: - return self.kerberos_env - return bool(HTTPKerberosAuth) and is_kerberos_env() + self.has_kerberos = True + else: + self.has_kerberos = bool(HTTPKerberosAuth) and is_kerberos_env() + + if self.has_kerberos: + self.auth = self._make_kerberos_auth() + else: + self.auth = None + + def _make_kerberos_auth(self): + if self.has_kerberos and not self.has_auth_header(): + if HTTPKerberosAuth is None: + raise exceptions.CloudifyClientError( + 'Trying to create a client with kerberos, ' + 'but kerberos_env does not exist') + return HTTPKerberosAuth() + + def _get_total_headers(self, headers): + total_headers = self.headers.copy() + if headers: + total_headers.update(headers) + return total_headers + + def _get_total_params(self, params): + total_params = self.query_params.copy() + if params: + total_params.update(params) + return { + k: self._format_querystring_param(v) + for k, v in total_params.items() + if k is not None and v is not None + } + + def _format_querystring_param(self, param): + if isinstance(param, bool): + return str(param) + return param + + def get_host(self): + return next(self.hosts) + + def get_request_url(self, host, uri, versioned=True): + base_url = f'{self.protocol}://{host}:{self.port}/api' + if not versioned: + return f'{base_url}{uri}' + return f'{base_url}/{self.api_version}{uri}' + + def _log_request(self, method, uri, data): + if not self.logger.isEnabledFor(logging.DEBUG): + return + self.logger.debug( + 'Sending request: %s %s; body: %r', + method, + uri, + data if not data or isinstance(data, dict) else '(bytes data)', + ) + + def get(self, uri, data=None, params=None, headers=None, _include=None, + expected_status_code=200, stream=False, timeout=None, + versioned_url=True, wrapper=None): + if _include: + fields = ','.join(_include) + if not params: + params = {} + params['_include'] = fields + + self._log_request('GET', uri, data=data) + return self.do_request( + 'GET', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def put(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('PUT', uri, data) + return self.do_request( + 'PUT', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def patch(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('PATCH', uri, data) + return self.do_request( + 'PATCH', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def post(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('POST', uri, data) + return self.do_request( + 'POST', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def delete(self, uri, data=None, params=None, headers=None, + expected_status_code=(200, 204), stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('DELETE', uri, data) + return self.do_request( + 'DELETE', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def _get_auth_header(self, username, password): + if not username or not password: + return None + credentials = '{0}:{1}'.format(username, password).encode('utf-8') + encoded_credentials = b64encode(credentials).decode('utf-8') + return BASIC_AUTH_PREFIX + ' ' + encoded_credentials + + def _set_header(self, key, value, log_value=True): + if not value: + return + self.headers[key] = value + value = value if log_value else '*' def has_auth_header(self): auth_headers = [constants.CLOUDIFY_AUTHENTICATION_HEADER, @@ -126,60 +296,51 @@ def has_auth_header(self): constants.CLOUDIFY_TOKEN_AUTHENTICATION_HEADER] return any(header in self.headers for header in auth_headers) - def _raise_client_error(self, response, url=None): - try: - result = response.json() - except Exception: - if response.status_code == 304: - error_msg = 'Nothing to modify' - self._prepare_and_raise_exception( - message=error_msg, - error_code='not_modified', - status_code=response.status_code, - server_traceback='') - else: - message = response.content - if url: - message = '{0} [{1}]'.format(message, url) - error_msg = '{0}: {1}'.format(response.status_code, message) - raise exceptions.CloudifyClientError( - error_msg, - status_code=response.status_code, - response=response) - # this can be changed after RD-3539 - message = result.get('message') or result.get('detail') - code = result.get('error_code') - server_traceback = result.get('server_traceback') - self._prepare_and_raise_exception( - message=message, - error_code=code, - status_code=response.status_code, - server_traceback=server_traceback, - response=response) - - @staticmethod - def _prepare_and_raise_exception(message, - error_code, - status_code, - server_traceback=None, - response=None): - - error = exceptions.ERROR_MAPPING.get(error_code, - exceptions.CloudifyClientError) - raise error(message, server_traceback, - status_code, error_code=error_code, response=response) - - def verify_response_status(self, response, expected_code=200): - if response.status_code != expected_code: - self._raise_client_error(response) - - def _do_request(self, requests_method, request_url, body, params, headers, - expected_status_code, stream, verify, timeout): + def _is_fileserver_download(self, response): + """Is this response a file-download response? + + 404 responses to requests that download files, need to be retried + with all managers in the cluster: if some file was not yet + replicated, another manager might have this file. + + This is because the file replication is asynchronous. + """ + # str() the url because sometimes (aiohttp) it is a URL object + if re.search('/(blueprints|snapshots)/', str(response.url)): + return True + disposition = response.headers.get('Content-Disposition') + if not disposition: + return False + return disposition.strip().startswith('attachment') + + +class HTTPClient(HTTPClientBase): + def __init__(self, *args, **kwargs): + session = kwargs.pop('session', None) + super().__init__(*args, **kwargs) + + if session is None: + session = requests.Session() + self._session = session + + def do_request( + self, + method, + uri, + data, + params, + headers, + expected_status_code, + stream, + timeout, + wrapper, + versioned_url=True, + ): """Run a requests method. :param request_method: string choosing the method, eg "get" or "post" :param request_url: the URL to run the request against - :param body: request body, as a string + :param data: request data, dict or string :param params: querystring parameters, as a dict :param headers: request headers, as a dict :param expected_status_code: check that the response is this @@ -188,21 +349,76 @@ def _do_request(self, requests_method, request_url, body, params, headers, :param verify: the CA cert path :param timeout: request timeout or a (connect, read) timeouts pair """ - auth = None - if self.has_kerberos() and not self.has_auth_header(): - if HTTPKerberosAuth is None: - raise exceptions.CloudifyClientError( - 'Trying to create a client with kerberos, ' - 'but kerberos_env does not exist') - auth = HTTPKerberosAuth() - response = requests_method(request_url, - data=body, - params=params, - headers=headers, - stream=stream, - verify=verify, - timeout=timeout or self.default_timeout_sec, - auth=auth) + requests_method = getattr(self._session, method.lower(), None) + if requests_method is None: + raise RuntimeError(f'Unknown method: {method}') + + copied_data = None + if isinstance(data, types.GeneratorType): + copied_data = itertools.tee(data, self.retries) + elif isinstance(data, dict): + data = json.dumps(data) + + errors = {} + for retry in range(self.retries): + manager_to_try = self.get_host() + request_url = self.get_request_url( + manager_to_try, + uri, + versioned=versioned_url, + ) + if copied_data is not None: + data = copied_data[retry] + try: + response = requests_method( + request_url, + data=data, + params=params, + headers=headers, + stream=stream, + verify=self.get_request_verify(), + timeout=timeout or self.default_timeout_sec, + auth=self.auth, + ) + except requests.exceptions.SSLError as e: + errors[manager_to_try] = exceptions.format_ssl_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except requests.exceptions.ConnectionError as e: + errors[manager_to_try] = exceptions.format_connection_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except exceptions.CloudifyClientError as e: + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + errors[manager_to_try] = e.status_code + if e.response.status_code == 502: + continue + if e.response.status_code == 404 and \ + self._is_fileserver_download(e.response): + continue + else: + raise + + return self.process_response( + response, + expected_status_code, + stream, + wrapper, + ) + mgr_errors = ', '.join(f'{host}: {e}' for host, e in errors.items()) + raise exceptions.CloudifyClientError( + f'HTTP Client error: {method} {uri} ({mgr_errors})') + + def process_response( + self, + response, + expected_status_code, + stream, + wrapper + ): if self.logger.isEnabledFor(logging.DEBUG): for hdr, hdr_content in response.request.headers.items(): self.logger.debug('request header: %s: %s', hdr, hdr_content) @@ -214,7 +430,8 @@ def _do_request(self, requests_method, request_url, body, params, headers, if isinstance(expected_status_code, numbers.Number): expected_status_code = [expected_status_code] if response.status_code not in expected_status_code: - self._raise_client_error(response, request_url) + raise exceptions.CloudifyClientError.from_response( + response, response.status_code, response.content) if response.status_code == 204: return None @@ -227,6 +444,8 @@ def _do_request(self, requests_method, request_url, body, params, headers, if response.history: response_json['history'] = response.history + if wrapper: + return wrapper(response_json) return response_json def get_request_verify(self): @@ -239,152 +458,6 @@ def get_request_verify(self): # verify the certificate return True - def do_request(self, - requests_method, - uri, - data=None, - params=None, - headers=None, - expected_status_code=200, - stream=False, - versioned_url=True, - timeout=None): - if versioned_url: - request_url = '{0}{1}'.format(self.url, uri) - else: - # remove version from url ending - url = self.url.rsplit('/', 1)[0] - request_url = '{0}{1}'.format(url, uri) - - # build headers - headers = headers or {} - total_headers = self.headers.copy() - total_headers.update(headers) - - # build query params - params = params or {} - total_params = self.query_params.copy() - total_params.update(params) - - # data is either dict, bytes data or None - is_dict_data = isinstance(data, dict) - body = json.dumps(data) if is_dict_data else data - if self.logger.isEnabledFor(logging.DEBUG): - log_message = 'Sending request: {0} {1}'.format( - requests_method.__name__.upper(), - request_url) - if is_dict_data: - log_message += '; body: {0}'.format(body) - elif data is not None: - log_message += '; body: bytes data' - self.logger.debug(log_message) - try: - return self._do_request( - requests_method=requests_method, request_url=request_url, - body=body, params=total_params, headers=total_headers, - expected_status_code=expected_status_code, stream=stream, - verify=self.get_request_verify(), timeout=timeout) - except requests.exceptions.SSLError as e: - # Special handling: SSL Verification Error. - # We'd have liked to use `__context__` but this isn't supported in - # Py26, so as long as we support Py26, we need to go about this - # awkwardly. - if len(e.args) > 0 and 'CERTIFICATE_VERIFY_FAILED' in str( - e.args[0]): - raise requests.exceptions.SSLError( - 'Certificate verification failed; please ensure that the ' - 'certificate presented by Cloudify Manager is trusted ' - '(underlying reason: {0})'.format(e)) - raise requests.exceptions.SSLError( - 'An SSL-related error has occurred. This can happen if the ' - 'specified REST certificate does not match the certificate on ' - 'the manager. Underlying reason: {0}'.format(e)) - except requests.exceptions.ConnectionError as e: - raise requests.exceptions.ConnectionError( - '{0}' - '\nAn error occurred when trying to connect to the manager,' - 'please make sure it is online and all required ports are ' - 'open.' - '\nThis can also happen when the manager is not working with ' - 'SSL, but the client does'.format(e) - ) - - def get(self, uri, data=None, params=None, headers=None, _include=None, - expected_status_code=200, stream=False, versioned_url=True, - timeout=None): - if _include: - fields = ','.join(_include) - if not params: - params = {} - params['_include'] = fields - return self.do_request(self._session.get, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - versioned_url=versioned_url, - timeout=timeout) - - def put(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.put, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def patch(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.patch, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def post(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.post, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def delete(self, uri, data=None, params=None, headers=None, - expected_status_code=(200, 204), stream=False, timeout=None): - return self.do_request(self._session.delete, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def _get_auth_header(self, username, password): - if not username or not password: - return None - credentials = '{0}:{1}'.format(username, password).encode('utf-8') - encoded_credentials = b64encode(credentials).decode('utf-8') - return BASIC_AUTH_PREFIX + ' ' + encoded_credentials - - def _set_header(self, key, value, log_value=True): - if not value: - return - self.headers[key] = value - value = value if log_value else '*' - self.logger.debug('Setting `%s` header: %s', key, value) - class StreamedResponse(object): @@ -409,11 +482,25 @@ class CloudifyClient(object): """Cloudify's management client.""" client_class = HTTPClient - def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, - api_version=DEFAULT_API_VERSION, headers=None, - query_params=None, cert=None, trust_all=False, - username=None, password=None, token=None, tenant=None, - kerberos_env=None, timeout=None, session=None): + def __init__( + self, + host='localhost', + port=None, + protocol=DEFAULT_PROTOCOL, + api_version=DEFAULT_API_VERSION, + headers=None, + query_params=None, + cert=None, + trust_all=False, + username=None, + password=None, + token=None, + tenant=None, + kerberos_env=None, + timeout=None, + session=None, + retries=None, + ): """ Creates a Cloudify client with the provided host and optional port. @@ -434,22 +521,38 @@ def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, :param timeout: Requests timeout value. If not set, will default to (5, None)- 5 seconds connect timeout, no read timeout. :param session: a requests.Session to use for all HTTP calls + :param retries: requests that fail with a connection error will be + retried this many times + :param retry_interval: wait this many seconds between retries :return: Cloudify client instance. """ if not port: if protocol == SECURED_PROTOCOL: - # SSL port = SECURED_PORT else: port = DEFAULT_PORT self.host = host - self._client = self.client_class(host, port, protocol, api_version, - headers, query_params, cert, - trust_all, username, password, - token, tenant, kerberos_env, timeout, - session) + self._client = self.client_class( + host=host, + port=port, + protocol=protocol, + api_version=api_version, + headers=headers, + query_params=query_params, + cert=cert, + trust_all=trust_all, + username=username, + password=password, + token=token, + tenant=tenant, + kerberos_env=kerberos_env, + timeout=timeout, + session=session, + retries=retries, + ) + self.blueprints = BlueprintsClient(self._client) self.idp = IdentityProviderClient(self._client) self.permissions = PermissionsClient(self._client) @@ -493,7 +596,4 @@ def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, self.blueprints_labels = BlueprintsLabelsClient(self._client) self.workflows = WorkflowsClient(self._client) self.community_contacts = CommunityContactsClient(self._client) - if AuditLogAsyncClient is None: - self.auditlog = AuditLogClient(self._client) - else: - self.auditlog = AuditLogAsyncClient(self._client) + self.auditlog = AuditLogClient(self._client) diff --git a/cloudify_rest_client/exceptions.py b/cloudify_rest_client/exceptions.py index 35ee790d2..f79045c5e 100644 --- a/cloudify_rest_client/exceptions.py +++ b/cloudify_rest_client/exceptions.py @@ -13,8 +13,67 @@ # * See the License for the specific language governing permissions and # * limitations under the License. +import json +import requests.exceptions + + +def format_ssl_error(e): + # Special handling: SSL Verification Error. + # We'd have liked to use `__context__` but this isn't supported in + # Py26, so as long as we support Py26, we need to go about this + # awkwardly. + if len(e.args) > 0 and 'CERTIFICATE_VERIFY_FAILED' in str( + e.args[0]): + return requests.exceptions.SSLError( + 'Certificate verification failed; please ensure that the ' + 'certificate presented by Cloudify Manager is trusted ' + '(underlying reason: {0})'.format(e)) + return requests.exceptions.SSLError( + 'An SSL-related error has occurred. This can happen if the ' + 'specified REST certificate does not match the certificate on ' + 'the manager. Underlying reason: {0}'.format(e)) + + +def format_connection_error(e): + return requests.exceptions.ConnectionError( + '{0}' + '\nAn error occurred when trying to connect to the manager,' + 'please make sure it is online and all required ports are ' + 'open.' + '\nThis can also happen when the manager is not working with ' + 'SSL, but the client does'.format(e) + ) + class CloudifyClientError(Exception): + @classmethod + def from_response(cls, response, status, response_content): + try: + result = json.loads(response_content.decode('utf-8')) + except Exception: + if status == 304: + return NotModifiedError( + message='Nothing to modify', + error_code=NotModifiedError.ERROR_CODE, + status_code=status, + ) + return cls( + message=f'{status}: {response_content}', + status_code=status, + response=response, + ) + + message = result.get('message') or result.get('detail') + code = result.get('error_code') + server_traceback = result.get('server_traceback') + error_cls = ERROR_MAPPING.get(code, cls) + return error_cls( + message, + server_traceback=server_traceback, + status_code=status, + error_code=code, + response=response, + ) def __init__(self, message, server_traceback=None, status_code=-1, error_code=None, response=None): @@ -282,8 +341,8 @@ def __init__(self, message, server_traceback=None, status_code=-1, error_code=None, response=None): super(InvalidFilterRule, self).__init__( message, server_traceback, status_code, error_code, response) - self.err_filter_rule = response.json().get('err_filter_rule') - self.err_reason = response.json().get('err_reason') + self.err_filter_rule = response.json.get('err_filter_rule') + self.err_reason = response.json.get('err_reason') class DeploymentParentNotFound(CloudifyClientError): From 7e80c3f3d49624313528cf7e475d14c3aa728938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maksymczuk?= Date: Tue, 13 Dec 2022 10:43:59 +0100 Subject: [PATCH 2/5] Rewrite the async client too Now the async client is more of a thing of its own. Similar to how you do CloudifyClient(), you can now also use AsyncCloudifyClient(), and that is going to be very similar to the usual client, except.. well, async. Not all endpoints are available yet. Some of them I couldn't easily translate :( And so, community_contacts, deployment_groups, and log_bundles, are just not available for now. Still pretty good. --- cloudify_async_client/__init__.py | 4 +- cloudify_async_client/audit_log.py | 23 +-- cloudify_async_client/client.py | 217 +++++++++++++++++++++++------ 3 files changed, 187 insertions(+), 57 deletions(-) diff --git a/cloudify_async_client/__init__.py b/cloudify_async_client/__init__.py index 12b968e40..c49e0eaa0 100644 --- a/cloudify_async_client/__init__.py +++ b/cloudify_async_client/__init__.py @@ -1 +1,3 @@ -from cloudify_async_client.client import CloudifyAsyncClient # noqa +from cloudify_async_client.client import AsyncCloudifyClient + +__all__ = ['AsyncCloudifyClient'] diff --git a/cloudify_async_client/audit_log.py b/cloudify_async_client/audit_log.py index 3ec5336f1..da1c661ca 100644 --- a/cloudify_async_client/audit_log.py +++ b/cloudify_async_client/audit_log.py @@ -1,4 +1,3 @@ -from cloudify_async_client import CloudifyAsyncClient from cloudify_rest_client.audit_log import AuditLogClient @@ -14,20 +13,10 @@ async def stream(self, timeout=None, **kwargs): :return: ``ListResponse`` with of ``AuditLog`` items and response metadata. """ - client = await self.async_client() - response = await client.get('audit/stream', - params=kwargs, - timeout=timeout) - return response - - async def async_client(self): - headers = self.api.headers.copy() - headers.update({'Content-type': 'text/event-stream'}) - client = CloudifyAsyncClient( - host=self.api.host, - port=self.api.port, - protocol=self.api.protocol, - cert=self.api.cert, - headers=headers, + response = await self.api.get( + '/audit/stream', + params=kwargs, + timeout=timeout, + stream=True, ) - return client + return response diff --git a/cloudify_async_client/client.py b/cloudify_async_client/client.py index 03a293b1c..114675916 100644 --- a/cloudify_async_client/client.py +++ b/cloudify_async_client/client.py @@ -1,44 +1,183 @@ +import json +import itertools +import logging +import numbers import ssl +import types +import aiohttp.client_exceptions -class CloudifyAsyncClient: - host: str - port: int - protocol: str - headers: dict - cert: str - - def __init__(self, **kwargs): - # only import aiohttp if this is used - otherwise we pay the price - # on every import of the rest-client - import aiohttp - self._aiohttp = aiohttp - - self.host = kwargs.pop('host', 'localhost') - self.port = kwargs.pop('port', 443) - self.protocol = kwargs.pop('protocol', 'https') - self.headers = kwargs.pop('headers', {}) - self.cert = kwargs.pop('cert') - self.ssl = ssl.create_default_context(cafile=self.cert) - self.api_version = 'v3.1' - self.session = self._aiohttp.ClientSession(headers=self.headers) +from cloudify_rest_client import exceptions +from cloudify_rest_client.client import HTTPClientBase, CloudifyClient + +from cloudify_async_client.audit_log import AuditLogAsyncClient + + +class AsyncHTTPClient(HTTPClientBase): + def __init__(self, *args, **kwargs): + session = kwargs.pop('session', None) + timeout = kwargs.pop('timeout', None) + super().__init__(*args, **kwargs) + # can't use base class' timeout because it's a tuple there, and + # aiohttp needs a single int + self.default_timeout_sec = timeout or 5 + + self._aiohttp = None + if session is None: + session = self.aiohttp.ClientSession(headers=self.headers) + self._session = session + + if self.trust_all: + self.ssl = ssl.create_default_context() + self.ssl.check_hostname = False + self.ssl.verify_mode = ssl.CERT_NONE + else: + self.ssl = ssl.create_default_context(cafile=self.cert) @property - def url(self): - return '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host, - self.port, self.api_version) - - def get(self, url, params=None, timeout=300, **kwargs): - if isinstance(timeout, int) or isinstance(timeout, float): - timeout = self._aiohttp.ClientTimeout(total=timeout) - - if params: - # Format query parameters and pass params only if it is not empty - p = {k: str(v) for k, v in params.items() if v is not None} - if p: - kwargs['params'] = p - - return self.session.get(f"{self.url}/{url}", - ssl=self.ssl, - timeout=timeout, - **kwargs) + def aiohttp(self): + if self._aiohttp is None: + import aiohttp + self._aiohttp = aiohttp + return self._aiohttp + + async def do_request( + self, + method, + uri, + data, + params, + headers, + expected_status_code, + stream, + verify, + timeout, + wrapper, + versioned_url=True, + ): + session_method = getattr(self._session, method.lower(), None) + if session_method is None: + raise RuntimeError(f'Unknown method: {method}') + + copied_data = None + if isinstance(data, types.GeneratorType): + copied_data = itertools.tee(data, self.retries) + elif isinstance(data, dict): + data = json.dumps(data) + + errors = {} + for retry in range(self.retries): + manager_to_try = self.get_host() + request_url = self.get_request_url( + manager_to_try, + uri, + versioned=versioned_url, + ) + if copied_data is not None: + data = copied_data[retry] + try: + response = await session_method( + request_url, + data=data, + params=params, + headers=headers, + ssl=self.ssl, + timeout=timeout or self.default_timeout_sec, + auth=self.auth, + ) + return await self.process_response( + response, + expected_status_code, + stream, + wrapper, + ) + + except aiohttp.client_exceptions.ClientSSLError as e: + errors[manager_to_try] = exceptions.format_ssl_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except aiohttp.client_exceptions.ClientConnectionError as e: + errors[manager_to_try] = exceptions.format_connection_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except exceptions.CloudifyClientError as e: + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + errors[manager_to_try] = e.status_code + if e.status_code == 502: + continue + if e.status_code == 404 and \ + self._is_fileserver_download(e.response): + continue + else: + raise + + mgr_errors = ', '.join(f'{host}: {e}' for host, e in errors.items()) + raise exceptions.CloudifyClientError( + f'HTTP Client error: {method} {uri} ({mgr_errors})') + + async def process_response( + self, + response, + expected_status_code, + stream, + wrapper + ): + if self.logger.isEnabledFor(logging.DEBUG): + for hdr, hdr_content in response.request_info.headers.items(): + self.logger.debug('request header: %s: %s', hdr, hdr_content) + self.logger.debug('reply: "%s %s" %s', response.status, + response.reason, response.content) + for hdr, hdr_content in response.headers.items(): + self.logger.debug('response header: %s: %s', hdr, hdr_content) + + if isinstance(expected_status_code, numbers.Number): + expected_status_code = [expected_status_code] + if response.status not in expected_status_code: + raise exceptions.CloudifyClientError.from_response( + response, response.status, response.request_info.url) + + if response.status == 204: + return None + + if stream: + return response + + response_json = await response.json() + + if response.history: + response_json['history'] = response.history + + if wrapper: + return wrapper(response_json) + await response.close() + + return response_json + + async def close(self): + await self._session.close() + + +class AsyncCloudifyClient(CloudifyClient): + client_class = AsyncHTTPClient + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.auditlog = AuditLogAsyncClient(self._client) + + @property + def community_contacts(self): + raise RuntimeError('async client does not support community_contacts') + + @property + def deployment_groups(self): + raise RuntimeError('async client does not support deployment_groups') + + @property + def log_bundles(self): + raise RuntimeError('async client does not support log_bundles') + + def close(self): + return self._client.close() From a65adac36deb5c288e23efec94f772061625a54c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maksymczuk?= Date: Tue, 13 Dec 2022 10:45:08 +0100 Subject: [PATCH 3/5] Shorthand for constructing ListResponses For example, `wrapper = Deployment; wrapper(resp)` creates a Deployment object from a response. Similarly, now `wrapper = ListResponse.of(Deployment); wrapper(resp)` creates a list of deployments from a response. --- cloudify_rest_client/responses.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cloudify_rest_client/responses.py b/cloudify_rest_client/responses.py index ea7957181..aca9d999a 100644 --- a/cloudify_rest_client/responses.py +++ b/cloudify_rest_client/responses.py @@ -58,6 +58,12 @@ def total(self): class ListResponse(object): + @classmethod + def of(cls, item_cls): + return lambda response_json: cls( + items=[item_cls(item) for item in response_json.get('items', [])], + metadata=response_json.get('metadata', {}), + ) def __init__(self, items, metadata): self.items = items From 10b5671bbf924c63cd859878726041a73c5dae10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maksymczuk?= Date: Tue, 13 Dec 2022 10:48:56 +0100 Subject: [PATCH 4/5] Use wrappers & returns in all clients In every client, replace this: ```python response = self.api.get(...) return Deployment(response) ``` with this: ```python return self.api.get(..., wrapper=Deployment) ``` Now there's a trick! Since we always just return the result of `self.api.x()`, all the specific client functions (BlueprintsClient.get, DeploymentsClient.get, etc. that's what I mean by "specific") don't really care whether they're sync or async. If self.api.get returns a future, we'll just return it from here directly. Very few endpoints were not really possible to be rewritten easily like that, so I've left TODO comments in there. Hopefully we can get back to them at some point. --- cloudify_rest_client/agents.py | 41 ++--- cloudify_rest_client/audit_log.py | 19 ++- cloudify_rest_client/blueprints.py | 65 ++++---- cloudify_rest_client/bytes_stream_utils.py | 2 +- .../deployment_modifications.py | 41 +++-- cloudify_rest_client/deployment_updates.py | 109 +++---------- cloudify_rest_client/deployments.py | 113 ++++++++------ cloudify_rest_client/evaluate.py | 15 +- cloudify_rest_client/events.py | 51 ++---- cloudify_rest_client/execution_schedules.py | 52 ++++--- cloudify_rest_client/executions.py | 125 ++++++++------- cloudify_rest_client/filters.py | 32 ++-- .../inter_deployment_dependencies.py | 50 +++--- cloudify_rest_client/labels.py | 21 ++- cloudify_rest_client/ldap.py | 3 +- cloudify_rest_client/license.py | 16 +- cloudify_rest_client/log_bundles.py | 27 ++-- cloudify_rest_client/maintenance.py | 9 +- cloudify_rest_client/manager.py | 146 ++++++++++-------- cloudify_rest_client/node_instances.py | 66 ++++---- cloudify_rest_client/nodes.py | 60 +++---- cloudify_rest_client/operations.py | 61 +++++--- cloudify_rest_client/permissions.py | 13 +- cloudify_rest_client/plugins.py | 74 +++++---- cloudify_rest_client/plugins_update.py | 44 +++--- cloudify_rest_client/secrets.py | 59 ++++--- cloudify_rest_client/secrets_providers.py | 37 ++--- cloudify_rest_client/sites.py | 32 ++-- cloudify_rest_client/snapshots.py | 38 +++-- cloudify_rest_client/summary.py | 4 +- cloudify_rest_client/tenants.py | 41 ++--- cloudify_rest_client/tests/__init__.py | 20 ++- cloudify_rest_client/tests/test_tokens.py | 2 +- cloudify_rest_client/tokens.py | 18 +-- cloudify_rest_client/user_groups.py | 46 +++--- cloudify_rest_client/users.py | 69 +++++---- cloudify_rest_client/workflows.py | 18 ++- 37 files changed, 863 insertions(+), 776 deletions(-) diff --git a/cloudify_rest_client/agents.py b/cloudify_rest_client/agents.py index d7832b5fa..af3c1b5b9 100644 --- a/cloudify_rest_client/agents.py +++ b/cloudify_rest_client/agents.py @@ -140,7 +140,6 @@ class AgentsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'agents' - self._wrapper_cls = Agent def list(self, _include=None, sort=None, is_descending=False, **kwargs): """List the agents installed from the manager. @@ -154,12 +153,11 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: kwargs['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=kwargs) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=kwargs, + wrapper=ListResponse.of(Agent), ) def get(self, name): @@ -168,8 +166,10 @@ def get(self, name): :param name: The name of the agent :return: The details of the agent """ - response = self.api.get('/{0}/{1}'.format(self._uri_prefix, name)) - return self._wrapper_cls(response) + return self.api.get( + '/{0}/{1}'.format(self._uri_prefix, name), + wrapper=Agent, + ) def create(self, name, node_instance_id, state=AgentState.CREATING, create_rabbitmq_user=True, **kwargs): @@ -185,9 +185,11 @@ def create(self, name, node_instance_id, state=AgentState.CREATING, 'state': state, 'create_rabbitmq_user': create_rabbitmq_user} data.update(kwargs) - response = self.api.put('/{0}/{1}'.format(self._uri_prefix, name), - data=data) - return self._wrapper_cls(response) + return self.api.put( + '/{0}/{1}'.format(self._uri_prefix, name), + data=data, + wrapper=Agent, + ) def update(self, name, state): """Update agent with the provided state. @@ -197,9 +199,11 @@ def update(self, name, state): :return: The updated agent """ data = {'state': state} - response = self.api.patch('/{0}/{1}'.format(self._uri_prefix, name), - data=data) - return self._wrapper_cls(response) + return self.api.patch( + '/{0}/{1}'.format(self._uri_prefix, name), + data=data, + wrapper=Agent, + ) def replace_ca_certs(self, bundle, @@ -227,6 +231,7 @@ def replace_ca_certs(self, 'manager_ca_cert': manager_ca_cert_str } - response = self.api.patch('/' + self._uri_prefix, data=data) - - return response + return self.api.patch( + '/' + self._uri_prefix, + data=data, + ) diff --git a/cloudify_rest_client/audit_log.py b/cloudify_rest_client/audit_log.py index f592efdf9..51aa4a14b 100644 --- a/cloudify_rest_client/audit_log.py +++ b/cloudify_rest_client/audit_log.py @@ -68,11 +68,11 @@ def list(self, get_all=False, **kwargs): if get_all: params['size'] = 0 params['offset'] = 0 - response = self.api.get('/audit', params=params) - - return ListResponse( - [AuditLog(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/audit', + params=params, + wrapper=ListResponse.of(AuditLog), + ) def delete(self, **kwargs): """Delete (some) of the AuditLogs. @@ -82,12 +82,15 @@ def delete(self, **kwargs): :return: DeletedResponse describing deletion outcome - a number of 'deleted' records. """ - response = self.api.delete('/audit', params=kwargs) - return DeletedResponse(**response) + return self.api.delete( + '/audit', + params=kwargs, + wrapper=DeletedResponse, + ) def inject(self, logs): """Inject audit logs. Intended for internal use only. :param logs: List of dict log entries to inject. """ - self.api.post('/audit', data=logs) + return self.api.post('/audit', data=logs) diff --git a/cloudify_rest_client/blueprints.py b/cloudify_rest_client/blueprints.py index 314d06342..4955349ee 100644 --- a/cloudify_rest_client/blueprints.py +++ b/cloudify_rest_client/blueprints.py @@ -118,7 +118,6 @@ class BlueprintsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'blueprints' - self._wrapper_cls = Blueprint def _prepare_put_request( self, @@ -254,6 +253,7 @@ def callback_wrapper(watcher): data=multipart, headers={'Content-Type': multipart.content_type}, expected_status_code=expected_status, + wrapper=Blueprint, ) def _validate(self, @@ -330,18 +330,25 @@ def list(self, _include=None, sort=None, is_descending=False, params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/blueprints', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/blueprints', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Blueprint), + ) elif constraints: - response = self.api.post('/searches/blueprints', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/blueprints', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Blueprint), + ) else: - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + wrapper=ListResponse.of(Blueprint), + ) def publish_archive( self, @@ -378,7 +385,7 @@ def publish_archive( blueprint's unique Id. """ - response = self._upload( + return self._upload( archive_location, blueprint_id=blueprint_id, application_file_name=blueprint_filename, @@ -391,8 +398,6 @@ def publish_archive( skip_execution=skip_execution, requirements=requirements, ) - if not async_upload: - return self._wrapper_cls(response) @staticmethod def calc_size(blueprint_path): @@ -448,7 +453,7 @@ def upload( tar_path, application_file = self._validate_blueprint_size( path, tempdir, skip_size_limit) - response = self._upload( + return self._upload( tar_path, blueprint_id=entity_id, application_file_name=application_file, @@ -463,8 +468,6 @@ def upload( legacy=legacy, requirements=requirements, ) - if not async_upload: - return self._wrapper_cls(response) finally: shutil.rmtree(tempdir) @@ -501,7 +504,7 @@ def validate(self, tar_path, application_file = self._validate_blueprint_size( path, tempdir, skip_size_limit) - response = self._validate( + return self._validate( tar_path or path, blueprint_id=entity_id, application_file_name=application_file or blueprint_filename, @@ -510,10 +513,6 @@ def validate(self, finally: shutil.rmtree(tempdir) - if response: - # on cloudify earlier than 6.4, response is None (204 no content) - return response - def get(self, blueprint_id, _include=None): """ Gets a blueprint by its id. @@ -524,8 +523,11 @@ def get(self, blueprint_id, _include=None): """ assert blueprint_id uri = '/{self._uri_prefix}/{id}'.format(self=self, id=blueprint_id) - response = self.api.get(uri, _include=_include) - return self._wrapper_cls(response) + return self.api.get( + uri, + _include=_include, + wrapper=Blueprint, + ) def delete(self, blueprint_id, force=False): """ @@ -538,7 +540,7 @@ def delete(self, blueprint_id, force=False): """ assert blueprint_id - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{id}'.format(self=self, id=blueprint_id), params={'force': force}) @@ -607,13 +609,12 @@ def update(self, blueprint_id, update_dict): :param update_dict: Dictionary of attributes and values to be updated. :return: The updated blueprint. """ - response = self.api.patch('/{self._uri_prefix}/{id}'.format( + return self.api.patch('/{self._uri_prefix}/{id}'.format( self=self, id=blueprint_id), - data=update_dict + data=update_dict, + wrapper=Blueprint, ) - return self._wrapper_cls(response) - def upload_archive(self, blueprint_id, archive_path): """ Upload an archive for an existing a blueprint. @@ -630,7 +631,7 @@ def upload_archive(self, blueprint_id, archive_path): archive_data = bytes_stream_utils.request_data_file_stream( archive_path, client=self.api) - self.api.put('/{self._uri_prefix}/{id}/archive'.format( + return self.api.put('/{self._uri_prefix}/{id}/archive'.format( self=self, id=blueprint_id), data=archive_data ) @@ -646,7 +647,7 @@ def upload_icon(self, blueprint_id, icon_path): icon_data = bytes_stream_utils.request_data_file_stream( icon_path, client=self.api) - self.api.patch('/{self._uri_prefix}/{id}/icon'.format( + return self.api.patch('/{self._uri_prefix}/{id}/icon'.format( self=self, id=blueprint_id), data=icon_data ) @@ -657,6 +658,6 @@ def remove_icon(self, blueprint_id): :param blueprint_id: Blueprint's id to update. """ - self.api.patch('/{self._uri_prefix}/{id}/icon'.format( + return self.api.patch('/{self._uri_prefix}/{id}/icon'.format( self=self, id=blueprint_id), ) diff --git a/cloudify_rest_client/bytes_stream_utils.py b/cloudify_rest_client/bytes_stream_utils.py index 6eaeafe85..822da7ca8 100644 --- a/cloudify_rest_client/bytes_stream_utils.py +++ b/cloudify_rest_client/bytes_stream_utils.py @@ -15,7 +15,7 @@ def request_data_file_stream(file_path, :param progress_callback: Callback function - can be used to print progress :return: File data or generator object """ - if client and client.has_kerberos() and not client.has_auth_header(): + if client and client.has_kerberos and not client.has_auth_header(): # kerberos currently does not support chunks with open(file_path, 'rb') as f: data = f.read() diff --git a/cloudify_rest_client/deployment_modifications.py b/cloudify_rest_client/deployment_modifications.py index 7851f0d76..412d55b19 100644 --- a/cloudify_rest_client/deployment_modifications.py +++ b/cloudify_rest_client/deployment_modifications.py @@ -131,9 +131,12 @@ def list(self, deployment_id=None, _include=None, **kwargs): params.update(kwargs) uri = '/deployment-modifications' - response = self.api.get(uri, params=params, _include=_include) - items = [DeploymentModification(item) for item in response['items']] - return ListResponse(items, response['metadata']) + return self.api.get( + uri, + params=params, + _include=_include, + wrapper=ListResponse.of(DeploymentModification), + ) def start(self, deployment_id, nodes, context=None): """Start deployment modification. @@ -152,19 +155,23 @@ def start(self, deployment_id, nodes, context=None): if context is not None: data['context'] = context - uri = '/deployment-modifications' - response = self.api.post(uri, data, - expected_status_code=201) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications', + data, + expected_status_code=201, + wrapper=DeploymentModification, + ) def get(self, modification_id, _include=None): """Get deployment modification :param modification_id: The modification id """ - uri = '/deployment-modifications/{0}'.format(modification_id) - response = self.api.get(uri, _include=_include) - return DeploymentModification(response) + return self.api.get( + '/deployment-modifications/{0}'.format(modification_id), + _include=_include, + wrapper=DeploymentModification, + ) def finish(self, modification_id): """Finish deployment modification @@ -173,9 +180,10 @@ def finish(self, modification_id): """ assert modification_id - uri = '/deployment-modifications/{0}/finish'.format(modification_id) - response = self.api.post(uri) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications/{0}/finish'.format(modification_id), + wrapper=DeploymentModification, + ) def rollback(self, modification_id): """Rollback deployment modification @@ -184,6 +192,7 @@ def rollback(self, modification_id): """ assert modification_id - uri = '/deployment-modifications/{0}/rollback'.format(modification_id) - response = self.api.post(uri) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications/{0}/rollback'.format(modification_id), + wrapper=DeploymentModification, + ) diff --git a/cloudify_rest_client/deployment_updates.py b/cloudify_rest_client/deployment_updates.py index 18a7a7566..494efa083 100644 --- a/cloudify_rest_client/deployment_updates.py +++ b/cloudify_rest_client/deployment_updates.py @@ -13,16 +13,6 @@ # * See the License for the specific language governing permissions and # * limitations under the License. -import os -import json -import shutil -import tempfile -from urllib.parse import quote as urlquote, urlparse -from urllib.request import pathname2url - -from mimetypes import MimeTypes - -from cloudify_rest_client import utils from cloudify_rest_client.responses import ListResponse @@ -96,8 +86,7 @@ def create(self, update_id, deployment_id, **kwargs): 'deployment_id': deployment_id, } data.update(kwargs) - response = self.api.put(url, data=data) - return DeploymentUpdate(response) + return self.api.put(url, data=data, wrapper=DeploymentUpdate) def set_attributes(self, update_id, **kwargs): """Update a deployment-update object with the given attributes. @@ -105,8 +94,10 @@ def set_attributes(self, update_id, **kwargs): This is only useful from within the deployment-update workflow. Do not use this otherwise. """ - url = '/deployment-updates/{0}'.format(update_id) - self.api.patch(url, data=kwargs) + return self.api.patch( + '/deployment-updates/{0}'.format(update_id), + data=kwargs, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """List deployment updates @@ -123,79 +114,20 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(uri, params=params, _include=_include) - items = [DeploymentUpdate(item) for item in response['items']] - return ListResponse(items, response['metadata']) + return self.api.get( + uri, + params=params, + _include=_include, + wrapper=ListResponse.of(DeploymentUpdate), + ) def bulk_insert(self, updates): """Bulk insert deployment updates. For internal use only.""" - uri = '/deployment-updates' - self.api.post(uri, {'deployment_updates': updates}, - expected_status_code=[201, 204]) - - def _update_from_blueprint(self, - deployment_id, - blueprint_path, - inputs=None): - """Create a deployment update transaction for blueprint app. - - :param deployment_id: The deployment id - :param blueprint_path: the path of the blueprint to stage - """ - assert deployment_id - - tempdir = tempfile.mkdtemp() - try: - tar_path = utils.tar_blueprint(blueprint_path, tempdir) - application_filename = os.path.basename(blueprint_path) - - return self._update_from_archive(deployment_id, - tar_path, - application_filename, - inputs=inputs) - finally: - shutil.rmtree(tempdir) - - @staticmethod - def _update_from_archive(deployment_id, - archive_path, - application_file_name=None, - inputs=None): - """Create a deployment update transaction for an archived app. - - :param archive_path: the path for the archived app. - :param application_file_name: the main blueprint filename. - :param deployment_id: the deployment id to update. - :return: DeploymentUpdate dict - :rtype: DeploymentUpdate - """ - assert deployment_id - - mime_types = MimeTypes() - - data_form = {} - params = {} - # all the inputs are passed through the query - if inputs: - data_form['inputs'] = ('inputs', json.dumps(inputs), 'text/plain') - - if application_file_name: - params['application_file_name'] = urlquote(application_file_name) - - # For a Windows path (e.g. "C:\aaa\bbb.zip") scheme is the - # drive letter and therefore the 2nd condition is present - if all([urlparse(archive_path).scheme, - not os.path.exists(archive_path)]): - # archive location is URL - params['blueprint_archive_url'] = archive_path - else: - data_form['blueprint_archive'] = ( - os.path.basename(archive_path), - open(archive_path, 'rb'), - # Guess the archive mime type - mime_types.guess_type(pathname2url(archive_path))) - - return data_form, params + return self.api.post( + '/deployment-updates', + {'deployment_updates': updates}, + expected_status_code=[201, 204], + ) def get(self, update_id, _include=None): """Get deployment update @@ -203,8 +135,7 @@ def get(self, update_id, _include=None): :param update_id: The update id """ uri = '/deployment-updates/{0}'.format(update_id) - response = self.api.get(uri, _include=_include) - return DeploymentUpdate(response) + return self.api.get(uri, _include=_include, wrapper=DeploymentUpdate) def update_with_existing_blueprint( self, @@ -254,8 +185,7 @@ def update_with_existing_blueprint( if reevaluate_active_statuses is not None: data['reevaluate_active_statuses'] = reevaluate_active_statuses uri = '/deployment-updates/{0}/update/initiate'.format(deployment_id) - response = self.api.post(uri, data=data) - return DeploymentUpdate(response) + return self.api.post(uri, data=data, wrapper=DeploymentUpdate) def finalize_commit(self, update_id): """Finalize the committing process @@ -266,5 +196,4 @@ def finalize_commit(self, update_id): assert update_id uri = '/deployment-updates/{0}/update/finalize'.format(update_id) - response = self.api.post(uri) - return DeploymentUpdate(response) + return self.api.post(uri, wrapper=DeploymentUpdate) diff --git a/cloudify_rest_client/deployments.py b/cloudify_rest_client/deployments.py index 2bcd4169c..ecf12bf5f 100644 --- a/cloudify_rest_client/deployments.py +++ b/cloudify_rest_client/deployments.py @@ -369,15 +369,18 @@ def list(self, _include=None, **kwargs): if _include: params['_include'] = ','.join(_include) - response = self.api.get('/deployment-groups', params=params) - return ListResponse( - [DeploymentGroup(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/deployment-groups', + params=params, + wrapper=ListResponse.of(DeploymentGroup), + ) def get(self, group_id): """Get the specified deployment group.""" - response = self.api.get('/deployment-groups/{0}'.format(group_id)) - return DeploymentGroup(response) + return self.api.get( + '/deployment-groups/{0}'.format(group_id), + wrapper=DeploymentGroup, + ) def put(self, group_id, visibility=VisibilityState.TENANT, description=None, blueprint_id=None, default_inputs=None, @@ -430,10 +433,10 @@ def put(self, group_id, visibility=VisibilityState.TENANT, data['created_by'] = created_by if creation_counter: data['creation_counter'] = creation_counter - response = self.api.put( + return self.api.put( '/deployment-groups/{0}'.format(group_id), data=data, + wrapper=DeploymentGroup, ) - return DeploymentGroup(response) def add_deployments(self, group_id, deployment_ids=None, count=None, new_deployments=None, filter_id=None, @@ -474,6 +477,7 @@ def add_deployments(self, group_id, deployment_ids=None, count=None, else: batches = [new_deployments] + # TODO this is not async-friendly for new_deployments_batch in batches: response = self.api.patch( '/deployment-groups/{0}'.format(group_id), @@ -504,7 +508,7 @@ def remove_deployments(self, group_id, deployment_ids=None, group given by this id :return: the updated deployment group """ - response = self.api.patch( + return self.api.patch( '/deployment-groups/{0}'.format(group_id), data={ 'remove': { @@ -513,9 +517,9 @@ def remove_deployments(self, group_id, deployment_ids=None, 'filter_rules': filter_rules, 'deployments_from_group': deployments_from_group, } - } + }, + wrapper=DeploymentGroup, ) - return DeploymentGroup(response) def delete(self, group_id, delete_deployments=False, force=False, with_logs=False): @@ -527,7 +531,7 @@ def delete(self, group_id, delete_deployments=False, :param force: same meaning as in deployments.delete :param with_logs: same meaning as in deployments.delete """ - self.api.delete( + return self.api.delete( '/deployment-groups/{0}'.format(group_id), params={ 'delete_deployments': delete_deployments, @@ -551,8 +555,7 @@ def get(self, deployment_id): """ assert deployment_id uri = '/deployments/{0}/outputs'.format(deployment_id) - response = self.api.get(uri) - return DeploymentOutputs(response) + return self.api.get(uri, wrapper=DeploymentOutputs) class DeploymentCapabilitiesClient(object): @@ -568,8 +571,7 @@ def get(self, deployment_id): """ assert deployment_id uri = '/deployments/{0}/capabilities'.format(deployment_id) - response = self.api.get(uri) - return DeploymentCapabilities(response) + return self.api.get(uri, wrapper=DeploymentCapabilities) def list(self, deployment_id, _include=None, constraints=None, **kwargs): """ @@ -592,11 +594,11 @@ def list(self, deployment_id, _include=None, constraints=None, **kwargs): constraints = dict() constraints['deployment_id'] = deployment_id - response = self.api.post('/searches/capabilities', params=params, - data={'constraints': constraints}) - return ListResponse( - items=[DeploymentCapabilities(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/capabilities', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(DeploymentCapabilities), ) @@ -628,11 +630,11 @@ def list(self, blueprint_id=None, deployment_id=None, if _include: params['_include'] = ','.join(_include) - response = self.api.post('/searches/scaling-groups', params=params, - data={'constraints': constraints or {}}) - return ListResponse( - items=[DeploymentScalingGroup(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/scaling-groups', + params=params, + data={'constraints': constraints or {}}, + wrapper=ListResponse.of(DeploymentScalingGroup), ) @@ -674,16 +676,25 @@ def list(self, _include=None, sort=None, is_descending=False, params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/deployments', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/deployments', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Deployment), + ) elif constraints: - response = self.api.post('/searches/deployments', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/deployments', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Deployment), + ) else: - response = self.api.get('/deployments', params=params) - - return ListResponse([Deployment(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/deployments', + params=params, + wrapper=ListResponse.of(Deployment), + ) def get(self, deployment_id, @@ -704,13 +715,13 @@ def get(self, """ assert deployment_id uri = '/deployments/{0}'.format(deployment_id) - response = self.api.get( + return self.api.get( uri, _include=_include, params={'all_sub_deployments': all_sub_deployments, - 'include_workdir': include_workdir} + 'include_workdir': include_workdir}, + wrapper=Deployment, ) - return Deployment(response) def create(self, blueprint_id, @@ -822,9 +833,13 @@ def create(self, if async_create is not None: # if it's None, we just keep the server's default behaviour params['async_create'] = async_create - response = self.api.put( - uri, data, params=params, expected_status_code=201) - return Deployment(response) + return self.api.put( + uri, + data, + params=params, + expected_status_code=201, + wrapper=Deployment, + ) def delete(self, deployment_id, force=False, @@ -850,7 +865,7 @@ def delete(self, deployment_id, warnings.warn('delete_db_mode is deprecated and does nothing', DeprecationWarning) - self.api.delete( + return self.api.delete( '/deployments/{0}'.format(deployment_id), params=params) def set_visibility(self, deployment_id, visibility): @@ -900,9 +915,11 @@ def update_labels(self, deployment_id, labels, creator=None, data['creator'] = creator if created_at: data['created_at'] = created_at - updated_dep = self.api.patch( - '/deployments/{0}'.format(deployment_id), data=data) - return Deployment(updated_dep) + return self.api.patch( + '/deployments/{0}'.format(deployment_id), + data=data, + wrapper=Deployment, + ) def set_attributes(self, deployment_id, **kwargs): """Set arbitrary properties on the deployment. @@ -912,6 +929,8 @@ def set_attributes(self, deployment_id, **kwargs): For internal use only. """ - updated_dep = self.api.patch( - '/deployments/{0}'.format(deployment_id), data=kwargs) - return Deployment(updated_dep) + return self.api.patch( + '/deployments/{0}'.format(deployment_id), + data=kwargs, + wrapper=Deployment, + ) diff --git a/cloudify_rest_client/evaluate.py b/cloudify_rest_client/evaluate.py index 985a4f1c5..893a64c77 100644 --- a/cloudify_rest_client/evaluate.py +++ b/cloudify_rest_client/evaluate.py @@ -55,9 +55,12 @@ def functions(self, deployment_id, context, payload): :rtype: EvaluatedFunctions """ assert deployment_id - result = self.api.post('/evaluate/functions', data={ - 'deployment_id': deployment_id, - 'context': context, - 'payload': payload - }) - return EvaluatedFunctions(result) + return self.api.post( + '/evaluate/functions', + data={ + 'deployment_id': deployment_id, + 'context': context, + 'payload': payload + }, + wrapper=EvaluatedFunctions, + ) diff --git a/cloudify_rest_client/events.py b/cloudify_rest_client/events.py index 31e17e573..cfc17edce 100644 --- a/cloudify_rest_client/events.py +++ b/cloudify_rest_client/events.py @@ -1,4 +1,3 @@ -import warnings from datetime import datetime from cloudify_rest_client.responses import ListResponse @@ -9,34 +8,6 @@ class EventsClient(object): def __init__(self, api): self.api = api - def get(self, - execution_id, - from_event=0, - batch_size=100, - include_logs=False): - """ - Returns event for the provided execution id. - - :param execution_id: Id of execution to get events for. - :param from_event: Index of first event to retrieve on pagination. - :param batch_size: Maximum number of events to retrieve per call. - :param include_logs: Whether to also get logs. - :return: Events list and total number of currently available - events (tuple). - """ - warnings.warn('method is deprecated, use "{0}" method instead' - .format(self.list.__name__), - DeprecationWarning) - - response = self.list(execution_id=execution_id, - include_logs=include_logs, - _offset=from_event, - _size=batch_size, - _sort='@timestamp') - events = response.items - total_events = response.metadata.pagination.total - return events, total_events - def list(self, include_logs=False, message=None, from_datetime=None, to_datetime=None, _include=None, sort=None, **kwargs): """List events @@ -59,8 +30,12 @@ def list(self, include_logs=False, message=None, from_datetime=None, sort=sort, **kwargs) - response = self.api.get(uri, _include=_include, params=params) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + uri, + _include=_include, + params=params, + wrapper=ListResponse.of(dict), + ) def create(self, events=None, logs=None, execution_id=None, agent_name=None, manager_name=None, @@ -84,7 +59,11 @@ def create(self, events=None, logs=None, execution_id=None, if execution_group_id: data['execution_group_id'] = execution_group_id - self.api.post('/events', data=data, expected_status_code=(201, 204)) + return self.api.post( + '/events', + data=data, + expected_status_code=(201, 204), + ) def delete(self, deployment_id, include_logs=False, message=None, from_datetime=None, to_datetime=None, sort=None, **kwargs): @@ -109,9 +88,11 @@ def delete(self, deployment_id, include_logs=False, message=None, deployment_id=deployment_id, **kwargs) - response = self.api.delete(uri, params=params, - expected_status_code=200) - return ListResponse(response['items'], response['metadata']) + return self.api.delete( + uri, params=params, + expected_status_code=200, + wrapper=ListResponse.of(lambda x: x), + ) @staticmethod def _create_query(include_logs=False, message=None, from_datetime=None, diff --git a/cloudify_rest_client/execution_schedules.py b/cloudify_rest_client/execution_schedules.py index c18257411..aa71aee68 100644 --- a/cloudify_rest_client/execution_schedules.py +++ b/cloudify_rest_client/execution_schedules.py @@ -168,11 +168,13 @@ def create(self, schedule_id, deployment_id, workflow_id, if created_at: data['created_at'] = created_at uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.put(uri, - data=data, - params=params, - expected_status_code=201) - return ExecutionSchedule(response) + return self.api.put( + uri, + data=data, + params=params, + expected_status_code=201, + wrapper=ExecutionSchedule, + ) def update(self, schedule_id, deployment_id, since=None, until=None, recurrence=None, count=None, weekdays=None, rrule=None, @@ -224,11 +226,13 @@ def update(self, schedule_id, deployment_id, since=None, until=None, 'workflow_id': workflow_id, } uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.patch(uri, - data=data, - params=params, - expected_status_code=201) - return ExecutionSchedule(response) + return self.api.patch( + uri, + data=data, + params=params, + expected_status_code=201, + wrapper=ExecutionSchedule, + ) def delete(self, schedule_id, deployment_id): """ @@ -239,10 +243,11 @@ def delete(self, schedule_id, deployment_id): """ assert schedule_id params = {'deployment_id': deployment_id} - self.api.delete('/{self._uri_prefix}/{id}'.format(self=self, - id=schedule_id), - params=params, - expected_status_code=204) + return self.api.delete( + '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id), + params=params, + expected_status_code=204, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -259,11 +264,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse([ExecutionSchedule(item) - for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(ExecutionSchedule), + ) def get(self, schedule_id, deployment_id, _include=None): """Get an execution schedule by its id. @@ -276,5 +282,9 @@ def get(self, schedule_id, deployment_id, _include=None): assert schedule_id params = {'deployment_id': deployment_id} uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.get(uri, _include=_include, params=params) - return ExecutionSchedule(response) + return self.api.get( + uri, + _include=_include, + params=params, + wrapper=ExecutionSchedule, + ) diff --git a/cloudify_rest_client/executions.py b/cloudify_rest_client/executions.py index c8aa59b81..977ea31e9 100644 --- a/cloudify_rest_client/executions.py +++ b/cloudify_rest_client/executions.py @@ -190,9 +190,10 @@ def list(self, _include=None, **kwargs): response['metadata']) def get(self, execution_group_id): - response = self.api.get( - '/execution-groups/{0}'.format(execution_group_id)) - return ExecutionGroup(response) + return self.api.get( + '/execution-groups/{0}'.format(execution_group_id), + wrapper=ExecutionGroup, + ) def create(self, deployment_group_id, workflow_id, executions, force=False, default_parameters=None, parameters=None, @@ -218,8 +219,11 @@ def create(self, deployment_group_id, workflow_id, executions, args['created_by'] = created_by if created_at: args['created_at'] = created_at - response = self.api.post('/execution-groups', data=args) - return ExecutionGroup(response) + return self.api.post( + '/execution-groups', + data=args, + wrapper=ExecutionGroup, + ) def start(self, deployment_group_id, workflow_id, force=False, default_parameters=None, parameters=None, @@ -235,15 +239,18 @@ def start(self, deployment_group_id, workflow_id, force=False, the default parameters on a per-deployment basis :param concurrency: run this many executions at a time """ - response = self.api.post('/execution-groups', data={ - 'force': force, - 'deployment_group_id': deployment_group_id, - 'workflow_id': workflow_id, - 'parameters': parameters, - 'default_parameters': default_parameters, - 'concurrency': concurrency - }) - return ExecutionGroup(response) + return self.api.post( + '/execution-groups', + data={ + 'force': force, + 'deployment_group_id': deployment_group_id, + 'workflow_id': workflow_id, + 'parameters': parameters, + 'default_parameters': default_parameters, + 'concurrency': concurrency + }, + wrapper=ExecutionGroup, + ) def cancel(self, execution_group_id, force=False, kill=False): """Cancel the executions in this group. @@ -253,18 +260,20 @@ def cancel(self, execution_group_id, force=False, kill=False): Queued executions are marked cancelled immediately. """ action = 'kill' if kill else 'force-cancel' if force else 'cancel' - response = self.api.post( + return self.api.post( '/execution-groups/{0}'.format(execution_group_id), - data={'action': action}) - return ExecutionGroup(response) + data={'action': action}, + wrapper=ExecutionGroup, + ) def resume(self, execution_group_id, force=False): """Resume the executions in this group.""" action = 'force-resume' if force else 'resume' - response = self.api.post( + return self.api.post( '/execution-groups/{0}'.format(execution_group_id), - data={'action': action}) - return ExecutionGroup(response) + data={'action': action}, + wrapper=ExecutionGroup, + ) def set_target_group(self, execution_group_id, success_group=None, failed_group=None): @@ -281,14 +290,14 @@ def set_target_group(self, execution_group_id, :param success_group: ID of the target failure deployment group :return: The updated ExecutionGroup """ - response = self.api.patch( + return self.api.patch( '/execution-groups/{0}'.format(execution_group_id), data={ 'success_group_id': success_group, 'failure_group_id': failed_group, - } + }, + wrapper=ExecutionGroup, ) - return ExecutionGroup(response) class ExecutionsClient(object): @@ -296,7 +305,6 @@ class ExecutionsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'executions' - self._wrapper_cls = Execution def _create_filters( self, @@ -325,8 +333,7 @@ def should_start(self, execution_id): assert execution_id uri = '/{self._uri_prefix}/{id}/should-start'.format( self=self, id=execution_id) - response = self.api.get(uri) - return response + return self.api.get(uri) def list(self, _include=None, **kwargs): """Returns a list of executions. @@ -343,13 +350,11 @@ def list(self, _include=None, **kwargs): """ params = self._create_filters(**kwargs) - response = self.api.get( + return self.api.get( '/{self._uri_prefix}'.format(self=self), params=params, - _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + _include=_include, + wrapper=ListResponse.of(Execution), ) def get(self, execution_id, _include=None): @@ -361,8 +366,7 @@ def get(self, execution_id, _include=None): """ assert execution_id uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) - response = self.api.get(uri, _include=_include) - return self._wrapper_cls(response) + return self.api.get(uri, _include=_include, wrapper=Execution) def update(self, execution_id, status, error=None): """Update execution with the provided status and optional error. @@ -377,8 +381,7 @@ def update(self, execution_id, status, error=None): params = {'status': status} if error: params['error'] = error - response = self.api.patch(uri, data=params) - return Execution(response) + return self.api.patch(uri, data=params, wrapper=Execution) def start(self, *args, **kwargs): """Starts a deployment's workflow execution whose id is provided. @@ -438,10 +441,12 @@ def create(self, deployment_id, workflow_id, parameters=None, 'error': error, } uri = '/executions' - response = self.api.post(uri, - data=data, - expected_status_code=201) - return Execution(response) + return self.api.post( + uri, + data=data, + expected_status_code=201, + wrapper=Execution, + ) def cancel(self, execution_id, force=False, kill=False): """Cancels an execution. @@ -454,10 +459,12 @@ def cancel(self, execution_id, force=False, kill=False): """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) action = 'kill' if kill else 'force-cancel' if force else 'cancel' - response = self.api.post(uri, - data={'action': action}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': action}, + expected_status_code=200, + wrapper=Execution, + ) def resume(self, execution_id, force=False): """Resume an execution. @@ -469,10 +476,12 @@ def resume(self, execution_id, force=False): """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) action = 'force-resume' if force else 'resume' - response = self.api.post(uri, - data={'action': action}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': action}, + expected_status_code=200, + wrapper=Execution, + ) def requeue(self, execution_id): """ @@ -482,10 +491,12 @@ def requeue(self, execution_id): :return: Requeued execution. """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) - response = self.api.post(uri, - data={'action': 'requeue'}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': 'requeue'}, + expected_status_code=200, + wrapper=Execution, + ) def delete(self, to_datetime=None, keep_last=None, **kwargs): """Deletes finished executions from the DB. @@ -503,8 +514,10 @@ def delete(self, to_datetime=None, keep_last=None, **kwargs): data['to_datetime'] = to_datetime.isoformat() if keep_last: data['keep_last'] = keep_last - response = self.api.delete('/{self._uri_prefix}'.format(self=self), - data=data, - params=kwargs, - expected_status_code=200) - return response['items'][0]['count'] + return self.api.delete( + '/{self._uri_prefix}'.format(self=self), + data=data, + params=kwargs, + expected_status_code=200, + wrapper=lambda response: response['items'][0]['count'], + ) diff --git a/cloudify_rest_client/filters.py b/cloudify_rest_client/filters.py index 894a3ffc9..1bbfb6bab 100644 --- a/cloudify_rest_client/filters.py +++ b/cloudify_rest_client/filters.py @@ -75,9 +75,11 @@ def create(self, data['created_at'] = created_at if created_by: data['created_by'] = created_by - response = self.api.put('{0}/{1}'.format(self.uri, filter_id), - data=data) - return Filter(response) + return self.api.put( + '{0}/{1}'.format(self.uri, filter_id), + data=data, + wrapper=Filter, + ) def list(self, sort=None, is_descending=False, **kwargs): """Returns a list of all filters. @@ -94,16 +96,20 @@ def list(self, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(self.uri, params=params) - return ListResponse([Filter(item) for item in response['items']], - response['metadata']) + return self.api.get( + self.uri, + params=params, + wrapper=ListResponse.of(Filter), + ) def get(self, filter_id): - response = self.api.get('{0}/{1}'.format(self.uri, filter_id)) - return Filter(response) + return self.api.get( + '{0}/{1}'.format(self.uri, filter_id), + wrapper=Filter, + ) def delete(self, filter_id): - self.api.delete('{0}/{1}'.format(self.uri, filter_id)) + return self.api.delete('{0}/{1}'.format(self.uri, filter_id)) def update(self, filter_id, new_filter_rules=None, new_visibility=None): """Updates the filter's visibility or rules @@ -131,9 +137,11 @@ def update(self, filter_id, new_filter_rules=None, new_visibility=None): if new_filter_rules: data['filter_rules'] = new_filter_rules - response = self.api.patch('{0}/{1}'.format(self.uri, filter_id), - data=data) - return Filter(response) + return self.api.patch( + '{0}/{1}'.format(self.uri, filter_id), + data=data, + wrapper=Filter, + ) class BlueprintsFiltersClient(FiltersClient): diff --git a/cloudify_rest_client/inter_deployment_dependencies.py b/cloudify_rest_client/inter_deployment_dependencies.py index 171512b99..e78977061 100644 --- a/cloudify_rest_client/inter_deployment_dependencies.py +++ b/cloudify_rest_client/inter_deployment_dependencies.py @@ -56,13 +56,6 @@ class InterDeploymentDependencyClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'deployments/inter-deployment-dependencies' - self._wrapper_cls = InterDeploymentDependency - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) def create(self, dependency_creator, source_deployment, target_deployment=None, @@ -93,9 +86,11 @@ def create(self, dependency_creator, source_deployment, target_deployment_func, external_source, external_target) - response = self.api.put( - '/{self._uri_prefix}'.format(self=self), data=data) - return self._wrapper_cls(response) + return self.api.put( + '/{self._uri_prefix}'.format(self=self), + data=data, + wrapper=InterDeploymentDependency, + ) def create_many(self, source_deployment_id, inter_deployment_dependencies): """Creates a number of inter-deployment dependencies. @@ -106,12 +101,14 @@ def create_many(self, source_deployment_id, inter_deployment_dependencies): dependencies descriptions, but without a source_deployment(_id). :return: a list of created InterDeploymentDependencies IDs. """ - response = self.api.post( - '/{self._uri_prefix}'.format(self=self), data={ + return self.api.post( + '/{self._uri_prefix}'.format(self=self), + data={ 'source_deployment_id': source_deployment_id, - 'inter_deployment_dependencies': inter_deployment_dependencies} + 'inter_deployment_dependencies': inter_deployment_dependencies + }, + wrapper=ListResponse.of(InterDeploymentDependency), ) - return self._wrap_list(response) def update_all(self, source_deployment_id, inter_deployment_dependencies): """Update (i.e. rewrite all) inter-deployment dependencies for @@ -123,14 +120,14 @@ def update_all(self, source_deployment_id, inter_deployment_dependencies): dependencies descriptions, but without a source_deployment(_id). :return: a list of created InterDeploymentDependencies IDs. """ - response = self.api.put( + return self.api.put( '/deployments/{0}/inter-deployment-dependencies'.format( source_deployment_id), data={ 'inter_deployment_dependencies': inter_deployment_dependencies, }, + wrapper=ListResponse.of(InterDeploymentDependency), ) - return self._wrap_list(response) def delete(self, dependency_creator, source_deployment, target_deployment=None, @@ -162,7 +159,10 @@ def delete(self, dependency_creator, source_deployment, external_source=external_source, external_target=external_target) data['is_component_deletion'] = is_component_deletion - self.api.delete('/{self._uri_prefix}'.format(self=self), data=data) + return self.api.delete( + '/{self._uri_prefix}'.format(self=self), + data=data, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -179,10 +179,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(InterDeploymentDependency), + ) def restore(self, deployment_id, update_service_composition): """ @@ -194,5 +196,7 @@ def restore(self, deployment_id, update_service_composition): 'deployment_id': deployment_id, 'update_service_composition': update_service_composition, } - self.api.post('/{self._uri_prefix}/restore'.format(self=self), - data=data) + return self.api.post( + '/{self._uri_prefix}/restore'.format(self=self), + data=data, + ) diff --git a/cloudify_rest_client/labels.py b/cloudify_rest_client/labels.py index a2bdd423a..5106fe92a 100644 --- a/cloudify_rest_client/labels.py +++ b/cloudify_rest_client/labels.py @@ -32,8 +32,10 @@ def list_keys(self): """ Returns all defined label keys, from all elements of the resource. """ - response = self.api.get('/labels/{0}'.format(self.resource_name)) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}'.format(self.resource_name), + wrapper=ListResponse.of(lambda x: x), + ) def list_key_values(self, label_key): """ @@ -41,15 +43,18 @@ def list_key_values(self, label_key): :param label_key: The resource labels' key to list the values for. """ - response = self.api.get( - '/labels/{0}/{1}'.format(self.resource_name, label_key)) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}/{1}'.format(self.resource_name, label_key), + wrapper=ListResponse.of(lambda x: x), + ) def get_reserved_labels_keys(self): """Returns the reserved labels keys (`csys-` prefixed).""" - response = self.api.get('/labels/{0}'.format(self.resource_name), - params={'_reserved': True}) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}'.format(self.resource_name), + params={'_reserved': True}, + wrapper=ListResponse.of(lambda x: x), + ) class DeploymentsLabelsClient(_LabelsClient): diff --git a/cloudify_rest_client/ldap.py b/cloudify_rest_client/ldap.py index 31d153949..9283b9a80 100644 --- a/cloudify_rest_client/ldap.py +++ b/cloudify_rest_client/ldap.py @@ -119,8 +119,7 @@ def set(self, with open(ldap_ca_path) as cert_handle: params['ldap_ca_cert'] = cert_handle.read() uri = '/ldap' - response = self.api.post(uri, params) - return LdapResponse(response) + return self.api.post(uri, params, wrapper=LdapResponse) def get_status(self): uri = '/ldap' diff --git a/cloudify_rest_client/license.py b/cloudify_rest_client/license.py index 33e30ad77..1d57eeabd 100644 --- a/cloudify_rest_client/license.py +++ b/cloudify_rest_client/license.py @@ -70,7 +70,6 @@ class LicenseClient(object): def __init__(self, api): self.api = api - self._wrapper_cls = License def check(self): """Check license state of manager is healthy. @@ -78,19 +77,14 @@ def check(self): If this is not the case, the following exception will be thrown: cloudify_rest_client.exceptions.MissingCloudifyLicense """ - self.api.get('/license-check') + return self.api.get('/license-check') def list(self): """Get the Cloudify license from the Manager. :rtype: License """ - response = self.api.get('/license') - - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get('/license', wrapper=ListResponse.of(License)) def upload(self, license_path): """Uploads a Cloudify license the Manager @@ -103,14 +97,12 @@ def upload(self, license_path): license_path, client=self.api) - response = self.api.put( + return self.api.put( '/license', data=data ) - return response - def delete(self): """Remove the the Cloudify license from the Manager. """ - self.api.delete('/license') + return self.api.delete('/license') diff --git a/cloudify_rest_client/log_bundles.py b/cloudify_rest_client/log_bundles.py index 056a9c428..26c780e50 100644 --- a/cloudify_rest_client/log_bundles.py +++ b/cloudify_rest_client/log_bundles.py @@ -52,8 +52,7 @@ def get(self, log_bundle_id, _include=None): :return: LogBundle. """ uri = self.base_url + log_bundle_id - response = self.api.get(uri, _include=_include) - return LogBundle(response) + return self.api.get(uri, _include=_include, wrapper=LogBundle) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """Returns a list of currently stored log bundles. @@ -68,10 +67,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(self.base_url.rstrip('/'), - params=params, _include=_include) - return ListResponse([LogBundle(item) for item in response['items']], - response['metadata']) + return self.api.get( + self.base_url.rstrip('/'), + params=params, + _include=_include, + wrapper=ListResponse.of(LogBundle), + ) def create(self, log_bundle_id, @@ -84,15 +85,19 @@ def create(self, """ uri = self.base_url + log_bundle_id params = {'queue': queue} - response = self.api.put(uri, data=params, expected_status_code=201) - return Execution(response) + return self.api.put( + uri, + data=params, + expected_status_code=201, + wrapper=Execution, + ) def delete(self, log_bundle_id): """Deletes the log bundle whose id matches the provided log bundle id. :param log_bundle_id: The id of the log bundle to be deleted. """ uri = self.base_url + log_bundle_id - self.api.delete(uri) + return self.api.delete(uri) def download(self, log_bundle_id, output_file, progress_callback=None): """Downloads a previously created log bundle from a manager. @@ -104,10 +109,10 @@ def download(self, log_bundle_id, output_file, progress_callback=None): """ uri = self.base_url + '{}/archive'.format(log_bundle_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) - return output_file def update_status(self, log_bundle_id, status, error=None): @@ -120,4 +125,4 @@ def update_status(self, log_bundle_id, status, error=None): params = {'status': status} if error: params['error'] = error - self.api.patch(uri, data=params) + return self.api.patch(uri, data=params) diff --git a/cloudify_rest_client/maintenance.py b/cloudify_rest_client/maintenance.py index e2e7992de..8ef6c6017 100644 --- a/cloudify_rest_client/maintenance.py +++ b/cloudify_rest_client/maintenance.py @@ -72,8 +72,7 @@ def status(self): :return: Maintenance mode state. """ uri = '/maintenance' - response = self.api.get(uri) - return Maintenance(response) + return self.api.get(uri, wrapper=Maintenance) def activate(self): """ @@ -83,11 +82,10 @@ def activate(self): """ uri = '/maintenance/activate' try: - response = self.api.post(uri) + return self.api.post(uri, wrapper=Maintenance) except NotModifiedError as e: e.message = 'Maintenance mode is already on.' raise - return Maintenance(response) def deactivate(self): """ @@ -97,8 +95,7 @@ def deactivate(self): """ uri = '/maintenance/deactivate' try: - response = self.api.post(uri) + return self.api.post(uri, wrapper=Maintenance) except NotModifiedError as e: e.message = 'Maintenance mode is already off.' raise - return Maintenance(response) diff --git a/cloudify_rest_client/manager.py b/cloudify_rest_client/manager.py index a18865c37..42f915326 100644 --- a/cloudify_rest_client/manager.py +++ b/cloudify_rest_client/manager.py @@ -262,8 +262,7 @@ def get_status(self): """ :return: Cloudify's management machine status. """ - response = self.api.get('/status') - return response + return self.api.get('/status') def get_config(self, name=None, scope=None): """Get configuration of the manager. @@ -272,18 +271,27 @@ def get_config(self, name=None, scope=None): provided, return all values for that scope. """ if name and scope: - response = self.api.get('/config/{0}.{1}'.format(scope, name)) - return ConfigItem(response) + return self.api.get( + '/config/{0}.{1}'.format(scope, name), + wrapper=ConfigItem, + ) if name: - response = self.api.get('/config/{0}'.format(name)) - return ConfigItem(response) + return self.api.get( + '/config/{0}'.format(name), + wrapper=ConfigItem, + ) if scope: - response = self.api.get('/config', params={'scope': scope}) + return self.api.get( + '/config', + params={'scope': scope}, + wrapper=ListResponse.of(ConfigItem), + ) else: - response = self.api.get('/config') - return ListResponse([ConfigItem(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/config', + wrapper=ListResponse.of(ConfigItem), + ) def put_config(self, name, value, force=False): """Update a given setting. @@ -292,11 +300,14 @@ def put_config(self, name, value, force=False): :param force: Force changing non-editable settings """ - response = self.api.put('/config/{0}'.format(name), data={ - 'value': value, - 'force': force - }) - return ConfigItem(response) + return self.api.put( + '/config/{0}'.format(name), + data={ + 'value': value, + 'force': force + }, + wrapper=ConfigItem, + ) def add_manager(self, hostname, private_ip, public_ip, version, edition, distribution, distro_release, @@ -320,8 +331,7 @@ def add_manager(self, hostname, private_ip, public_ip, version, manager['fs_sync_node_id'] = fs_sync_node_id if networks: manager['networks'] = networks - response = self.api.post('/managers', data=manager) - return ManagerItem(response) + return self.api.post('/managers', data=manager, wrapper=ManagerItem) def remove_manager(self, hostname): """ @@ -331,7 +341,7 @@ def remove_manager(self, hostname): the cluster, not necessarily for uninstalling the manager :param hostname: The manager's hostname """ - self.api.delete('/managers/{0}'.format(hostname)) + return self.api.delete('/managers/{0}'.format(hostname)) def update_manager(self, hostname, fs_sync_node_id, bootstrap_cluster): """ @@ -342,11 +352,14 @@ def update_manager(self, hostname, fs_sync_node_id, bootstrap_cluster): :param bootstrap_cluster: Whether it is the 1st manager in the cluster or not """ - response = self.api.put('/managers/{0}'.format(hostname), data={ - 'fs_sync_node_id': fs_sync_node_id, - 'bootstrap_cluster': bootstrap_cluster - }) - return ManagerItem(response) + return self.api.put( + '/managers/{0}'.format(hostname), + data={ + 'fs_sync_node_id': fs_sync_node_id, + 'bootstrap_cluster': bootstrap_cluster + }, + wrapper=ManagerItem, + ) def get_managers(self, hostname=None, _include=None): """ @@ -356,14 +369,18 @@ def get_managers(self, hostname=None, _include=None): :param _include: list of columns to include in the returned list """ if hostname: - response = self.api.get('/managers', params={'hostname': hostname}, - _include=_include) + return self.api.get( + '/managers', + params={'hostname': hostname}, + _include=_include, + wrapper=ListResponse.of(ManagerItem), + ) else: - response = self.api.get('/managers', _include=_include) - return ListResponse( - [ManagerItem(item) for item in response['items']], - response['metadata'] - ) + return self.api.get( + '/managers', + _include=_include, + wrapper=ListResponse.of(ManagerItem), + ) def add_broker(self, name, address, port=None, networks=None): """Add a broker to the brokers table. @@ -390,8 +407,11 @@ def add_broker(self, name, address, port=None, networks=None): params['port'] = port if networks: params['networks'] = networks - response = self.api.post('/brokers', data=params) - return RabbitMQBrokerItem(response) + return self.api.post( + '/brokers', + data=params, + wrapper=RabbitMQBrokerItem, + ) def remove_broker(self, name): """Remove a broker from the brokers table. @@ -404,7 +424,7 @@ def remove_broker(self, name): :return: The broker that was deleted. """ - self.api.delete('/brokers/{0}'.format(name)) + return self.api.delete('/brokers/{0}'.format(name)) def update_broker(self, name, networks): """Update a broker. @@ -417,16 +437,18 @@ def update_broker(self, name, networks): :return: The updated broker. """ - response = self.api.put('/brokers/{0}'.format(name), data={ - 'networks': networks, - }) - return RabbitMQBrokerItem(response) + return self.api.put( + '/brokers/{0}'.format(name), + data={ + 'networks': networks, + }, + wrapper=RabbitMQBrokerItem, + ) def get_brokers(self): - response = self.api.get('/brokers',) - return ListResponse( - [RabbitMQBrokerItem(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/brokers', + wrapper=ListResponse.of(RabbitMQBrokerItem), ) def update_db_nodes(self): @@ -435,25 +457,20 @@ def update_db_nodes(self): :return: A list of DB nodes in the cluster. """ params = {'action': 'update'} - response = self.api.post('/db-nodes', data=params) - return ListResponse( - [DBNodeItem(item) for item in response['items']], - response['metadata'] + return self.api.post( + '/db-nodes', + data=params, + wrapper=ListResponse.of(DBNodeItem), ) def get_db_nodes(self): - response = self.api.get('/db-nodes') - return ListResponse( - [DBNodeItem(item) for item in response['items']], - response['metadata'] - ) + return self.api.get('/db-nodes', wrapper=ListResponse.of(DBNodeItem)) def get_version(self): """ :return: Cloudify's management machine version information. """ - response = self.api.get('/version', versioned_url=False) - return response + return self.api.get('/version', versioned_url=False) def get_context(self, _include=None): """ @@ -464,8 +481,7 @@ def get_context(self, _include=None): :param _include: List of fields to include in response. :return: Context stored in manager. """ - response = self.api.get('/provider/context', _include=_include) - return response + return self.api.get('/provider/context', _include=_include) def create_context(self, name, context): """ @@ -478,11 +494,11 @@ def create_context(self, name, context): :param context: Context as dict. :return: Create context result. """ - data = {'name': name, 'context': context} - response = self.api.post('/provider/context', - data, - expected_status_code=201) - return response + return self.api.post( + '/provider/context', + data={'name': name, 'context': context}, + expected_status_code=201, + ) def update_context(self, name, context): @@ -497,9 +513,9 @@ def update_context(self, name, context): :param context: Context as dict. """ - - data = {'name': name, 'context': context} - response = self.api.post('/provider/context', data, - expected_status_code=200, - params={'update': 'true'}) - return response + return self.api.post( + '/provider/context', + data={'name': name, 'context': context}, + expected_status_code=200, + params={'update': 'true'}, + ) diff --git a/cloudify_rest_client/node_instances.py b/cloudify_rest_client/node_instances.py index 5c4875cac..a9340657c 100644 --- a/cloudify_rest_client/node_instances.py +++ b/cloudify_rest_client/node_instances.py @@ -134,10 +134,8 @@ def has_configuration_drift(self): class NodeInstancesClient(object): - def __init__(self, api): self.api = api - self._wrapper_cls = NodeInstance self._uri_prefix = 'node-instances' def create_many(self, deployment_id, node_instances): @@ -149,7 +147,7 @@ def create_many(self, deployment_id, node_instances): keys: id, node_id. :return: None """ - self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), data={ 'deployment_id': deployment_id, @@ -168,10 +166,12 @@ def get(self, node_instance_id, _include=None, evaluate_functions=False): :return: The retrieved node instance. """ assert node_instance_id - uri = '/{self._uri_prefix}/{id}'.format(self=self, id=node_instance_id) - params = {'_evaluate_functions': evaluate_functions} - response = self.api.get(uri, params=params, _include=_include) - return self._wrapper_cls(response) + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=node_instance_id), + params={'_evaluate_functions': evaluate_functions}, + _include=_include, + wrapper=NodeInstance, + ) def update(self, node_instance_id, @@ -212,8 +212,12 @@ def update(self, params = {} if force: params['force'] = True - response = self.api.patch(uri, params=params, data=data) - return NodeInstance(response) + return self.api.patch( + uri, + params=params, + data=data, + wrapper=NodeInstance, + ) def _create_filters( self, @@ -266,23 +270,22 @@ def list(self, _include=None, constraints=None, **kwargs): params = self._create_filters(**kwargs) if constraints is None: - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, - _include=_include) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(NodeInstance), + ) else: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'constraints': constraints} + data={'constraints': constraints}, + wrapper=ListResponse.of(NodeInstance), ) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) - def search(self, ids, all_tenants=False): """Search node instances by their IDs. @@ -294,17 +297,18 @@ def search(self, ids, all_tenants=False): params = {} if all_tenants: params['_all_tenants'] = True - response = self.api.post('/searches/node-instances', data={ - 'filter_rules': [{ - 'key': 'id', - 'values': ids, - 'operator': 'any_of', - 'type': 'attribute' - }] - }, params=params) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.post( + '/searches/node-instances', + data={ + 'filter_rules': [{ + 'key': 'id', + 'values': ids, + 'operator': 'any_of', + 'type': 'attribute' + }] + }, + params=params, + wrapper=ListResponse.of(NodeInstance), ) def delete(self, instance_id): @@ -316,7 +320,7 @@ def delete(self, instance_id): :param instance_id: ID of the instance to be deleted """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{instance_id}' .format(self=self, instance_id=instance_id), expected_status_code=204, diff --git a/cloudify_rest_client/nodes.py b/cloudify_rest_client/nodes.py index d3ea7c8e0..7c39271da 100644 --- a/cloudify_rest_client/nodes.py +++ b/cloudify_rest_client/nodes.py @@ -173,10 +173,8 @@ def type(self): class NodesClient(object): - def __init__(self, api): self.api = api - self._wrapper_cls = Node self._uri_prefix = 'nodes' self.types = NodeTypesClient(api) @@ -203,7 +201,7 @@ def _create_filters( return params def list(self, _include=None, filter_rules=None, constraints=None, - **kwargs): + wrapper=None, **kwargs): """ Returns a list of nodes which belong to the deployment identified by the provided deployment id. @@ -227,32 +225,32 @@ def list(self, _include=None, filter_rules=None, constraints=None, 'provide either filter_rules or DSL constraints, not both') params = self._create_filters(**kwargs) + wrapper = wrapper or ListResponse.of(Node) if filter_rules is not None: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'filter_rules': filter_rules} + data={'filter_rules': filter_rules}, + wrapper=wrapper, ) elif constraints is not None: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'constraints': constraints} + data={'constraints': constraints}, + wrapper=wrapper, ) else: - response = self.api.get( + return self.api.get( '/{self._uri_prefix}'.format(self=self), params=params, - _include=_include + _include=_include, + wrapper=wrapper ) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) def get(self, deployment_id, node_id, _include=None, evaluate_functions=False): @@ -269,14 +267,19 @@ def get(self, deployment_id, node_id, _include=None, """ assert deployment_id assert node_id - result = self.list(deployment_id=deployment_id, - id=node_id, - _include=_include, - evaluate_functions=evaluate_functions) - if not result: - return None - else: - return result[0] + + def _get_single_node(response): + if not response.get('items'): + return None + return Node(response['items'][0]) + + return self.list( + deployment_id=deployment_id, + id=node_id, + _include=_include, + evaluate_functions=evaluate_functions, + wrapper=_get_single_node, + ) def create_many(self, deployment_id, nodes): """Create multiple nodes. @@ -286,7 +289,7 @@ def create_many(self, deployment_id, nodes): Each node dict must contain at least the keys: id, type. :return: None """ - self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), data={ 'deployment_id': deployment_id, @@ -306,7 +309,7 @@ def update(self, deployment_id, node_id, **kwargs): :param node_id: The node id within the given deployment :param kwargs: The new node attributes """ - self.api.patch( + return self.api.patch( '/{self._uri_prefix}/{deployment_id}/{node_id}' .format(self=self, deployment_id=deployment_id, node_id=node_id), data=kwargs, @@ -319,7 +322,7 @@ def delete(self, deployment_id, node_id): :param deployment_id: The deployment the node belongs to :param node_id: The node id within the given deployment """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{deployment_id}/{node_id}' .format(self=self, deployment_id=deployment_id, node_id=node_id), expected_status_code=204, @@ -353,9 +356,8 @@ def list(self, node_type=None, constraints=None, **kwargs): if constraints is None: constraints = dict() - response = self.api.post('/searches/node-types', params=params, - data={'constraints': constraints}) - return ListResponse( - items=[NodeTypes(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/node-types', params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(NodeTypes), ) diff --git a/cloudify_rest_client/operations.py b/cloudify_rest_client/operations.py index dd9661642..6da2c891d 100644 --- a/cloudify_rest_client/operations.py +++ b/cloudify_rest_client/operations.py @@ -50,7 +50,6 @@ class OperationsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'operations' - self._wrapper_cls = Operation def list( self, @@ -90,16 +89,18 @@ def list( params['_offset'] = _offset if _size is not None: params['_size'] = _size - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(Operation), + ) def get(self, operation_id): - response = self.api.get('/{self._uri_prefix}/{id}' - .format(self=self, id=operation_id)) - return Operation(response) + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=operation_id), + wrapper=Operation, + ) def create(self, operation_id, graph_id, name, type, parameters, dependencies): @@ -111,14 +112,18 @@ def create(self, operation_id, graph_id, name, type, parameters, 'parameters': parameters } uri = '/operations/{0}'.format(operation_id) - response = self.api.put(uri, data=data, expected_status_code=201) - return Operation(response) + return self.api.put( + uri, + data=data, + expected_status_code=201, + wrapper=Operation, + ) def update(self, operation_id, state, result=None, exception=None, exception_causes=None, manager_name=None, agent_name=None): uri = '/operations/{0}'.format(operation_id) - self.api.patch(uri, data={ + return self.api.patch(uri, data={ 'state': state, 'result': result, 'exception': exception, @@ -144,7 +149,7 @@ def _update_operation_inputs(self, deployment_id=None, node_id=None, :param rel_index: when updating relationship operations, look at the relationship at this index """ - self.api.post('/operations', data={ + return self.api.post('/operations', data={ 'action': 'update-stored', 'deployment_id': deployment_id, 'node_id': node_id, @@ -154,8 +159,7 @@ def _update_operation_inputs(self, deployment_id=None, node_id=None, }, expected_status_code=(200, 204)) def delete(self, operation_id): - uri = '/operations/{0}'.format(operation_id) - self.api.delete(uri) + return self.api.delete('/operations/{0}'.format(operation_id)) class TasksGraph(dict): @@ -179,17 +183,17 @@ class TasksGraphClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'tasks_graphs' - self._wrapper_cls = TasksGraph def list(self, execution_id, name=None, _include=None): params = {'execution_id': execution_id} if name: params['name'] = name - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(TasksGraph), + ) def create(self, execution_id, name, operations=None, created_at=None, graph_id=None): @@ -203,10 +207,17 @@ def create(self, execution_id, name, operations=None, created_at=None, if graph_id: params['graph_id'] = graph_id uri = '/{self._uri_prefix}/tasks_graphs'.format(self=self) - response = self.api.post(uri, data=params, expected_status_code=201) - return TasksGraph(response) + return self.api.post( + uri, + data=params, + expected_status_code=201, + wrapper=TasksGraph, + ) def update(self, tasks_graph_id, state): uri = '/tasks_graphs/{0}'.format(tasks_graph_id) - response = self.api.patch(uri, data={'state': state}) - return TasksGraph(response) + return self.api.patch( + uri, + data={'state': state}, + wrapper=TasksGraph, + ) diff --git a/cloudify_rest_client/permissions.py b/cloudify_rest_client/permissions.py index 4bedd34b6..b6021e5e2 100644 --- a/cloudify_rest_client/permissions.py +++ b/cloudify_rest_client/permissions.py @@ -10,7 +10,6 @@ def __init__(self, permission): class PermissionsClient(object): def __init__(self, api): self.api = api - self._wrapper_cls = Permission self._uri_prefix = '/permissions' def list(self, role=None): @@ -22,11 +21,7 @@ def list(self, role=None): url = self._uri_prefix if role: url = '{0}/{1}'.format(url, role) - response = self.api.get(url) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get(url, wrapper=ListResponse.of(Permission)) def add(self, permission, role): """Allow role the specified permission @@ -34,7 +29,9 @@ def add(self, permission, role): :param permission: the permission name to allow :param role: the role name """ - self.api.put('{0}/{1}/{2}'.format(self._uri_prefix, role, permission)) + return self.api.put( + '{0}/{1}/{2}'.format(self._uri_prefix, role, permission), + ) def delete(self, permission, role): """Disallow role the specified permission @@ -42,5 +39,5 @@ def delete(self, permission, role): :param permission: the permission name to disallow :param role: the role name """ - self.api.delete( + return self.api.delete( '{0}/{1}/{2}'.format(self._uri_prefix, role, permission)) diff --git a/cloudify_rest_client/plugins.py b/cloudify_rest_client/plugins.py index 3f5042e3c..34184f3bf 100644 --- a/cloudify_rest_client/plugins.py +++ b/cloudify_rest_client/plugins.py @@ -208,7 +208,6 @@ class PluginsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'plugins' - self._wrapper_cls = Plugin def get(self, plugin_id, _include=None, **kwargs): """ @@ -219,14 +218,11 @@ def get(self, plugin_id, _include=None, **kwargs): :return: The plugin details. """ assert plugin_id - uri = '/{self._uri_prefix}/{id}'.format(self=self, id=plugin_id) - response = self.api.get(uri, _include=_include, params=kwargs) - return self._wrapper_cls(response) - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=plugin_id), + _include=_include, + params=kwargs, + wrapper=Plugin, ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): @@ -243,10 +239,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(Plugin), + ) def delete(self, plugin_id, force=False): """ @@ -260,7 +258,7 @@ def delete(self, plugin_id, force=False): data = { 'force': force } - self.api.delete('/plugins/{0}'.format(plugin_id), data=data) + return self.api.delete('/plugins/{0}'.format(plugin_id), data=data) def upload(self, plugin_path, @@ -307,18 +305,21 @@ def upload(self, progress_callback=progress_callback, client=self.api) - response = self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), params=query_params, data=data, timeout=timeout, - expected_status_code=201 + expected_status_code=201, + wrapper=self._wrap_plugins_list ) + + def _wrap_plugins_list(self, response): if 'metadata' in response and 'items' in response: # This is a list of plugins - for caravan - return self._wrap_list(response) + return ListResponse.of(Plugin)(response) else: - return self._wrapper_cls(response) + return Plugin(response) def download(self, plugin_id, output_file, progress_callback=None): """Downloads a previously uploaded plugin archive from the manager @@ -330,6 +331,7 @@ def download(self, plugin_id, output_file, progress_callback=None): :return: The file path of the downloaded plugin. """ uri = '/plugins/{0}/archive'.format(plugin_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -346,6 +348,7 @@ def download_yaml(self, plugin_id, output_file, progress_callback=None): :return: The file path of the downloaded plugin yaml. """ uri = '/plugins/{0}/yaml'.format(plugin_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -363,6 +366,7 @@ def get_yaml(self, plugin_id, dsl_version=None, progress_callback=None): """ params = {'dsl_version': dsl_version} if dsl_version else {} uri = '/plugins/{0}/yaml'.format(plugin_id) + # TODO this is not async-friendly with tempfile.TemporaryDirectory() as tmpdir: output_file = os.path.join(tmpdir, 'plugin.yaml') response = self.api.get(uri, params=params, stream=True) @@ -404,7 +408,7 @@ def set_global(self, plugin_id): data = {'visibility': VisibilityState.GLOBAL} return self.api.patch( '/plugins/{0}/set-visibility'.format(plugin_id), - data=data + data=data, ) def set_visibility(self, plugin_id, visibility): @@ -419,7 +423,7 @@ def set_visibility(self, plugin_id, visibility): data = {'visibility': visibility} return self.api.patch( '/plugins/{0}/set-visibility'.format(plugin_id), - data=data + data=data, ) def install(self, plugin_id, managers=None, agents=None): @@ -439,8 +443,11 @@ def install(self, plugin_id, managers=None, agents=None): data['managers'] = managers if agents: data['agents'] = agents - response = self.api.post('/plugins/{0}'.format(plugin_id), data=data) - return Plugin(response) + return self.api.post( + '/plugins/{0}'.format(plugin_id), + data=data, + wrapper=Plugin, + ) def set_state(self, plugin_id, state, agent_name=None, manager_name=None, error=None): @@ -460,16 +467,23 @@ def set_state(self, plugin_id, state, agent_name=None, data['manager'] = manager_name if error: data['error'] = error - response = self.api.put('/plugins/{0}'.format(plugin_id), data=data) - return Plugin(response) + return self.api.put( + '/plugins/{0}'.format(plugin_id), + data=data, + wrapper=Plugin, + ) def set_owner(self, plugin_id, creator): """Change ownership of the plugin.""" - response = self.api.patch('/plugins/{0}'.format(plugin_id), - data={'creator': creator}) - return Plugin(response) + return self.api.patch( + '/plugins/{0}'.format(plugin_id), + data={'creator': creator}, + wrapper=Plugin, + ) def update(self, plugin_id, **kwargs): - response = self.api.patch('/plugins/{0}'.format(plugin_id), - data=kwargs) - return Plugin(response) + return self.api.patch( + '/plugins/{0}'.format(plugin_id), + data=kwargs, + wrapper=Plugin, + ) diff --git a/cloudify_rest_client/plugins_update.py b/cloudify_rest_client/plugins_update.py index fa280373b..4f6871c27 100644 --- a/cloudify_rest_client/plugins_update.py +++ b/cloudify_rest_client/plugins_update.py @@ -57,7 +57,6 @@ class PluginsUpdateClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'plugins-updates' - self._wrapper_cls = PluginsUpdate def get(self, plugins_update_id, _include=None, **kwargs): """ @@ -70,13 +69,11 @@ def get(self, plugins_update_id, _include=None, **kwargs): assert plugins_update_id uri = '/{self._uri_prefix}/{id}'.format( self=self, id=plugins_update_id) - response = self.api.get(uri, _include=_include, params=kwargs) - return self._wrapper_cls(response) - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + uri, + _include=_include, + params=kwargs, + wrapper=PluginsUpdate, ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): @@ -94,10 +91,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(PluginsUpdate), + ) def inject(self, blueprint_id, force=False, created_by=None, created_at=None, @@ -105,7 +104,7 @@ def inject(self, blueprint_id, force=False, update_id=None, affected_deployments=None, deployments_per_tenants=None, all_tenants=None, temp_blueprint_id=None): - return PluginsUpdate(self.api.post( + return self.api.post( '/{self._uri_prefix}/{}/update/initiate'.format(blueprint_id, self=self), data=_data_from_kwargs( @@ -120,7 +119,8 @@ def inject(self, blueprint_id, force=False, deployments_per_tenants=deployments_per_tenants, temp_blueprint_id=temp_blueprint_id, ), - )) + wrapper=PluginsUpdate, + ) def update_plugins(self, blueprint_id, force=False, plugin_names=None, to_latest=None, all_to_latest=True, @@ -162,7 +162,7 @@ def update_plugins(self, blueprint_id, force=False, plugin_names=None, RuntimeWarning) else: mapping = {} - response = self.api.post( + return self.api.post( '/{self._uri_prefix}/{}/update/initiate'.format(blueprint_id, self=self), data=_data_from_kwargs( @@ -176,9 +176,9 @@ def update_plugins(self, blueprint_id, force=False, plugin_names=None, auto_correct_types=auto_correct_types, reevaluate_active_statuses=reevaluate_active_statuses, all_tenants=all_tenants - ) + ), + wrapper=PluginsUpdate, ) - return PluginsUpdate(response) def finalize_plugins_update(self, plugins_update_id): """ @@ -186,11 +186,13 @@ def finalize_plugins_update(self, plugins_update_id): :return: a PluginUpdate object. """ - response = self.api.post( - '/{self._uri_prefix}/{}/update/finalize'.format(plugins_update_id, - self=self) + return self.api.post( + '/{self._uri_prefix}/{}/update/finalize'.format( + plugins_update_id, + self=self, + ), + wrapper=PluginsUpdate, ) - return PluginsUpdate(response) def _data_from_kwargs(**kwargs): diff --git a/cloudify_rest_client/secrets.py b/cloudify_rest_client/secrets.py index ba965452e..25c43e6f9 100644 --- a/cloudify_rest_client/secrets.py +++ b/cloudify_rest_client/secrets.py @@ -77,7 +77,6 @@ def provider_name(self): class SecretsClient(object): - def __init__(self, api): self.api = api @@ -125,8 +124,11 @@ def create(self, if provider: data['provider'] = provider - response = self.api.put('/secrets/{0}'.format(key), data=data) - return Secret(response) + return self.api.put( + '/secrets/{0}'.format(key), + data=data, + wrapper=Secret, + ) def update( self, @@ -144,12 +146,14 @@ def update( 'provider': provider, }) data = dict((k, v) for k, v in kwargs.items() if v is not None) - response = self.api.patch('/secrets/{0}'.format(key), data=data) - return Secret(response) + return self.api.patch( + '/secrets/{0}'.format(key), + data=data, + wrapper=Secret, + ) def get(self, key): - response = self.api.get('/secrets/{0}'.format(key)) - return Secret(response) + return self.api.get('/secrets/{0}'.format(key), wrapper=Secret) def export(self, _include=None, **kwargs): """ @@ -159,9 +163,11 @@ def export(self, _include=None, **kwargs): :return: Secrets' list """ params = kwargs - response = self.api.get('/secrets/share/export', params=params, - _include=_include) - return response + return self.api.get( + '/secrets/share/export', + params=params, + _include=_include, + ) def import_secrets(self, secrets_list, tenant_map=None, passphrase=None, override_collisions=False): @@ -183,8 +189,10 @@ def import_secrets(self, secrets_list, tenant_map=None, 'override_collisions': override_collisions } data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.post('/secrets/share/import', data=data) - return response + return self.api.post( + '/secrets/share/import', + data=data, + ) def list(self, sort=None, is_descending=False, filter_rules=None, constraints=None, **kwargs): @@ -211,19 +219,28 @@ def list(self, sort=None, is_descending=False, params['_sort'] = '-' + sort if is_descending else sort if filter_rules: - response = self.api.post('/searches/secrets', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/secrets', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Secret), + ) elif constraints: - response = self.api.post('/searches/secrets', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/secrets', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Secret), + ) else: - response = self.api.get('/secrets', params=params) - - return ListResponse([Secret(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/secrets', + params=params, + wrapper=ListResponse.of(Secret), + ) def delete(self, key): - self.api.delete('/secrets/{0}'.format(key)) + return self.api.delete('/secrets/{0}'.format(key)) def set_global(self, key): """ diff --git a/cloudify_rest_client/secrets_providers.py b/cloudify_rest_client/secrets_providers.py index a3f0c2dca..8e45b9de4 100644 --- a/cloudify_rest_client/secrets_providers.py +++ b/cloudify_rest_client/secrets_providers.py @@ -69,9 +69,10 @@ def __init__(self, api): self.api = api def get(self, name): - response = self.api.get(f'/secrets-providers/{name}') - - return SecretsProvider(response) + return self.api.get( + f'/secrets-providers/{name}', + wrapper=SecretsProvider, + ) def create( self, @@ -104,9 +105,11 @@ def create( if connection_parameters: data['connection_parameters'] = connection_parameters - response = self.api.put('/secrets-providers', data=data) - - return SecretsProvider(response) + return self.api.put( + '/secrets-providers', + data=data, + wrapper=SecretsProvider, + ) def update( self, @@ -138,15 +141,17 @@ def update( } data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.patch(f'/secrets-providers/{name}', data=data) - - return SecretsProvider(response) + return self.api.patch( + f'/secrets-providers/{name}', + data=data, + wrapper=SecretsProvider, + ) def delete(self, name): """ Delete a Secrets Provider. """ - self.api.delete(f'/secrets-providers/{name}') + return self.api.delete(f'/secrets-providers/{name}') def list(self): """ @@ -154,11 +159,9 @@ def list(self): :return: Secrets Parameters list. """ - response = self.api.get('/secrets-providers') - - return ListResponse( - [SecretsProvider(item) for item in response['items']], - response['metadata'], + return self.api.get( + '/secrets-providers', + wrapper=ListResponse.of(SecretsProvider), ) def check( @@ -190,6 +193,4 @@ def check( if connection_parameters: data['connection_parameters'] = connection_parameters - response = self.api.put('/secrets-providers', data=data) - - return response + return self.api.put('/secrets-providers', data=data) diff --git a/cloudify_rest_client/sites.py b/cloudify_rest_client/sites.py index e3edc8985..580592043 100644 --- a/cloudify_rest_client/sites.py +++ b/cloudify_rest_client/sites.py @@ -57,7 +57,6 @@ class SitesClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'sites' - self._wrapper_cls = Site def create(self, name, location=None, visibility=VisibilityState.TENANT, created_by=None, created_at=None): @@ -79,11 +78,11 @@ def create(self, name, location=None, visibility=VisibilityState.TENANT, data['created_by'] = created_by if created_at: data['created_at'] = created_at - response = self.api.put( + return self.api.put( '/{self._uri_prefix}/{name}'.format(self=self, name=name), - data=data + data=data, + wrapper=Site, ) - return self._wrapper_cls(response) def update(self, name, location=None, visibility=VisibilityState.TENANT, new_name=None): @@ -104,11 +103,11 @@ def update(self, name, location=None, visibility=VisibilityState.TENANT, } # Remove the keys with value None data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.post( + return self.api.post( '/{self._uri_prefix}/{name}'.format(self=self, name=name), - data=data + data=data, + wrapper=Site, ) - return self._wrapper_cls(response) def get(self, name): """ @@ -117,10 +116,10 @@ def get(self, name): :param name: The name of the site :return: The details of the site """ - response = self.api.get( - '/{self._uri_prefix}/{name}'.format(self=self, name=name) + return self.api.get( + '/{self._uri_prefix}/{name}'.format(self=self, name=name), + wrapper=Site, ) - return self._wrapper_cls(response) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -137,12 +136,11 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: kwargs['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=kwargs) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=kwargs, + wrapper=ListResponse.of(Site), ) def delete(self, name): @@ -152,6 +150,6 @@ def delete(self, name): :param name: The name of the site to be deleted. :return: Deleted site. """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{name}'.format(self=self, name=name) ) diff --git a/cloudify_rest_client/snapshots.py b/cloudify_rest_client/snapshots.py index 602a2b3e7..8b0685c23 100644 --- a/cloudify_rest_client/snapshots.py +++ b/cloudify_rest_client/snapshots.py @@ -85,8 +85,7 @@ def get(self, snapshot_id, _include=None): """ assert snapshot_id uri = '/snapshots/{0}'.format(snapshot_id) - response = self.api.get(uri, _include=_include) - return Snapshot(response) + return self.api.get(uri, _include=_include, wrapper=Snapshot) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -103,9 +102,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/snapshots', params=params, _include=_include) - return ListResponse([Snapshot(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/snapshots', + params=params, + _include=_include, + wrapper=ListResponse.of(Snapshot), + ) def create(self, snapshot_id, @@ -131,8 +133,12 @@ def create(self, 'queue': queue, 'tempdir_path': tempdir_path, } - response = self.api.put(uri, data=params, expected_status_code=201) - return Execution(response) + return self.api.put( + uri, + data=params, + expected_status_code=201, + wrapper=Execution, + ) def delete(self, snapshot_id): """ @@ -142,7 +148,7 @@ def delete(self, snapshot_id): :return: Deleted snapshot. """ assert snapshot_id - self.api.delete('/snapshots/{0}'.format(snapshot_id)) + return self.api.delete('/snapshots/{0}'.format(snapshot_id)) def restore(self, snapshot_id, @@ -171,8 +177,7 @@ def restore(self, 'ignore_plugin_failure': ignore_plugin_failure } - response = self.api.post(uri, data=params) - return Execution(response) + return self.api.post(uri, data=params, wrapper=Execution) def upload(self, snapshot_path, @@ -206,9 +211,13 @@ def upload(self, progress_callback=progress_callback, client=self.api) - response = self.api.put(uri, params=query_params, data=data, - expected_status_code=201) - return Snapshot(response) + return self.api.put( + uri, + params=query_params, + data=data, + expected_status_code=201, + wrapper=Snapshot, + ) def download(self, snapshot_id, output_file, progress_callback=None): """ @@ -223,6 +232,7 @@ def download(self, snapshot_id, output_file, progress_callback=None): """ uri = '/snapshots/{0}/archive'.format(snapshot_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -241,7 +251,7 @@ def update_status(self, snapshot_id, status, error=None): params = {'status': status} if error: params['error'] = error - self.api.patch(uri, data=params) + return self.api.patch(uri, data=params) def get_status(self): """ diff --git a/cloudify_rest_client/summary.py b/cloudify_rest_client/summary.py index faaeeea77..88ea7424a 100644 --- a/cloudify_rest_client/summary.py +++ b/cloudify_rest_client/summary.py @@ -27,11 +27,11 @@ def get(self, _target_field, _sub_field=None, **kwargs): '_sub_field': _sub_field, } params.update(kwargs) - response = self.api.get( + return self.api.get( '/summary/{summary_type}'.format(summary_type=self.summary_type), params=params, + wrapper=ListResponse.of(dict), ) - return ListResponse(response['items'], response['metadata']) class SummariesClient(object): diff --git a/cloudify_rest_client/tenants.py b/cloudify_rest_client/tenants.py index 3f0893428..4f145df8d 100644 --- a/cloudify_rest_client/tenants.py +++ b/cloudify_rest_client/tenants.py @@ -103,19 +103,20 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/tenants', - _include=_include, - params=params) - return ListResponse([Tenant(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/tenants', + _include=_include, + params=params, + wrapper=ListResponse.of(Tenant), + ) def create(self, tenant_name, rabbitmq_password=''): - response = self.api.post( + return self.api.post( '/tenants/{0}'.format(tenant_name), expected_status_code=201, data={'rabbitmq_password': rabbitmq_password}, + wrapper=Tenant, ) - return Tenant(response) def add_user(self, username, tenant_name, role): """Add user to a tenant. @@ -130,8 +131,7 @@ def add_user(self, username, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.put('/tenants/users', data=data) - return Tenant(response) + return self.api.put('/tenants/users', data=data, wrapper=Tenant) def update_user(self, username, tenant_name, role): """Update user in a tenant. @@ -149,8 +149,7 @@ def update_user(self, username, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.patch('/tenants/users', data=data) - return Tenant(response) + return self.api.patch('/tenants/users', data=data, wrapper=Tenant) def remove_user(self, username, tenant_name): data = {'username': username, 'tenant_name': tenant_name} @@ -169,8 +168,7 @@ def add_user_group(self, group_name, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.put('/tenants/user-groups', data=data) - return Tenant(response) + return self.api.put('/tenants/user-groups', data=data, wrapper=Tenant) def update_user_group(self, group_name, tenant_name, role): """Update user group in a tenant. @@ -188,8 +186,11 @@ def update_user_group(self, group_name, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.patch('/tenants/user-groups', data=data) - return Tenant(response) + return self.api.patch( + '/tenants/user-groups', + data=data, + wrapper=Tenant, + ) def remove_user_group(self, group_name, tenant_name): """Remove user group from tenant. @@ -201,14 +202,14 @@ def remove_user_group(self, group_name, tenant_name): """ data = {'group_name': group_name, 'tenant_name': tenant_name} - self.api.delete('/tenants/user-groups', data=data) + return self.api.delete('/tenants/user-groups', data=data) def get(self, tenant_name, **kwargs): - response = self.api.get( + return self.api.get( '/tenants/{0}'.format(tenant_name), - params=kwargs + params=kwargs, + wrapper=Tenant, ) - return Tenant(response) def delete(self, tenant_name): - self.api.delete('/tenants/{0}'.format(tenant_name)) + return self.api.delete('/tenants/{0}'.format(tenant_name)) diff --git a/cloudify_rest_client/tests/__init__.py b/cloudify_rest_client/tests/__init__.py index c03c6a1f3..b17a3a525 100644 --- a/cloudify_rest_client/tests/__init__.py +++ b/cloudify_rest_client/tests/__init__.py @@ -7,7 +7,14 @@ class MockHTTPClient(CloudifyClient.client_class): def __init__(self, *args, **kwargs): super(MockHTTPClient, self).__init__(*args, **kwargs) - self._do_request = mock.Mock() + self.do_request = mock.Mock(side_effect=self._fake_do_request) + + def _fake_do_request(self, *args, **kwargs): + data = self.do_request.return_value or {} + wrapper = kwargs.get('wrapper') + if wrapper: + return wrapper(data) + return data class MockClient(CloudifyClient): @@ -24,7 +31,7 @@ def __init__(self, **kwargs): @property def mock_do_request(self): - return self._client._do_request + return self._client.do_request def assert_last_mock_call(self, endpoint, data=None, params=None, expected_status_code=200, stream=False, @@ -32,17 +39,16 @@ def assert_last_mock_call(self, endpoint, data=None, params=None, if not params: params = {} - _, kwargs = self.mock_do_request.call_args_list[-1] + args, kwargs = self.mock_do_request.call_args_list[-1] - called_endpoint = kwargs['request_url'].rpartition('v3.1')[2] + method, called_endpoint = args assert endpoint == called_endpoint - assert data == kwargs['body'] + assert data == kwargs['data'] assert params == kwargs['params'] assert expected_status_code == kwargs['expected_status_code'] assert stream == kwargs['stream'] - - assert expected_method == kwargs['requests_method'].__name__ + assert expected_method == method.lower() @property def last_mock_call_headers(self): diff --git a/cloudify_rest_client/tests/test_tokens.py b/cloudify_rest_client/tests/test_tokens.py index c9b0beb82..1ad14d7c7 100644 --- a/cloudify_rest_client/tests/test_tokens.py +++ b/cloudify_rest_client/tests/test_tokens.py @@ -20,7 +20,7 @@ def test_token_create(): result = client.tokens.create() client.assert_last_mock_call(endpoint='/tokens', - data='{}', + data={}, expected_method='post') assert isinstance(result, Token) diff --git a/cloudify_rest_client/tokens.py b/cloudify_rest_client/tokens.py index 3be5c1148..6a80da13f 100644 --- a/cloudify_rest_client/tokens.py +++ b/cloudify_rest_client/tokens.py @@ -58,11 +58,10 @@ def list(self, **kwargs): :param kwargs: Optional fields or filter arguments as defined in the restservice. """ - response = self.api.get('/tokens', params=kwargs) - - return ListResponse( - [Token(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/tokens', + params=kwargs, + wrapper=ListResponse.of(Token), ) def get(self, token_id): @@ -71,11 +70,11 @@ def get(self, token_id): :return: Token """ - return Token(self.api.get('/tokens/{}'.format(token_id))) + return self.api.get('/tokens/{}'.format(token_id), wrapper=Token) def delete(self, token_id): """Delete an existing token, revoking its access.""" - self.api.delete('/tokens/{}'.format(token_id)) + return self.api.delete('/tokens/{}'.format(token_id)) def create(self, description=None, expiration=None): """Create a new authentication token for the current user. @@ -94,7 +93,8 @@ def create(self, description=None, expiration=None): if expiration: parse_utc_datetime(expiration) data['expiration_date'] = expiration - return Token(self.api.post( + return self.api.post( '/tokens', data=data, - )) + wrapper=Token, + ) diff --git a/cloudify_rest_client/user_groups.py b/cloudify_rest_client/user_groups.py index e70780fe4..ae3beff73 100644 --- a/cloudify_rest_client/user_groups.py +++ b/cloudify_rest_client/user_groups.py @@ -78,11 +78,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/user-groups', - _include=_include, - params=params) - return ListResponse([Group(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/user-groups', + _include=_include, + params=params, + wrapper=ListResponse.of(Group), + ) def create(self, group_name, role, ldap_group_dn=None): data = { @@ -90,32 +91,39 @@ def create(self, group_name, role, ldap_group_dn=None): 'ldap_group_dn': ldap_group_dn, 'role': role } - response = self.api.post('/user-groups', - data=data, - expected_status_code=201) - return Group(response) + return self.api.post( + '/user-groups', + data=data, + expected_status_code=201, + wrapper=Group, + ) def get(self, group_name, **kwargs): - response = self.api.get( + return self.api.get( '/user-groups/{0}'.format(group_name), - params=kwargs + params=kwargs, + wrapper=Group, ) - return Group(response) def delete(self, group_name): - self.api.delete('/user-groups/{0}'.format(group_name)) + return self.api.delete('/user-groups/{0}'.format(group_name)) def set_role(self, group_name, new_role): data = {'role': new_role} - response = self.api.post('/user-groups/{0}'.format(group_name), - data=data) - return Group(response) + return self.api.post( + '/user-groups/{0}'.format(group_name), + data=data, + wrapper=Group, + ) def add_user(self, username, group_name): data = {'username': username, 'group_name': group_name} - response = self.api.put('/user-groups/users', data=data) - return Group(response) + return self.api.put( + '/user-groups/users', + data=data, + wrapper=Group, + ) def remove_user(self, username, group_name): data = {'username': username, 'group_name': group_name} - self.api.delete('/user-groups/users', data=data) + return self.api.delete('/user-groups/users', data=data) diff --git a/cloudify_rest_client/users.py b/cloudify_rest_client/users.py index fed8067f3..66988741a 100644 --- a/cloudify_rest_client/users.py +++ b/cloudify_rest_client/users.py @@ -113,11 +113,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/users', - _include=_include, - params=params) - return ListResponse([User(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/users', + _include=_include, + params=params, + wrapper=ListResponse.of(User), + ) def create(self, username, password, role, is_prehashed=None, created_at=None, first_login_at=None, last_login_at=None): @@ -130,55 +131,67 @@ def create(self, username, password, role, is_prehashed=None, data['first_login_at'] = first_login_at if last_login_at: data['last_login_at'] = last_login_at - response = self.api.put('/users', data=data, expected_status_code=201) - return User(response) + return self.api.put( + '/users', + data=data, + expected_status_code=201, + wrapper=User, + ) def set_password(self, username, new_password): data = {'password': new_password} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def set_role(self, username, new_role): data = {'role': new_role} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def set_show_getting_started(self, username, flag_value): data = {'show_getting_started': flag_value} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def get(self, username, **kwargs): - response = self.api.get( + return self.api.get( '/users/{0}'.format(username), - params=kwargs + params=kwargs, + wrapper=User, ) - return User(response) def get_self(self, **kwargs): - response = self.api.get('/user', params=kwargs) - return User(response) + return self.api.get('/user', params=kwargs, wrapper=User) def delete(self, username): - self.api.delete('/users/{0}'.format(username)) + return self.api.delete('/users/{0}'.format(username)) def activate(self, username): - response = self.api.post( + return self.api.post( '/users/active/{0}'.format(username), - data={'action': 'activate'} + data={'action': 'activate'}, + wrapper=User, ) - return User(response) def deactivate(self, username): - response = self.api.post( + return self.api.post( '/users/active/{0}'.format(username), - data={'action': 'deactivate'} + data={'action': 'deactivate'}, + wrapper=User, ) - return User(response) def unlock(self, username, **kwargs): - response = self.api.post( + return self.api.post( '/users/unlock/{0}'.format(username), - params=kwargs + params=kwargs, + wrapper=User, ) - return User(response) diff --git a/cloudify_rest_client/workflows.py b/cloudify_rest_client/workflows.py index b7414063d..2df9ceae6 100644 --- a/cloudify_rest_client/workflows.py +++ b/cloudify_rest_client/workflows.py @@ -58,11 +58,15 @@ def list(self, filter_id=None, filter_rules=None, **kwargs): params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/workflows', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/workflows', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Workflow), + ) else: - response = self.api.get('/workflows', params=params) - - return ListResponse( - [Workflow(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/workflows', + params=params, + wrapper=ListResponse.of(Workflow), + ) From 605289a961f720849de93698b1cbe8876739c1dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Maksymczuk?= Date: Tue, 13 Dec 2022 10:49:10 +0100 Subject: [PATCH 5/5] Remove now-unused cluster stuff Now the regular client just handles clustering. --- cloudify/cluster.py | 106 -------------------------------------------- cloudify/manager.py | 14 ++++-- 2 files changed, 10 insertions(+), 110 deletions(-) delete mode 100644 cloudify/cluster.py diff --git a/cloudify/cluster.py b/cloudify/cluster.py deleted file mode 100644 index 200f84091..000000000 --- a/cloudify/cluster.py +++ /dev/null @@ -1,106 +0,0 @@ -######## -# Copyright (c) 2017-2019 Cloudify Platform Ltd. All rights reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. - -import re -import types -import random -import requests -import itertools - -from cloudify.utils import ipv6_url_compat - -from cloudify_rest_client import CloudifyClient -from cloudify_rest_client.client import HTTPClient -from cloudify_rest_client.exceptions import CloudifyClientError - - -class ClusterHTTPClient(HTTPClient): - - def __init__(self, host, *args, **kwargs): - # from outside, we get host passed in as a list (optionally). - # But we still need self.host to be the currently-used manager, - # and we can store the list as self.hosts - # (copy the list so that outside mutations don't affect us) - hosts = list(host) if isinstance(host, list) else [host] - hosts = [ipv6_url_compat(h) for h in hosts] - random.shuffle(hosts) - self.hosts = itertools.cycle(hosts) - super(ClusterHTTPClient, self).__init__(hosts[0], *args, **kwargs) - self.default_timeout_sec = self.default_timeout_sec or (5, None) - self.retries = 30 - self.retry_interval = 3 - - def do_request(self, method, url, *args, **kwargs): - kwargs.setdefault('timeout', self.default_timeout_sec) - - copied_data = None - if isinstance(kwargs.get('data'), types.GeneratorType): - copied_data = itertools.tee(kwargs.pop('data'), self.retries) - - errors = {} - for retry in range(self.retries): - manager_to_try = next(self.hosts) - self.host = manager_to_try - if copied_data is not None: - kwargs['data'] = copied_data[retry] - - try: - return super(ClusterHTTPClient, self).do_request( - method, url, *args, **kwargs) - except (requests.exceptions.ConnectionError) as error: - self.logger.debug( - 'Connection error when trying to connect to ' - 'manager {0}'.format(error) - ) - errors[manager_to_try] = error - continue - except CloudifyClientError as e: - errors[manager_to_try] = e.status_code - if e.response.status_code == 502: - continue - if e.response.status_code == 404 and \ - self._is_fileserver_download(e.response): - continue - else: - raise - - raise CloudifyClientError( - 'HTTP Client error: {0} {1} ({2})'.format( - method.__name__.upper(), - url, - ', '.join( - '{0}: {1}'.format(host, e) for host, e in errors.items() - ) - )) - - def _is_fileserver_download(self, response): - """Is this response a file-download response? - - 404 responses to requests that download files, need to be retried - with all managers in the cluster: if some file was not yet - replicated, another manager might have this file. - - This is because the file replication is asynchronous. - """ - if re.search('/(blueprints|snapshots)/', response.url): - return True - disposition = response.headers.get('Content-Disposition') - if not disposition: - return False - return disposition.strip().startswith('attachment') - - -class CloudifyClusterClient(CloudifyClient): - client_class = ClusterHTTPClient diff --git a/cloudify/manager.py b/cloudify/manager.py index bf158354f..7d64929ca 100644 --- a/cloudify/manager.py +++ b/cloudify/manager.py @@ -8,7 +8,8 @@ from cloudify.state import ctx, workflow_ctx, NotInContext from cloudify.exceptions import (HttpException, NonRecoverableError) -from cloudify.cluster import CloudifyClusterClient +from cloudify_rest_client.client import CloudifyClient +from cloudify_async_client.client import AsyncCloudifyClient class NodeInstance(object): @@ -123,7 +124,7 @@ def system_properties(self): return self._system_properties -def get_rest_client(tenant=None, api_token=None): +def get_rest_client(tenant=None, api_token=None, async_client=False): """ :param tenant: optional tenant name to connect as :param api_token: optional api_token to authenticate with (instead of @@ -150,7 +151,11 @@ def get_rest_client(tenant=None, api_token=None): else: token = utils.get_rest_token() - return CloudifyClusterClient( + client_cls = CloudifyClient + if async_client: + client_cls = AsyncCloudifyClient + + return client_cls( headers=headers, host=utils.get_manager_rest_service_host(), port=utils.get_manager_rest_service_port(), @@ -159,7 +164,8 @@ def get_rest_client(tenant=None, api_token=None): protocol=utils.get_manager_rest_service_protocol(), cert=utils.get_local_rest_certificate(), kerberos_env=utils.get_kerberos_indication( - os.environ.get(constants.KERBEROS_ENV_KEY)) + os.environ.get(constants.KERBEROS_ENV_KEY)), + retries=30, )