Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cluster-handling CloudifyClient #1459

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cloudify_cli/async_commands/audit_log.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import json

from cloudify_cli.exceptions import CloudifyCliError
from cloudify_cli.logger import get_global_json_output


Expand All @@ -27,8 +26,6 @@ async def _stream_logs(creator_name,
timeout,
logger,
client):
if not hasattr(client.auditlog, 'stream'):
raise CloudifyCliError('Streaming requires Python>=3.6.')
logger.info('Streaming audit log entries...')
response = await client.auditlog.stream(timeout=timeout,
creator_name=creator_name,
Expand Down
71 changes: 40 additions & 31 deletions cloudify_cli/commands/audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import click

from cloudify_cli import env
from cloudify_cli.cli import cfy, helptexts
from cloudify_cli.exceptions import CloudifyCliError
from cloudify_cli.table import print_data
Expand Down Expand Up @@ -65,18 +66,20 @@ def auditlog():
@cfy.options.common_options
@cfy.pass_logger
@cfy.pass_client()
def list_logs(creator_name,
execution_id,
since,
follow,
timeout,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client,
):
def list_logs(
creator_name,
execution_id,
since,
follow,
timeout,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client,
):
client = env.get_rest_client(async_client=True)
if follow:
from cloudify_cli.async_commands.audit_log import stream_logs
stream_logs(creator_name,
Expand All @@ -86,29 +89,35 @@ def list_logs(creator_name,
logger,
client)
else:
_list_logs(creator_name,
execution_id,
since,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client)
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(_list_logs(
creator_name,
execution_id,
since,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client,
))


def _list_logs(creator_name,
execution_id,
since,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client):
async def _list_logs(
creator_name,
execution_id,
since,
sort_by,
descending,
pagination_offset,
pagination_size,
logger,
client,
):
"""List audit_log entries"""
logger.info('Listing audit log entries...')
logs = client.auditlog.list(
logs = await client.auditlog.list(
creator_name=creator_name,
execution_id=execution_id,
since=since,
Expand Down
3 changes: 0 additions & 3 deletions cloudify_cli/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ def init_local_profile(reset_context=False,
if reset_context:
if hard:
os.remove(config.CLOUDIFY_CONFIG_PATH)
# else:
# TODO: Is this check necessary?
# _raise_initialized_error('local')

_create_profiles_dir_and_config(hard, enable_colors)
logger.info('Initialization completed successfully')
Expand Down
195 changes: 70 additions & 125 deletions cloudify_cli/env.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import os
import json
import errno
import types
import shutil
import getpass
import tempfile
import itertools
from base64 import b64encode

import yaml
import requests

from cloudify_rest_client import CloudifyClient
from cloudify_rest_client.client import HTTPClient
from cloudify_rest_client.client import CloudifyClient, HTTPClient
from cloudify.cluster_status import CloudifyNodeType
from cloudify_rest_client.exceptions import CloudifyClientError
from cloudify.utils import ipv6_url_compat

from cloudify_cli import constants
from cloudify_cli.exceptions import CloudifyCliError
try:
from cloudify_async_client.client import AsyncCloudifyClient
except ImportError as e:
AsyncCloudifyClient = None
async_import_error = e
else:
async_import_error = None


_ENV_NAME = 'manager'
Expand Down Expand Up @@ -300,18 +301,21 @@ def is_cluster(client_profile=None):
client_profile.cluster.get(CloudifyNodeType.MANAGER))


def get_rest_client(client_profile=None,
rest_host=None,
rest_port=None,
rest_protocol=None,
rest_cert=None,
username=None,
password=None,
tenant_name=None,
trust_all=False,
cluster=None,
kerberos_env=None,
token=None):
def get_rest_client(
client_profile=None,
rest_host=None,
rest_port=None,
rest_protocol=None,
rest_cert=None,
username=None,
password=None,
tenant_name=None,
trust_all=False,
cluster=None,
kerberos_env=None,
token=None,
async_client=False,
):
if client_profile is None:
client_profile = profile
assert_credentials_set(client_profile)
Expand All @@ -323,8 +327,17 @@ def get_rest_client(client_profile=None,
kerberos_env = kerberos_env \
if kerberos_env is not None else client_profile.kerberos_env

if get_target_manager():
rest_host = get_target_manager()
elif is_cluster(client_profile):
rest_host = [
node.get('host_ip') or node.get('hostname')
for node in client_profile.cluster.get(CloudifyNodeType.MANAGER)
]
rest_host = rest_host or client_profile.manager_ip

kwargs = {
'host': rest_host or client_profile.manager_ip,
'host': rest_host,
'port': rest_port or client_profile.rest_port,
'protocol': rest_protocol or client_profile.rest_protocol,
'cert': rest_cert or get_ssl_cert(client_profile),
Expand All @@ -341,12 +354,14 @@ def get_rest_client(client_profile=None,
kwargs['password'] = password
kwargs['headers'].update(get_auth_header(username, password))

if cluster:
kwargs['profile'] = client_profile
client = CloudifyClusterClient(**kwargs)
else:
client = CloudifyClient(**kwargs)
return client
client_cls = ProfileSavingClusterClient
if async_client:
if AsyncCloudifyClient is None:
raise RuntimeError(
f'Async client not available: {async_import_error}')
client_cls = AsyncCloudifyClient

return client_cls(**kwargs)


def build_manager_host_string(ssh_user='', ip=''):
Expand Down Expand Up @@ -553,118 +568,48 @@ def get_auth_header(username, password):
_TRY_NEXT_NODE = object()


class ClusterHTTPClient(HTTPClient):

class ProfileSavingHTTPClient(HTTPClient):
def __init__(self, *args, **kwargs):
profile = kwargs.pop('profile')
super(ClusterHTTPClient, self).__init__(*args, **kwargs)
if not profile.cluster:
raise ValueError('Cluster client invoked for an empty cluster!')
self._cluster = list(profile.cluster.get(CloudifyNodeType.MANAGER))
self._profile = profile
first_node = self._cluster[0]
self.cert = first_node.get('cert') or self.cert
self.trust_all = first_node.get('trust_all') or self.trust_all
self.default_timeout_sec = self.default_timeout_sec or (5, None)

def do_request(self, *args, **kwargs):
# this request can be retried for each manager - if the data is
# a generator, we need to copy it, so we can send it more than once
copied_data = None
if isinstance(kwargs.get('data'), types.GeneratorType):
copied_data = itertools.tee(kwargs.pop('data'),
len(self._cluster) + 1)

if kwargs.get('timeout') is None:
kwargs['timeout'] = self.default_timeout_sec

if copied_data is not None:
kwargs['data'] = copied_data[-1]

manager_host = get_target_manager()
if manager_host:
self.host = ipv6_url_compat(manager_host)
return super(ClusterHTTPClient, self).do_request(*args, **kwargs)

# First try with the main manager ip given when creating the profile
# with `cfy profiles use`
self.host = ipv6_url_compat(self._profile.manager_ip)
response = self._try_do_request(*args, **kwargs)
if response is not _TRY_NEXT_NODE:
return response

for node_index, node in list(enumerate(
self._profile.cluster[CloudifyNodeType.MANAGER])):
if self._profile.manager_ip in [node['host_ip'], node['hostname']]:
continue
self._use_node(node)
if copied_data is not None:
kwargs['data'] = copied_data[node_index]

response = self._try_do_request(*args, **kwargs)
if response is _TRY_NEXT_NODE:
continue
return response

raise CloudifyClientError('All cluster nodes are offline')

def _try_do_request(self, *args, **kwargs):
try:
return super(ClusterHTTPClient, self).do_request(*args,
**kwargs)
except (requests.exceptions.ConnectionError,
CloudifyClientError) as e:
if isinstance(e, CloudifyClientError) and e.status_code != 502:
raise
self.logger.warning('Could not connect to manager %s on port %s',
self.host, self.port)
self.logger.debug(str(e))
return _TRY_NEXT_NODE
super().__init__(*args, **kwargs)
self._last_tried_host = None

def _use_node(self, node):
if ipv6_url_compat(node['host_ip']) == self.host:
return
self.host = ipv6_url_compat(node['host_ip'])
for attr in ['rest_port', 'rest_protocol', 'trust_all', 'cert']:
new_value = node.get(attr)
if new_value:
setattr(self, attr, new_value)
self._update_profile(node)

def _update_profile(self, node):
def get_host(self):
host = super().get_host()
self._last_tried_host = host
return host

def process_response(self, *args, **kwargs):
if self._last_tried_host is not None:
self._update_profile(self._last_tried_host)
self._last_tried_host = None
return super().process_response(*args, **kwargs)

def _update_profile(self, target_ip):
"""
Put the node at the start of the cluster list in profile.

The client tries nodes in the order of the cluster list, so putting
the node first will make the client try it first next time. This makes
the client always try the last-known-active-manager first.
"""
self._profile.cluster[CloudifyNodeType.MANAGER].remove(node)
self._profile.cluster[CloudifyNodeType.MANAGER] = (
[node] + self._profile.cluster[CloudifyNodeType.MANAGER])
node = None
for cluster_member in profile.cluster[CloudifyNodeType.MANAGER]:
if cluster_member['host_ip'] == target_ip:
node = cluster_member
break
if node is None:
return
profile.cluster[CloudifyNodeType.MANAGER].remove(node)
profile.cluster[CloudifyNodeType.MANAGER] = (
[node] + profile.cluster[CloudifyNodeType.MANAGER])
for node_attr in CLUSTER_NODE_ATTRS:
if node_attr in node:
setattr(self._profile, node_attr, node[node_attr])
self._profile.save()

setattr(profile, node_attr, node[node_attr])
profile.save()

class CloudifyClusterClient(CloudifyClient):
"""
A CloudifyClient that will retry the queries with the current manager.

When a request fails with a connection error, this will keep trying with
every node in the cluster, until it finds an active manager.

When an active manager is found, the profile will be updated with its
address.
"""
def __init__(self, profile, *args, **kwargs):
self._profile = profile
super(CloudifyClusterClient, self).__init__(*args, **kwargs)

class ProfileSavingClusterClient(CloudifyClient):
def client_class(self, *args, **kwargs):
kwargs.setdefault('profile', self._profile)
return ClusterHTTPClient(*args, **kwargs)
return ProfileSavingHTTPClient(*args, **kwargs)


profile = get_profile_context(suppress_error=True)
Expand Down
Loading