From 1243d4589486e818b567c25ebe58dedc7ea6e38a Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 8 Jul 2022 13:42:18 +1200 Subject: [PATCH 1/4] api info file written --- cylc/uiserver/app.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index e0807fac..dc643561 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -54,7 +54,9 @@ """ from concurrent.futures import ProcessPoolExecutor +from contextlib import suppress import getpass +import json import os from pathlib import Path, PurePath import sys @@ -109,6 +111,9 @@ INFO_FILES_DIR = Path(USER_CONF_ROOT / "info_files") +API_INFO_FILE = f'{USER_CONF_ROOT / "api_info.json"}' + + class PathType(TraitType): """A pathlib traitlet type which allows string and undefined values.""" @@ -409,6 +414,10 @@ def initialize_settings(self): for key, value in self.config['CylcUIServer'].items() ) ) + # Make API token available to server's user. + # Do it here to avoid overwriting via server start attempt, + # when server already running. + self.write_api_info() # start the async scan task running (do this on server start not init) ioloop.IOLoop.current().add_callback( self.workflows_mgr.run @@ -515,6 +524,22 @@ def set_auth(self): def initialize_templates(self): """Change the jinja templating environment.""" + def write_api_info(self): + api_info = self.serverapp.server_info() + api_token = os.environ.get("JUPYTERHUB_API_TOKEN") + api_url = os.environ.get("JUPYTERHUB_SERVICE_URL") + # Could be none, if server not launched by hub. + if api_token: + api_info['token'] = api_token + if api_url: + api_info['url'] = api_url + Path(API_INFO_FILE).parent.mkdir(parents=True, exist_ok=True) + with suppress(FileNotFoundError): + os.unlink(API_INFO_FILE) + fd = os.open(API_INFO_FILE, os.O_CREAT | os.O_WRONLY, mode=0o600) + os.write(fd, json.dumps(api_info).encode("utf-8")) + os.close(fd) + @classmethod def launch_instance(cls, argv=None, workflow_id=None, **kwargs): if workflow_id: @@ -530,6 +555,9 @@ def launch_instance(cls, argv=None, workflow_id=None, **kwargs): del os.environ["JUPYTER_RUNTIME_DIR"] async def stop_extension(self): + # Remove API token if hub spawned + with suppress(FileNotFoundError): + os.unlink(API_INFO_FILE) # stop the async scan task await self.workflows_mgr.stop() for sub in self.data_store_mgr.w_subs.values(): From 4ae98175a45f2f9d564e0f77066419ffce457230 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Sat, 10 Dec 2022 00:00:57 +1300 Subject: [PATCH 2/4] add http client --- cylc/uiserver/client.py | 151 ++++++++++++++++++++++++++++++++++++++++ setup.cfg | 1 + 2 files changed, 152 insertions(+) create mode 100644 cylc/uiserver/client.py diff --git a/cylc/uiserver/client.py b/cylc/uiserver/client.py new file mode 100644 index 00000000..6c54e00f --- /dev/null +++ b/cylc/uiserver/client.py @@ -0,0 +1,151 @@ +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import json +import os +import requests +from shutil import which +import socket +import sys +from typing import Any, Optional, Union, Dict + +from cylc.flow import LOG +from cylc.flow.exceptions import ClientError, ClientTimeout +from cylc.flow.network import encode_ +from cylc.flow.network.client import WorkflowRuntimeClientBase +from cylc.flow.network.client_factory import CommsMeth + +from cylc.uiserver.app import API_INFO_FILE + + +class WorkflowRuntimeClient(WorkflowRuntimeClientBase): + """Client to UI server communication using HTTP.""" + + DEFAULT_TIMEOUT = 10 # seconds + + def __init__( + self, + workflow: str, + host: Optional[str] = None, + port: Union[int, str, None] = None, + timeout: Union[float, str, None] = None, + ): + self.timeout = timeout + # gather header info post start + self.header = self.get_header() + + async def async_request( + self, + command: str, + args: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + req_meta: Optional[Dict[str, Any]] = None + ) -> object: + """Send an asynchronous request using asyncio. + + Has the same arguments and return values as ``serial_request``. + + """ + if not args: + args = {} + + with open(API_INFO_FILE, "r") as api_file: + api_info = json.loads(api_file.read()) + + # send message + msg: Dict[str, Any] = {'command': command, 'args': args} + msg.update(self.header) + # add the request metadata + if req_meta: + msg['meta'].update(req_meta) + + LOG.debug('http:send %s', msg) + + try: + res = requests.post( + api_info["url"] + 'cylc/graphql', + headers={ + 'Authorization': f'token {api_info["token"]}', + 'meta': encode_(msg.get('meta', {})), + }, + json={ + 'query': args['request_string'], + 'variables': args.get('variables', {}), + }, + timeout=self.timeout + ) + res.raise_for_status() + except requests.ConnectTimeout: + raise ClientTimeout( + 'Timeout waiting for server response.' + ' This could be due to network or server issues.' + ' Check the UI Server log.' + ) + except requests.ConnectionError as exc: + raise ClientError( + 'Unable to connect to UI Server or Hub.', + f'{exc}' + ) + + response = res.json() + LOG.debug('http:recv %s', response) + + try: + return response['data'] + except KeyError: + error = response.get( + 'error', + {'message': f'Received invalid response: {response}'}, + ) + raise ClientError( + error.get('message'), + error.get('traceback'), + ) + + def get_header(self) -> dict: + """Return "header" data to attach to each request for traceability. + + Returns: + dict: dictionary with the header information, such as + program and hostname. + """ + host = socket.gethostname() + if len(sys.argv) > 1: + cmd = sys.argv[1] + else: + cmd = sys.argv[0] + + cylc_executable_location = which("cylc") + if cylc_executable_location: + cylc_bin_dir = os.path.abspath( + os.path.join(cylc_executable_location, os.pardir) + ) + if not cylc_bin_dir.endswith("/"): + cylc_bin_dir = f"{cylc_bin_dir}/" + + if cmd.startswith(cylc_bin_dir): + cmd = cmd.replace(cylc_bin_dir, '') + return { + 'meta': { + 'prog': cmd, + 'host': host, + 'comms_method': + os.getenv( + "CLIENT_COMMS_METH", + default=CommsMeth.HTTP.value + ) + } + } diff --git a/setup.cfg b/setup.cfg index 485d27e7..f2ef67a8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -58,6 +58,7 @@ install_requires = jupyter_server>=1.10.2 tornado>=6.1.0 # matches jupyter_server value traitlets>=5.2.1 # required for logging_config (5.2.0 had bugs) + requests==2.28.* # Transitive dependencies that we directly (lightly) use: pyzmq From 3b5287471b98c3e4c8af9c01ef30af9d23a6b04a Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 16 Dec 2022 17:42:03 +1300 Subject: [PATCH 3/4] use existing Tornado library --- cylc/uiserver/client.py | 46 +++++++++++++++++++++-------------------- setup.cfg | 1 - 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/cylc/uiserver/client.py b/cylc/uiserver/client.py index 6c54e00f..a0837f1d 100644 --- a/cylc/uiserver/client.py +++ b/cylc/uiserver/client.py @@ -16,14 +16,18 @@ import json import os -import requests from shutil import which import socket import sys from typing import Any, Optional, Union, Dict +from tornado.httpclient import ( + AsyncHTTPClient, + HTTPRequest, + HTTPClientError +) from cylc.flow import LOG -from cylc.flow.exceptions import ClientError, ClientTimeout +from cylc.flow.exceptions import ClientError from cylc.flow.network import encode_ from cylc.flow.network.client import WorkflowRuntimeClientBase from cylc.flow.network.client_factory import CommsMeth @@ -43,7 +47,7 @@ def __init__( port: Union[int, str, None] = None, timeout: Union[float, str, None] = None, ): - self.timeout = timeout + self.timeout = timeout or self.DEFAULT_TIMEOUT # gather header info post start self.header = self.get_header() @@ -72,36 +76,34 @@ async def async_request( if req_meta: msg['meta'].update(req_meta) - LOG.debug('http:send %s', msg) + LOG.debug('https:send %s', msg) try: - res = requests.post( - api_info["url"] + 'cylc/graphql', + request = HTTPRequest( + url=api_info["url"] + 'cylc/graphql', + method='POST', headers={ 'Authorization': f'token {api_info["token"]}', + 'Content-Type': 'application/json', 'meta': encode_(msg.get('meta', {})), }, - json={ - 'query': args['request_string'], - 'variables': args.get('variables', {}), - }, - timeout=self.timeout - ) - res.raise_for_status() - except requests.ConnectTimeout: - raise ClientTimeout( - 'Timeout waiting for server response.' - ' This could be due to network or server issues.' - ' Check the UI Server log.' + body=json.dumps( + { + 'query': args['request_string'], + 'variables': args.get('variables', {}), + } + ), + request_timeout=float(self.timeout) ) - except requests.ConnectionError as exc: + res = await AsyncHTTPClient().fetch(request) + except HTTPClientError as exc: raise ClientError( - 'Unable to connect to UI Server or Hub.', + 'Client error with Hub/UI-Server request.', f'{exc}' ) - response = res.json() - LOG.debug('http:recv %s', response) + response = json.loads(res.body) + LOG.debug('https:recv %s', response) try: return response['data'] diff --git a/setup.cfg b/setup.cfg index f2ef67a8..485d27e7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -58,7 +58,6 @@ install_requires = jupyter_server>=1.10.2 tornado>=6.1.0 # matches jupyter_server value traitlets>=5.2.1 # required for logging_config (5.2.0 had bugs) - requests==2.28.* # Transitive dependencies that we directly (lightly) use: pyzmq From 344246cb1dab37604f551237c08e135e709bedde Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 19 Jan 2023 15:25:35 +1300 Subject: [PATCH 4/4] http => https comms method string, play via zmq --- cylc/uiserver/client.py | 17 ++++++++++++++--- cylc/uiserver/resolvers.py | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/cylc/uiserver/client.py b/cylc/uiserver/client.py index a0837f1d..42bf77b7 100644 --- a/cylc/uiserver/client.py +++ b/cylc/uiserver/client.py @@ -66,8 +66,14 @@ async def async_request( if not args: args = {} - with open(API_INFO_FILE, "r") as api_file: - api_info = json.loads(api_file.read()) + try: + with open(API_INFO_FILE, "r") as api_file: + api_info = json.loads(api_file.read()) + except FileNotFoundError: + raise ClientError( + 'API info not found, is the UI-Server running?\n' + f'({API_INFO_FILE})' + ) # send message msg: Dict[str, Any] = {'command': command, 'args': args} @@ -96,6 +102,11 @@ async def async_request( request_timeout=float(self.timeout) ) res = await AsyncHTTPClient().fetch(request) + except ConnectionRefusedError: + raise ClientError( + 'Connection refused, is the UI-Server running?\n' + f'({api_info["url"]}cylc/graphql)' + ) except HTTPClientError as exc: raise ClientError( 'Client error with Hub/UI-Server request.', @@ -147,7 +158,7 @@ def get_header(self) -> dict: 'comms_method': os.getenv( "CLIENT_COMMS_METH", - default=CommsMeth.HTTP.value + default=CommsMeth.HTTPS.value ) } } diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 4bc596e2..608ffc47 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -271,7 +271,7 @@ async def play(cls, workflows, args, workflows_mgr, log): args.pop('cylc_version') # build the command - cmd = ['cylc', 'play', '--color=never'] + cmd = ['cylc', 'play', '--color=never', '--comms-method=zmq'] cmd = _build_cmd(cmd, args) except Exception as exc: