Skip to content

Commit

Permalink
use sd_notify to update systemd in case of fatal exception and of current progress
Browse files Browse the repository at this point in the history
  • Loading branch information
holmanb committed Jul 15, 2024
1 parent a65e90c commit 5692c50
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 30 deletions.
61 changes: 36 additions & 25 deletions cloudinit/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,31 +1146,42 @@ def main(sysv_args=None):
return sub_main(args)
LOG.info("Running cloud-init in single process mode.")

# this _must_ be called before sd_notify is called otherwise netcat may
# attempt to send "start" before a socket exists
sync = socket.SocketSync("local", "network", "config", "final")

# notify systemd that this stage has completed
socket.sd_notify("READY=1")

# wait for cloud-init-local.service to start
with sync("local"):
sub_main(parser.parse_args(args=["init", "--local"]))

# wait for cloud-init.service to start
with sync("network"):
# init stage
sub_main(parser.parse_args(args=["init"]))

# wait for cloud-config.service to start
with sync("config"):
# config stage
sub_main(parser.parse_args(args=["modules", "--mode=config"]))

with sync("final"):
# final stage
sub_main(parser.parse_args(args=["modules", "--mode=final"]))
socket.sd_notify(b"STOPPING=1")
try:
# this _must_ be called before sd_notify is called otherwise netcat may
# attempt to send "start" before a socket exists
sync = socket.SocketSync("local", "network", "config", "final")

# notify systemd that this stage has completed
socket.sd_notify("READY=1")

# wait for cloud-init-local.service to start
with sync("local"):
# local stage
sub_main(parser.parse_args(args=["init", "--local"]))

# wait for cloud-init.service to start
with sync("network"):
# init stage
sub_main(parser.parse_args(args=["init"]))

# wait for cloud-config.service to start
with sync("config"):
# config stage
sub_main(parser.parse_args(args=["modules", "--mode=config"]))

with sync("final"):
# final stage
return_code = sub_main(
parser.parse_args(args=["modules", "--mode=final"])
)
except Exception as e:
LOG.fatal("Fatal exception: %s", e, exc_info=True)
status = traceback.format_exc().replace("\n", " ")
socket.sd_notify(f"STATUS={status}")
return_code = 1
socket.sd_notify("STATUS=Completed")
socket.sd_notify("STOPPING=1")
return return_code


def sub_main(args):
Expand Down
12 changes: 10 additions & 2 deletions cloudinit/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import socket
import time
from contextlib import suppress

from cloudinit.settings import DEFAULT_RUN_DIR
Expand Down Expand Up @@ -90,7 +91,11 @@ def __enter__(self):
Once the message has been received, enter the context.
"""
LOG.debug("sync(%s): initial synchronization starting", self.stage)
sd_notify(
"STATUS=Waiting on external services to "
f"complete ({self.stage} stage)"
)
start_time = time.monotonic()
# block until init system sends us data
# the first value returned contains a message from the init system
# (should be "start")
Expand All @@ -111,7 +116,10 @@ def __enter__(self):
self.__exit__(None, None, None)
raise ValueError(f"Unexpected path to unix socket: {self.remote}")

LOG.debug("sync(%s): initial synchronization complete", self.stage)
total = time.monotonic() - start_time
time_msg = f"took {total: .3f} " if total > 0.1 else ""
sd_notify(f"STATUS=Running ({self.stage} stage)")
LOG.debug("sync(%s): synchronization %scomplete", self.stage, time_msg)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
Expand Down
12 changes: 9 additions & 3 deletions tests/unittests/test_single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def __exit__(self, *_):

def test_single_process_times_out(tmp_path):
"""Verify that no "start" makes the protocol block"""
with mock.patch.object(ci_socket, "DEFAULT_RUN_DIR", tmp_path):
with mock.patch.object(
ci_socket, "DEFAULT_RUN_DIR", tmp_path
), mock.patch.object(ci_socket, "sd_notify"):
sync = ci_socket.SocketSync("first")

try:
Expand All @@ -71,7 +73,9 @@ def test_single_process(tmp_path):
After a socket has been bound but before it has started listening
"""
expected = b"done"
with mock.patch.object(ci_socket, "DEFAULT_RUN_DIR", tmp_path):
with mock.patch.object(
ci_socket, "DEFAULT_RUN_DIR", tmp_path
), mock.patch.object(ci_socket, "sd_notify"):
sync = ci_socket.SocketSync("first", "second", "third")

# send all three syncs to the sockets
Expand Down Expand Up @@ -115,7 +119,9 @@ def syncer(index: int, name: str):
time.sleep(0.001 * random.randint(0, max_sleep))
sync_storage[index] = Sync(name, tmp_path)

with mock.patch.object(ci_socket, "DEFAULT_RUN_DIR", tmp_path):
with mock.patch.object(
ci_socket, "DEFAULT_RUN_DIR", tmp_path
), mock.patch.object(ci_socket, "sd_notify"):

sync = ci_socket.SocketSync(
"first", "second", "third", "fourth", "fifth"
Expand Down

0 comments on commit 5692c50

Please sign in to comment.