diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py index 4a1c8b2e28c..4754b37245f 100644 --- a/cloudinit/cmd/main.py +++ b/cloudinit/cmd/main.py @@ -24,6 +24,7 @@ from cloudinit import netinfo from cloudinit import signal_handler from cloudinit import sources +from cloudinit import socket from cloudinit import stages from cloudinit import url_helper from cloudinit import util @@ -37,7 +38,12 @@ from cloudinit.config.schema import validate_cloudconfig_schema from cloudinit import log from cloudinit.reporting import events -from cloudinit.settings import PER_INSTANCE, PER_ALWAYS, PER_ONCE, CLOUD_CONFIG +from cloudinit.settings import ( + PER_INSTANCE, + PER_ALWAYS, + PER_ONCE, + CLOUD_CONFIG, +) # Welcome message template WELCOME_MSG_TPL = ( @@ -932,9 +938,19 @@ def main(sysv_args=None): default=False, ) + parser.add_argument( + "--single-process", + dest="single_process", + action="store_true", + help=( + "Run run the four stages as a single process as an optimization." + "Requires init system integration." + ), + default=False, + ) + parser.set_defaults(reporter=None) subparsers = parser.add_subparsers(title="Subcommands", dest="subcommand") - subparsers.required = True # Each action and its sub-options (if any) parser_init = subparsers.add_parser( @@ -1122,8 +1138,42 @@ def main(sysv_args=None): status_parser(parser_status) parser_status.set_defaults(action=("status", handle_status_args)) + else: + parser.error("a subcommand is required") args = parser.parse_args(args=sysv_args) + if not args.single_process: + return sub_main(args) + LOG.info("Running cloud-init in single process mode.") + + # this _must_ be called before sd_notify is called otherwise netcat may + # attempt to send "start" before a socket exists + sync = socket.SocketSync("local", "network", "config", "final") + + # notify cloud-init-local.service that this stage has completed + socket.sd_notify(b"READY=1") + + # wait for cloud-init-local.service to start + with sync("local"): + sub_main(parser.parse_args(args=["init", "--local"])) + + # wait for cloud-init.service to start + with sync("network"): + # init stage + sub_main(parser.parse_args(args=["init"])) + + # wait for cloud-config.service to start + with sync("config"): + # config stage + sub_main(parser.parse_args(args=["modules", "--mode=config"])) + + with sync("final"): + # final stage + sub_main(parser.parse_args(args=["modules", "--mode=final"])) + socket.sd_notify(b"STOPPING=1") + + +def sub_main(args): # Subparsers.required = True and each subparser sets action=(name, functor) (name, functor) = args.action diff --git a/cloudinit/socket.py b/cloudinit/socket.py new file mode 100644 index 00000000000..685f43f28f9 --- /dev/null +++ b/cloudinit/socket.py @@ -0,0 +1,117 @@ +# This file is part of cloud-init. See LICENSE file for license information. +"""A module for common socket helpers.""" +import logging +import os +import socket +from contextlib import suppress + +from cloudinit.settings import DEFAULT_RUN_DIR + +LOG = logging.getLogger(__name__) + + +def sd_notify(message: bytes): + """Send a sd_notify message.""" + LOG.info("Sending sd_notify(%s)", str(message)) + socket_path = os.environ.get("NOTIFY_SOCKET", "") + + # abstract + if socket_path[0] == "@": + socket_path.replace("@", "\0", 1) + + # unix domain + elif not socket_path[0] == "/": + raise OSError("Unsupported socket type") + + with socket.socket( + socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC + ) as sock: + sock.connect(socket_path) + sock.sendall(message) + + +class SocketSync: + """A two way synchronization protocol over Unix domain sockets.""" + + def __init__(self, *names: str): + """Initialize a synchronization context. + + 1) Ensure that the socket directory exists. + 2) Bind a socket for each stage. + + Binding the sockets on initialization allows receipt of stage + "start" notifications prior to the cloud-init stage being ready to + start. + + :param names: stage names, used as a unique identifiers + """ + self.stage = "" + self.remote = "" + self.sockets = { + name: socket.socket( + socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC + ) + for name in names + } + # ensure the directory exists + os.makedirs(f"{DEFAULT_RUN_DIR}/share", mode=0o700, exist_ok=True) + # removing stale sockets and bind + for name, sock in self.sockets.items(): + socket_path = f"{DEFAULT_RUN_DIR}/share/{name}.sock" + with suppress(FileNotFoundError): + os.remove(socket_path) + sock.bind(socket_path) + + def __call__(self, stage: str): + """Set the stage before entering context. + + This enables the context manager to be initialized separately from + each stage synchronization. + + :param stage: the name of a stage to synchronize + + Example: + sync = SocketSync("stage 1", "stage 2"): + with sync("stage 1"): + pass + with sync("stage 2"): + pass + """ + self.stage = stage + return self + + def __enter__(self): + """Wait until a message has been received on this stage's socket. + + Once the message has been received, enter the context. + """ + LOG.debug("sync(%s): initial synchronization starting", self.stage) + # block until init system sends us data + # the first value returned contains a message from the init system + # (should be "start") + # the second value contains the path to a unix socket on which to + # reply, which is expected to be /path/to/{self.stage}-return.sock + sock = self.sockets[self.stage] + chunk, self.remote = sock.recvfrom(5) + + if b"start" != chunk: + # The protocol expects to receive a command "start" + self.__exit__(None, None, None) + raise ValueError(f"Received invalid message: [{str(chunk)}]") + elif f"{DEFAULT_RUN_DIR}/share/{self.stage}-return.sock" != str( + self.remote + ): + # assert that the return path is in a directory with appropriate + # permissions + self.__exit__(None, None, None) + raise ValueError(f"Unexpected path to unix socket: {self.remote}") + + LOG.debug("sync(%s): initial synchronization complete", self.stage) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Notify the socket that this stage is complete.""" + sock = self.sockets[self.stage] + sock.connect(self.remote) + sock.sendall(b"done") + sock.close() diff --git a/systemd/cloud-config.service.tmpl b/systemd/cloud-config.service.tmpl index 79c75c71ae6..cefe2c0613c 100644 --- a/systemd/cloud-config.service.tmpl +++ b/systemd/cloud-config.service.tmpl @@ -10,12 +10,13 @@ ConditionEnvironment=!KERNEL_CMDLINE=cloud-init=disabled [Service] Type=oneshot -ExecStart=/usr/bin/cloud-init modules --mode=config +ExecStart=nc.openbsd -Uu -W1 /run/cloud-init/share/config.sock -s /run/cloud-init/share/config-return.sock RemainAfterExit=yes TimeoutSec=0 # Output needs to appear in instance console output StandardOutput=journal+console +StandardInputText=start [Install] WantedBy=cloud-init.target diff --git a/systemd/cloud-final.service.tmpl b/systemd/cloud-final.service.tmpl index b66533643d3..26d33192e12 100644 --- a/systemd/cloud-final.service.tmpl +++ b/systemd/cloud-final.service.tmpl @@ -15,10 +15,9 @@ ConditionEnvironment=!KERNEL_CMDLINE=cloud-init=disabled [Service] Type=oneshot -ExecStart=/usr/bin/cloud-init modules --mode=final +ExecStart=nc.openbsd -Uu -W1 /run/cloud-init/share/final.sock -s /run/cloud-init/share/final-return.sock RemainAfterExit=yes TimeoutSec=0 -KillMode=process {% if variant in ["almalinux", "cloudlinux", "rhel"] %} # Restart NetworkManager if it is present and running. ExecStartPost=/bin/sh -c 'u=NetworkManager.service; \ @@ -32,6 +31,7 @@ TasksMax=infinity # Output needs to appear in instance console output StandardOutput=journal+console +StandardInputText=start [Install] WantedBy=cloud-init.target diff --git a/systemd/cloud-init-local.service.tmpl b/systemd/cloud-init-local.service.tmpl index 0da2d8337e9..418a753e93f 100644 --- a/systemd/cloud-init-local.service.tmpl +++ b/systemd/cloud-init-local.service.tmpl @@ -7,7 +7,6 @@ DefaultDependencies=no {% endif %} Wants=network-pre.target After=hv_kvp_daemon.service -After=systemd-remount-fs.service {% if variant in ["almalinux", "cloudlinux", "rhel"] %} Requires=dbus.socket After=dbus.socket @@ -38,12 +37,13 @@ ExecStartPre=/bin/mkdir -p /run/cloud-init ExecStartPre=/sbin/restorecon /run/cloud-init ExecStartPre=/usr/bin/touch /run/cloud-init/enabled {% endif %} -ExecStart=/usr/bin/cloud-init init --local +ExecStart=nc.openbsd -Uu -W1 /run/cloud-init/share/local.sock -s /run/cloud-init/share/local-return.sock RemainAfterExit=yes TimeoutSec=0 # Output needs to appear in instance console output StandardOutput=journal+console +StandardInputText=start [Install] WantedBy=cloud-init.target diff --git a/systemd/cloud-init-single.service b/systemd/cloud-init-single.service new file mode 100644 index 00000000000..17bbcfc1bb2 --- /dev/null +++ b/systemd/cloud-init-single.service @@ -0,0 +1,27 @@ +[Unit] +Description=Cloud-init: Single Process +DefaultDependencies=no +Wants=network-pre.target +After=systemd-remount-fs.service +Before=NetworkManager.service +Before=network-pre.target +Before=shutdown.target +Before=sysinit.target +Before=cloud-init-local.service +Conflicts=shutdown.target +RequiresMountsFor=/var/lib/cloud +ConditionPathExists=!/etc/cloud/cloud-init.disabled +ConditionKernelCommandLine=!cloud-init=disabled +ConditionEnvironment=!KERNEL_CMDLINE=cloud-init=disabled + +[Service] +Type=notify +ExecStart=/usr/bin/cloud-init --single-process +KillMode=process +TimeoutStartSec=infinity + +# Output needs to appear in instance console output +StandardOutput=journal+console + +[Install] +WantedBy=cloud-init.target diff --git a/systemd/cloud-init.service.tmpl b/systemd/cloud-init.service.tmpl index 58031cc4331..32469fe07b3 100644 --- a/systemd/cloud-init.service.tmpl +++ b/systemd/cloud-init.service.tmpl @@ -46,12 +46,13 @@ ConditionEnvironment=!KERNEL_CMDLINE=cloud-init=disabled [Service] Type=oneshot -ExecStart=/usr/bin/cloud-init init +ExecStart=nc.openbsd -Uu -W1 /run/cloud-init/share/network.sock -s /run/cloud-init/share/network-return.sock RemainAfterExit=yes TimeoutSec=0 # Output needs to appear in instance console output StandardOutput=journal+console +StandardInputText=start [Install] WantedBy=cloud-init.target diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py index 3a92d29e261..67531722736 100644 --- a/tests/unittests/test_cli.py +++ b/tests/unittests/test_cli.py @@ -160,9 +160,7 @@ def test_no_arguments_shows_usage(self, capsys): def test_no_arguments_shows_error_message(self, capsys): exit_code = self._call_main() - missing_subcommand_message = ( - "the following arguments are required: subcommand" - ) + missing_subcommand_message = "a subcommand is required" _out, err = capsys.readouterr() assert ( missing_subcommand_message in err diff --git a/tests/unittests/test_single_process.py b/tests/unittests/test_single_process.py new file mode 100644 index 00000000000..c5dfdff4979 --- /dev/null +++ b/tests/unittests/test_single_process.py @@ -0,0 +1,60 @@ +import socket +from unittest import mock + +from cloudinit import socket as ci_socket + + +class Sync: + """A device to send and receive synchronization messages + + Creating an instance of the device sends a b"start" + """ + + def __init__(self, name: str, path: str): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.sock.connect(f"{path}/share/{name}.sock") + self.sock.bind(f"{path}/share/{name}-return.sock") + self.sock.sendall(b"start") + + def receive(self): + """receive 5 bytes from the socket""" + received = self.sock.recv(5) + self.sock.close() + return received + + +def test_single_process(tmp_path): + """Verify that a socket can store "start" messages + + After a socket has been been bound but before it has started listening + """ + expected = b"done" + with mock.patch.object(ci_socket, "DEFAULT_RUN_DIR", tmp_path): + sync = ci_socket.SocketSync("first", "second", "third") + + # send all three syncs to the sockets + first = Sync("first", tmp_path) + second = Sync("second", tmp_path) + third = Sync("third", tmp_path) + + # wait on the first sync event + with sync("first"): + assert True + # check that the first sync returned + assert expected == first.receive() + # wait on the second sync event + with sync("second"): + assert True + # check that the second sync returned + assert expected == second.receive() + # wait on the third sync event + with sync("third"): + assert True + # check that the third sync returned + assert expected == third.receive() + + +def test_single_process_threaded(tmp_path): + # TODO demonstrate that threaded code using the same SocketSync object + # will be ordered via the protocol + pass