Skip to content

Commit

Permalink
Fix flubbed default user template (#1392)
Browse files Browse the repository at this point in the history
As identified in the associated ticket, there is no `endpoint_init`, and the
`endpoint_setup` item should be top-level.

However, debugging this shouldn't have been quite as arcane as it was: there
were no logs.  Until an upcoming commit communicates UEP errors more robustly
via the already-instantiated AMQP credentials, this commit ensures that
at least something gets written to the logs in the identified case.

[sc-28607]
  • Loading branch information
khk-globus committed Dec 14, 2023
1 parent 0c48759 commit 259a691
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 56 deletions.
22 changes: 18 additions & 4 deletions compute_endpoint/globus_compute_endpoint/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,24 @@ def _do_start_endpoint(
exc_type = e.__class__.__name__
log.debug("Invalid info on stdin -- (%s) %s", exc_type, e)

if config_str is not None:
ep_config = load_config_yaml(config_str)
else:
ep_config = get_config(ep_dir)
try:
if config_str is not None:
ep_config = load_config_yaml(config_str)
else:
ep_config = get_config(ep_dir)
except Exception as e:
if isinstance(e, ClickException):
raise

# We've likely not exported to the log, so at least put _something_ in the
# logs for the human to debug; motivated by SC-28607
exc_type = type(e).__name__
msg = (
"Failed to find or parse endpoint configuration. Endpoint will not"
f" start. ({exc_type}) {e}"
)
log.critical(msg)
raise

if die_with_parent:
# The endpoint cannot die with its parent if it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"type": "object",
"properties": {
"endpoint_setup": { "type": "string" },
"endpoint_init": { "type": "string" },
"worker_init": { "type": "string" }
},
"additionalProperties": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
# There are a number of example configurations available in the documentation:
# https://globus-compute.readthedocs.io/en/stable/endpoints.html#example-configurations

endpoint_setup: {{ endpoint_setup|default() }}

engine:
type: HighThroughputEngine
max_workers_per_node: 1
Expand All @@ -32,8 +34,6 @@ engine:
max_blocks: 1
init_blocks: 1

endpoint_setup: {{ endpoint_setup|default() }}
endpoint_init: {{ endpoint_init|default() }}
worker_init: {{ worker_init|default() }}

# Endpoints will be restarted when a user submits new tasks to the
Expand Down
66 changes: 39 additions & 27 deletions compute_endpoint/globus_compute_endpoint/endpoint/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,33 @@ def start_endpoint(
reg_info: dict,
die_with_parent: bool = False,
):
# If we are running a full detached daemon then we will send the output to
# log files, otherwise we can piggy back on our stdout
if endpoint_config.detach_endpoint:
stdout: t.TextIO = open(
os.path.join(endpoint_dir, endpoint_config.stdout), "a+"
)
stderr: t.TextIO = open(
os.path.join(endpoint_dir, endpoint_config.stderr), "a+"
)
else:
stdout = sys.stdout
stderr = sys.stderr

ostream = None
if sys.stdout.isatty():
ostream = sys.stdout
elif sys.stderr.isatty():
ostream = sys.stderr

# This is to ensure that at least 1 executor is defined
if not endpoint_config.executors:
raise Exception(
f"Endpoint config file at {endpoint_dir} is missing "
"executor definitions"
msg = (
"Endpoint configuration has no executors defined. Endpoint will not"
" start."
)
log.critical(msg)
raise ValueError(msg)

pid_check = Endpoint.check_pidfile(endpoint_dir)
# if the pidfile exists, we should return early because we don't
Expand All @@ -342,8 +363,9 @@ def start_endpoint(
if endpoint_config.display_name:
endpoint_name = endpoint_config.display_name
active_msg = f"Endpoint '{endpoint_name}' is already active"
print(active_msg)
log.info(active_msg)
if ostream:
print(active_msg, file=ostream)
sys.exit(-1)
else:
log.info(
Expand All @@ -355,21 +377,12 @@ def start_endpoint(
if endpoint_config.endpoint_setup:
Endpoint._run_command("endpoint_setup", endpoint_config.endpoint_setup)

result_store = ResultStore(endpoint_dir=endpoint_dir)

# Create a daemon context
# If we are running a full detached daemon then we will send the output to
# log files, otherwise we can piggy back on our stdout
if endpoint_config.detach_endpoint:
stdout: t.TextIO = open(
os.path.join(endpoint_dir, endpoint_config.stdout), "a+"
)
stderr: t.TextIO = open(
os.path.join(endpoint_dir, endpoint_config.stderr), "a+"
)
else:
stdout = sys.stdout
stderr = sys.stderr
try:
result_store = ResultStore(endpoint_dir=endpoint_dir)
except Exception as e:
exc_type = type(e).__name__
log.critical(f"Failed to initialize the result storage. ({exc_type}) {e}")
raise

try:
pid_file = endpoint_dir / "daemon.pid"
Expand Down Expand Up @@ -408,7 +421,8 @@ def start_endpoint(
except GlobusAPIError as e:
blocked_msg = f"Endpoint registration blocked. [{e.text}]"
log.warning(blocked_msg)
print(blocked_msg)
if ostream:
print(blocked_msg, file=ostream)
if e.http_status in (
HTTPStatus.CONFLICT,
HTTPStatus.LOCKED,
Expand All @@ -434,8 +448,9 @@ def start_endpoint(
"Please ensure the Globus Compute service address is reachable, "
"then attempt restarting the endpoint."
)
print(msg)
log.critical(msg)
if ostream:
print(msg, file=ostream)
exit(os.EX_TEMPFAIL)

ret_ep_uuid = reg_info.get("endpoint_id")
Expand Down Expand Up @@ -502,11 +517,6 @@ def start_endpoint(
self.debug,
)

ostream = None
if sys.stdout.isatty():
ostream = sys.stdout
elif sys.stderr.isatty():
ostream = sys.stderr
if ostream:
msg = f"Starting endpoint; registered ID: {endpoint_uuid}"
if log_to_console:
Expand All @@ -523,7 +533,9 @@ def start_endpoint(
# On any other system, this should make no difference (same file!)
with open(os.devnull) as nullf:
if os.dup2(nullf.fileno(), 0) != 0:
raise Exception("Unable to close stdin")
msg = "Unable to close stdin; endpoint will not start."
log.critical(msg)
raise Exception(msg)

setup_logging(
logfile=logfile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ def patch_compute_client(mocker):
yield mocker.patch(f"{_MOCK_BASE}Client", return_value=gcc)


def test_non_configured_endpoint(mocker):
result = CliRunner().invoke(app, ["start", "newendpoint"])
assert "newendpoint" in result.stdout
assert "not configured" in result.stdout
def test_non_configured_endpoint(mocker, tmp_path):
env = {"GLOBUS_COMPUTE_USER_DIR": str(tmp_path)}
with mock.patch.dict(os.environ, env):
result = CliRunner().invoke(app, ["start", "newendpoint"])
assert "newendpoint" in result.stdout
assert "not configured" in result.stdout


@pytest.mark.parametrize(
Expand Down Expand Up @@ -293,7 +295,7 @@ def test_endpoint_setup_execution(mocker, tmp_path, randomstring):

endpoint_dir = None
endpoint_uuid = None
endpoint_config = Config(endpoint_setup=command)
endpoint_config = Config(endpoint_setup=command, detach_endpoint=False)
log_to_console = False
no_color = True
reg_info = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pytest
import requests
import yaml
from globus_compute_endpoint.endpoint.endpoint import Endpoint
from globus_compute_endpoint.endpoint.endpoint import Config, Endpoint
from globus_sdk import GlobusAPIError

logger = logging.getLogger("mock_funcx")
Expand Down Expand Up @@ -333,24 +333,17 @@ def test_start_without_executors(self, mocker):

mock_context.return_value.pidfile.path = ""

class mock_executors:
executors = None

mock_config = mock_executors()
config = Config(executors=[], detach_endpoint=False)

manager = Endpoint()
config_dir = pathlib.Path("/some/path/mock_endpoint")

manager.configure_endpoint(config_dir, None)
with pytest.raises(
Exception,
match=f"Endpoint config file at {config_dir} is "
"missing executor definitions",
):
with pytest.raises(ValueError, match="has no executors defined"):
log_to_console = False
no_color = True
manager.start_endpoint(
config_dir, None, mock_config, log_to_console, no_color, reg_info={}
config_dir, None, config, log_to_console, no_color, reg_info={}
)

@pytest.mark.skip("This test doesn't make much sense")
Expand Down
10 changes: 7 additions & 3 deletions compute_endpoint/tests/unit/test_cli_behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,13 @@ def test_start_ep_incorrect_config_py(
conf = mock_state.endpoint_config_dir / ep_name / "config.py"

conf.write_text("asa asd df = 5") # fail the import
with mock.patch("globus_compute_endpoint.endpoint.config.utils.log") as mock_log:
res = run_line(f"start {ep_name}", assert_exit_code=1)
assert "might be out of date" in mock_log.exception.call_args[0][0]
with mock.patch(f"{_MOCK_BASE}log"):
with mock.patch(
"globus_compute_endpoint.endpoint.config.utils.log"
) as mock_util_log:
res = run_line(f"start {ep_name}", assert_exit_code=1)
a, _ = mock_util_log.exception.call_args
assert "might be out of date" in a[0]
assert isinstance(res.exception, SyntaxError)

# `coverage` demands a valid syntax file. FBOW, then, the ordering and
Expand Down
6 changes: 4 additions & 2 deletions compute_endpoint/tests/unit/test_endpoint_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def test_start_endpoint_network_error(
mock_log = mocker.patch(f"{_mock_base}log")

f = io.StringIO()
f.isatty = lambda: True
with redirect_stdout(f):
with pytest.raises(SystemExit) as pytest_exc:
ep.start_endpoint(
Expand Down Expand Up @@ -324,7 +325,6 @@ def test_register_endpoint_blocked(
mock_log = mocker.patch(f"{_mock_base}log")
mock_gcc = get_standard_compute_client()
mocker.patch(f"{_mock_base}Endpoint.get_funcx_client").return_value = mock_gcc
f = io.StringIO()

ep, ep_dir, log_to_console, no_color, ep_conf = mock_ep_data
ep_id = str(uuid.uuid4())
Expand All @@ -335,6 +335,8 @@ def test_register_endpoint_blocked(
msg=some_err,
)

f = io.StringIO()
f.isatty = lambda: True
with redirect_stdout(f):
with pytest.raises((GlobusAPIError, SystemExit)) as pytexc:
ep.start_endpoint(
Expand Down Expand Up @@ -374,10 +376,10 @@ def test_register_endpoint_already_active(
mocker.patch(f"{_mock_base}Endpoint.check_pidfile").return_value = pid_active

f = io.StringIO()
f.isatty = lambda: True

ep, ep_dir, log_to_console, no_color, ep_conf = mock_ep_data
ep_id = str(uuid.uuid4())
# register_endpoint_failure_response(endpoint_id=ep_id, status_code=409)
with redirect_stdout(f):
with pytest.raises(SystemExit) as pytest_exc:
ep.start_endpoint(
Expand Down

0 comments on commit 259a691

Please sign in to comment.