diff --git a/compute_endpoint/globus_compute_endpoint/cli.py b/compute_endpoint/globus_compute_endpoint/cli.py index d52f18d19..1889adf33 100644 --- a/compute_endpoint/globus_compute_endpoint/cli.py +++ b/compute_endpoint/globus_compute_endpoint/cli.py @@ -490,10 +490,24 @@ def _do_start_endpoint( exc_type = e.__class__.__name__ log.debug("Invalid info on stdin -- (%s) %s", exc_type, e) - if config_str is not None: - ep_config = load_config_yaml(config_str) - else: - ep_config = get_config(ep_dir) + try: + if config_str is not None: + ep_config = load_config_yaml(config_str) + else: + ep_config = get_config(ep_dir) + except Exception as e: + if isinstance(e, ClickException): + raise + + # We've likely not exported to the log, so at least put _something_ in the + # logs for the human to debug; motivated by SC-28607 + exc_type = type(e).__name__ + msg = ( + "Failed to find or parse endpoint configuration. Endpoint will not" + f" start. ({exc_type}) {e}" + ) + log.critical(msg) + raise if die_with_parent: # The endpoint cannot die with its parent if it diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_schema.json b/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_schema.json index 28e84a42a..f8cfbf736 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_schema.json +++ b/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_schema.json @@ -3,8 +3,7 @@ "type": "object", "properties": { "endpoint_setup": { "type": "string" }, - "endpoint_init": { "type": "string" }, "worker_init": { "type": "string" } }, "additionalProperties": true -} \ No newline at end of file +} diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_template.yaml b/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_template.yaml index 9cdf03d9c..a818dc994 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_template.yaml +++ b/compute_endpoint/globus_compute_endpoint/endpoint/config/user_config_template.yaml @@ -21,6 +21,8 @@ # There are a number of example configurations available in the documentation: # https://globus-compute.readthedocs.io/en/stable/endpoints.html#example-configurations +endpoint_setup: {{ endpoint_setup|default() }} + engine: type: HighThroughputEngine max_workers_per_node: 1 @@ -32,8 +34,6 @@ engine: max_blocks: 1 init_blocks: 1 - endpoint_setup: {{ endpoint_setup|default() }} - endpoint_init: {{ endpoint_init|default() }} worker_init: {{ worker_init|default() }} # Endpoints will be restarted when a user submits new tasks to the diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py index 4bb0f7a32..88c8f9cd5 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py @@ -325,12 +325,33 @@ def start_endpoint( reg_info: dict, die_with_parent: bool = False, ): + # If we are running a full detached daemon then we will send the output to + # log files, otherwise we can piggy back on our stdout + if endpoint_config.detach_endpoint: + stdout: t.TextIO = open( + os.path.join(endpoint_dir, endpoint_config.stdout), "a+" + ) + stderr: t.TextIO = open( + os.path.join(endpoint_dir, endpoint_config.stderr), "a+" + ) + else: + stdout = sys.stdout + stderr = sys.stderr + + ostream = None + if sys.stdout.isatty(): + ostream = sys.stdout + elif sys.stderr.isatty(): + ostream = sys.stderr + # This is to ensure that at least 1 executor is defined if not endpoint_config.executors: - raise Exception( - f"Endpoint config file at {endpoint_dir} is missing " - "executor definitions" + msg = ( + "Endpoint configuration has no executors defined. Endpoint will not" + " start." ) + log.critical(msg) + raise ValueError(msg) pid_check = Endpoint.check_pidfile(endpoint_dir) # if the pidfile exists, we should return early because we don't @@ -342,8 +363,9 @@ def start_endpoint( if endpoint_config.display_name: endpoint_name = endpoint_config.display_name active_msg = f"Endpoint '{endpoint_name}' is already active" - print(active_msg) log.info(active_msg) + if ostream: + print(active_msg, file=ostream) sys.exit(-1) else: log.info( @@ -355,21 +377,12 @@ def start_endpoint( if endpoint_config.endpoint_setup: Endpoint._run_command("endpoint_setup", endpoint_config.endpoint_setup) - result_store = ResultStore(endpoint_dir=endpoint_dir) - - # Create a daemon context - # If we are running a full detached daemon then we will send the output to - # log files, otherwise we can piggy back on our stdout - if endpoint_config.detach_endpoint: - stdout: t.TextIO = open( - os.path.join(endpoint_dir, endpoint_config.stdout), "a+" - ) - stderr: t.TextIO = open( - os.path.join(endpoint_dir, endpoint_config.stderr), "a+" - ) - else: - stdout = sys.stdout - stderr = sys.stderr + try: + result_store = ResultStore(endpoint_dir=endpoint_dir) + except Exception as e: + exc_type = type(e).__name__ + log.critical(f"Failed to initialize the result storage. ({exc_type}) {e}") + raise try: pid_file = endpoint_dir / "daemon.pid" @@ -408,7 +421,8 @@ def start_endpoint( except GlobusAPIError as e: blocked_msg = f"Endpoint registration blocked. [{e.text}]" log.warning(blocked_msg) - print(blocked_msg) + if ostream: + print(blocked_msg, file=ostream) if e.http_status in ( HTTPStatus.CONFLICT, HTTPStatus.LOCKED, @@ -434,8 +448,9 @@ def start_endpoint( "Please ensure the Globus Compute service address is reachable, " "then attempt restarting the endpoint." ) - print(msg) log.critical(msg) + if ostream: + print(msg, file=ostream) exit(os.EX_TEMPFAIL) ret_ep_uuid = reg_info.get("endpoint_id") @@ -502,11 +517,6 @@ def start_endpoint( self.debug, ) - ostream = None - if sys.stdout.isatty(): - ostream = sys.stdout - elif sys.stderr.isatty(): - ostream = sys.stderr if ostream: msg = f"Starting endpoint; registered ID: {endpoint_uuid}" if log_to_console: @@ -523,7 +533,9 @@ def start_endpoint( # On any other system, this should make no difference (same file!) with open(os.devnull) as nullf: if os.dup2(nullf.fileno(), 0) != 0: - raise Exception("Unable to close stdin") + msg = "Unable to close stdin; endpoint will not start." + log.critical(msg) + raise Exception(msg) setup_logging( logfile=logfile, diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py index 13b409932..f19bfffb6 100644 --- a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py +++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint.py @@ -44,10 +44,12 @@ def patch_compute_client(mocker): yield mocker.patch(f"{_MOCK_BASE}Client", return_value=gcc) -def test_non_configured_endpoint(mocker): - result = CliRunner().invoke(app, ["start", "newendpoint"]) - assert "newendpoint" in result.stdout - assert "not configured" in result.stdout +def test_non_configured_endpoint(mocker, tmp_path): + env = {"GLOBUS_COMPUTE_USER_DIR": str(tmp_path)} + with mock.patch.dict(os.environ, env): + result = CliRunner().invoke(app, ["start", "newendpoint"]) + assert "newendpoint" in result.stdout + assert "not configured" in result.stdout @pytest.mark.parametrize( @@ -293,7 +295,7 @@ def test_endpoint_setup_execution(mocker, tmp_path, randomstring): endpoint_dir = None endpoint_uuid = None - endpoint_config = Config(endpoint_setup=command) + endpoint_config = Config(endpoint_setup=command, detach_endpoint=False) log_to_console = False no_color = True reg_info = {} diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py index 88b371202..28e1eb2d9 100644 --- a/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py +++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_endpoint_manager.py @@ -10,7 +10,7 @@ import pytest import requests import yaml -from globus_compute_endpoint.endpoint.endpoint import Endpoint +from globus_compute_endpoint.endpoint.endpoint import Config, Endpoint from globus_sdk import GlobusAPIError logger = logging.getLogger("mock_funcx") @@ -333,24 +333,17 @@ def test_start_without_executors(self, mocker): mock_context.return_value.pidfile.path = "" - class mock_executors: - executors = None - - mock_config = mock_executors() + config = Config(executors=[], detach_endpoint=False) manager = Endpoint() config_dir = pathlib.Path("/some/path/mock_endpoint") manager.configure_endpoint(config_dir, None) - with pytest.raises( - Exception, - match=f"Endpoint config file at {config_dir} is " - "missing executor definitions", - ): + with pytest.raises(ValueError, match="has no executors defined"): log_to_console = False no_color = True manager.start_endpoint( - config_dir, None, mock_config, log_to_console, no_color, reg_info={} + config_dir, None, config, log_to_console, no_color, reg_info={} ) @pytest.mark.skip("This test doesn't make much sense") diff --git a/compute_endpoint/tests/unit/test_cli_behavior.py b/compute_endpoint/tests/unit/test_cli_behavior.py index 43192f809..aa3425290 100644 --- a/compute_endpoint/tests/unit/test_cli_behavior.py +++ b/compute_endpoint/tests/unit/test_cli_behavior.py @@ -363,9 +363,13 @@ def test_start_ep_incorrect_config_py( conf = mock_state.endpoint_config_dir / ep_name / "config.py" conf.write_text("asa asd df = 5") # fail the import - with mock.patch("globus_compute_endpoint.endpoint.config.utils.log") as mock_log: - res = run_line(f"start {ep_name}", assert_exit_code=1) - assert "might be out of date" in mock_log.exception.call_args[0][0] + with mock.patch(f"{_MOCK_BASE}log"): + with mock.patch( + "globus_compute_endpoint.endpoint.config.utils.log" + ) as mock_util_log: + res = run_line(f"start {ep_name}", assert_exit_code=1) + a, _ = mock_util_log.exception.call_args + assert "might be out of date" in a[0] assert isinstance(res.exception, SyntaxError) # `coverage` demands a valid syntax file. FBOW, then, the ordering and diff --git a/compute_endpoint/tests/unit/test_endpoint_unit.py b/compute_endpoint/tests/unit/test_endpoint_unit.py index b468c06eb..1a25a8826 100644 --- a/compute_endpoint/tests/unit/test_endpoint_unit.py +++ b/compute_endpoint/tests/unit/test_endpoint_unit.py @@ -215,6 +215,7 @@ def test_start_endpoint_network_error( mock_log = mocker.patch(f"{_mock_base}log") f = io.StringIO() + f.isatty = lambda: True with redirect_stdout(f): with pytest.raises(SystemExit) as pytest_exc: ep.start_endpoint( @@ -324,7 +325,6 @@ def test_register_endpoint_blocked( mock_log = mocker.patch(f"{_mock_base}log") mock_gcc = get_standard_compute_client() mocker.patch(f"{_mock_base}Endpoint.get_funcx_client").return_value = mock_gcc - f = io.StringIO() ep, ep_dir, log_to_console, no_color, ep_conf = mock_ep_data ep_id = str(uuid.uuid4()) @@ -335,6 +335,8 @@ def test_register_endpoint_blocked( msg=some_err, ) + f = io.StringIO() + f.isatty = lambda: True with redirect_stdout(f): with pytest.raises((GlobusAPIError, SystemExit)) as pytexc: ep.start_endpoint( @@ -374,10 +376,10 @@ def test_register_endpoint_already_active( mocker.patch(f"{_mock_base}Endpoint.check_pidfile").return_value = pid_active f = io.StringIO() + f.isatty = lambda: True ep, ep_dir, log_to_console, no_color, ep_conf = mock_ep_data ep_id = str(uuid.uuid4()) - # register_endpoint_failure_response(endpoint_id=ep_id, status_code=409) with redirect_stdout(f): with pytest.raises(SystemExit) as pytest_exc: ep.start_endpoint(