feat: single process optimization #5489

Merged 22 commits on Aug 2, 2024
Changes from all commits
122 changes: 105 additions & 17 deletions cloudinit/cmd/main.py
@@ -24,6 +24,7 @@
from cloudinit import netinfo
from cloudinit import signal_handler
from cloudinit import sources
from cloudinit import socket
from cloudinit import stages
from cloudinit import url_helper
from cloudinit import util
@@ -38,7 +39,12 @@
from cloudinit.config.schema import validate_cloudconfig_schema
from cloudinit import log
from cloudinit.reporting import events
from cloudinit.settings import PER_INSTANCE, PER_ALWAYS, PER_ONCE, CLOUD_CONFIG
from cloudinit.settings import (
PER_INSTANCE,
PER_ALWAYS,
PER_ONCE,
CLOUD_CONFIG,
)

# Welcome message template
WELCOME_MSG_TPL = (
@@ -362,8 +368,11 @@ def main_init(name, args):
outfmt = None
errfmt = None
try:
close_stdin(lambda msg: early_logs.append((logging.DEBUG, msg)))
outfmt, errfmt = util.fixup_output(init.cfg, name)
if not args.skip_log_setup:
close_stdin(lambda msg: early_logs.append((logging.DEBUG, msg)))
outfmt, errfmt = util.fixup_output(init.cfg, name)
else:
outfmt, errfmt = util.get_output_cfg(init.cfg, name)
except Exception:
msg = "Failed to setup output redirection!"
util.logexc(LOG, msg)
@@ -375,8 +384,9 @@
"Logging being reset, this logger may no longer be active shortly"
)
log.reset_logging()
log.setup_logging(init.cfg)
apply_reporting_cfg(init.cfg)
if not args.skip_log_setup:
log.setup_logging(init.cfg)
apply_reporting_cfg(init.cfg)

# Any log usage prior to setup_logging above did not have local user log
# config applied. We send the welcome message now, as stderr/out have
@@ -633,8 +643,9 @@ def main_modules(action_name, args):
mods = Modules(init, extract_fns(args), reporter=args.reporter)
# Stage 4
try:
close_stdin()
util.fixup_output(mods.cfg, name)
if not args.skip_log_setup:
close_stdin()
util.fixup_output(mods.cfg, name)
except Exception:
util.logexc(LOG, "Failed to setup output redirection!")
if args.debug:
@@ -643,8 +654,9 @@
"Logging being reset, this logger may no longer be active shortly"
)
log.reset_logging()
log.setup_logging(mods.cfg)
apply_reporting_cfg(init.cfg)
if not args.skip_log_setup:
log.setup_logging(mods.cfg)
apply_reporting_cfg(init.cfg)

# now that logging is setup and stdout redirected, send welcome
welcome(name, msg=w_msg)
@@ -804,9 +816,10 @@ def status_wrapper(name, args):
)

v1[mode]["start"] = float(util.uptime())
preexisting_recoverable_errors = next(
handler = next(
filter(lambda h: isinstance(h, log.LogExporter), root_logger.handlers)
).export_logs()
)
preexisting_recoverable_errors = handler.export_logs()

# Write status.json prior to running init / module code
atomic_helper.write_json(status_path, status)
@@ -847,11 +860,8 @@ def status_wrapper(name, args):
v1["stage"] = None

# merge new recoverable errors into existing recoverable error list
new_recoverable_errors = next(
filter(
lambda h: isinstance(h, log.LogExporter), root_logger.handlers
)
).export_logs()
new_recoverable_errors = handler.export_logs()
handler.clean_logs()
for key in new_recoverable_errors.keys():
if key in preexisting_recoverable_errors:
v1[mode]["recoverable_errors"][key] = list(
@@ -953,9 +963,19 @@ def main(sysv_args=None):
default=False,
)

parser.add_argument(
"--all-stages",
dest="all_stages",
action="store_true",
help=(
"Run cloud-init's stages under a single process using a "
"syncronization protocol. This is not intended for CLI usage."
),
default=False,
)

parser.set_defaults(reporter=None)
subparsers = parser.add_subparsers(title="Subcommands", dest="subcommand")
subparsers.required = True

# Each action and its sub-options (if any)
parser_init = subparsers.add_parser(
@@ -1143,8 +1163,76 @@

status_parser(parser_status)
parser_status.set_defaults(action=("status", handle_status_args))
else:
parser.error("a subcommand is required")

args = parser.parse_args(args=sysv_args)
setattr(args, "skip_log_setup", False)
if not args.all_stages:
return sub_main(args)
return all_stages(parser)


def all_stages(parser):
"""Run all stages in a single process using an ordering protocol."""
LOG.info("Running cloud-init in single process mode.")
holmanb marked this conversation as resolved.

# this _must_ be called before sd_notify is called otherwise netcat may
# attempt to send "start" before a socket exists
sync = socket.SocketSync("local", "network", "config", "final")

# notify systemd that this stage has completed
socket.sd_notify("READY=1")
# wait for cloud-init-local.service to start
with sync("local"):

setharnold (reviewer) commented:

Are there ordering requirements inherent to cloud-init's different stages? If local must be run before network, for example, this would be an ideal place to encode these requirements.

holmanb (Member, Author) replied:

> Are there ordering requirements inherent to cloud-init's different stages?

Yes, these ordering requirements are inherent. The order is already encoded in the init system: currently cloud-init-local.service is ordered before cloud-init.service, which is before cloud-config.service, which is before cloud-final.service.

> local must be run before network, for example, this would be an ideal place to encode these requirements.

@setharnold Maybe you had something specific in mind that I'm missing?

setharnold (reviewer) replied:

If these have to happen in lockstep order, maybe a variable to show what stage it's on:

stage = 0
with sync("local"):
    if stage != 0:
        error_and_exit("unexpected stage error")
    stage += 1
    # stuff
with sync("network"):
    if stage != 1:
        error_and_exit("network must follow local")
    stage += 1
    # stuff
with sync("config"):
    if stage != 2:
        error_and_exit("config must follow network")
    stage += 1
    # stuff
# ... and so on for the remaining stages

If the ordering is more vague, like "config" requires "local" and doesn't care about "network", then local_done, network_done, config_done, and so on boolean variables could mark these completed in turn, and each stage could check the status of the phases it depends upon.

holmanb (Member, Author) replied:

> If these have to happen in lockstep order, maybe a variable to show what stage it's on: [...]

We could add code like this, but it wouldn't be possible for error_and_exit() to ever be called, regardless of the order in which the messages are received on the unix sockets. This is single-threaded code with a single entry point, so in order for the network stage to run, the local stage has to have already run.

setharnold (reviewer) replied:

I think I've just got less faith than you in the systemd-enforced ordering, that's all. If you're sure I'm over-thinking it, feel free to leave it out.

holmanb (Member, Author) replied:

> I think I've just got less faith than you in the systemd-enforced ordering, that's all.

Systemd is not the only thing enforcing ordering - it is already enforced in the Python code as well. If systemd triggers stages out of order, earlier stages will not get skipped: if "network" is triggered before "local", this Python code will wait until systemd triggers "local".

The ordering of these stages isn't enforced in the definition of the context manager; it is enforced by the order in which the context managers are entered. Since with sync("local") is called before with sync("network"), the network stage cannot start until the local stage has completed. No parallelism / async / threading / coroutines are in use; the code simply runs one stage after the next, but not until systemd has signaled that it is time for the stage to run.

> If you're sure I'm over-thinking it, feel free to leave it out.

I'm pretty sure that's what is happening, but I want to make sure you're comfortable with and understand how it works too. Hopefully the following annotated code helps.

with sync("local"):
   # "start" must have been received on the local socket to enter the context manager
   # (triggered by the one-liner in cloud-init-local.service)

# once the code gets to this point, the "local" stage has completed (and a response was
# sent in the context manager to the shim process in cloud-init-local.service)
with sync("network"):
   # "start" must have been received on the network socket to enter the context manager
   # (triggered by the one-liner in cloud-init-network.service) - in addition to
   # all of the code above being completed

# once the code gets to this point, the "network" stage has completed (and a response 
# was sent in the context manager to the shim process in cloud-init-network.service)
with sync("config"):
   # "start" must have been received on the config socket to enter the context manager
   # (triggered by the one-liner in cloud-config.service) - in addition to
   # the code above being completed

# once the code gets to this point, the "config" stage has completed (and a response 
# was sent in the context manager to the shim process in cloud-config.service)
with sync("final"):
   # "start" must have been received on the final socket to enter the context manager
   # (triggered by the one-liner in cloud-final.service) - in addition to
   # the code above being completed

So even if systemd triggers events out of order, cloud-init won't run them out of order. It will just wait until the next stage in the order has completed.

Please let me know if you have any questions.

setharnold (reviewer) replied:

Oh! Now I can see why you're confident :) Thanks!
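
The SocketSync class itself lives in cloudinit/socket.py and is not part of this diff. Purely as a hedged sketch of the shape such a context manager could take - the socket paths, the "start" message, and the reply mechanism below are illustrative assumptions, not the PR's actual protocol:

import os
import socket


class SocketSyncSketch:
    """Gate each stage on a 'start' datagram from that stage's systemd unit."""

    def __init__(self, *stages):
        self.stage = ""
        self.systemd_exit_code = 0
        self.experienced_any_error = False
        self.first_exception = ""
        self.sockets = {}
        os.makedirs("/run/cloud-init/share", exist_ok=True)  # assumed location
        for stage in stages:
            path = f"/run/cloud-init/share/{stage}.sock"  # assumed path
            if os.path.exists(path):
                os.remove(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            sock.bind(path)
            self.sockets[stage] = sock

    def __call__(self, stage):
        self.stage = stage
        return self

    def __enter__(self):
        # Block here until the shim in this stage's .service unit sends "start".
        self.sockets[self.stage].recv(4096)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            self.experienced_any_error = True
            self.first_exception = self.first_exception or repr(exc_val)
        elif self.systemd_exit_code != 0:
            self.experienced_any_error = True
        # Reply on an assumed companion socket so the per-stage unit can exit
        # with this stage's exit code.
        reply = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        reply.connect(f"/run/cloud-init/share/{self.stage}-return.sock")
        reply.send(str(self.systemd_exit_code).encode())
        reply.close()
        return False  # propagate any exception to the caller

Each per-stage .service unit would then only need the small shim mentioned in the thread above: send "start" to the stage's socket and wait for the reply before exiting.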

# set up logger
args = parser.parse_args(args=["init", "--local"])
args.skip_log_setup = False
# run local stage
sync.systemd_exit_code = sub_main(args)

# wait for cloud-init-network.service to start
with sync("network"):
# skip re-setting up logger
args = parser.parse_args(args=["init"])
args.skip_log_setup = True
# run init stage
sync.systemd_exit_code = sub_main(args)

# wait for cloud-config.service to start
with sync("config"):
# skip re-setting up logger
args = parser.parse_args(args=["modules", "--mode=config"])
args.skip_log_setup = True
# run config stage
sync.systemd_exit_code = sub_main(args)

# wait for cloud-final.service to start
with sync("final"):
# skip re-setting up logger
args = parser.parse_args(args=["modules", "--mode=final"])
args.skip_log_setup = True
# run final stage
sync.systemd_exit_code = sub_main(args)

# signal completion to cloud-init-main.service
if sync.experienced_any_error:
message = "a stage of cloud-init exited non-zero"
if sync.first_exception:
message = f"first exception received: {sync.first_exception}"
socket.sd_notify(
f"STATUS=Completed with failure, {message}. Run 'cloud-init status"
" --long' for more details."
)
socket.sd_notify("STOPPING=1")
# exit 1 for a fatal failure in any stage
return 1
else:
socket.sd_notify("STATUS=Completed")
socket.sd_notify("STOPPING=1")


def sub_main(args):

# Subparsers.required = True and each subparser sets action=(name, functor)
(name, functor) = args.action
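The READY=1, STATUS=, and STOPPING=1 strings sent above are systemd's notify protocol. A minimal sketch of what an sd_notify helper of this kind typically looks like - the real helper lives in cloudinit/socket.py and is not shown in this diff; the NOTIFY_SOCKET handling below is the conventional mechanism, not necessarily the PR's exact code:

import os
import socket


def sd_notify_sketch(message: str) -> None:
    """Send a state string (e.g. "READY=1") to the systemd notify socket."""
    notify_socket = os.environ.get("NOTIFY_SOCKET")
    if not notify_socket:
        return  # not running under a Type=notify systemd service
    if notify_socket.startswith("@"):
        # Abstract namespace socket: replace "@" with a leading NUL byte.
        notify_socket = "\0" + notify_socket[1:]
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(notify_socket)
        sock.sendall(message.encode("utf-8"))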
3 changes: 2 additions & 1 deletion cloudinit/cmd/status.py
@@ -318,8 +318,9 @@ def systemd_failed(wait: bool) -> bool:
for service in [
"cloud-final.service",
"cloud-config.service",
"cloud-init.service",
"cloud-init-network.service",
"cloud-init-local.service",
"cloud-init-main.service",
]:
try:
stdout = query_systemctl(
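systemd_failed() walks this unit list and asks systemd about each one's state via query_systemctl. As an illustration only (a hedged sketch; the module's actual query goes through query_systemctl with its own arguments), the per-unit check amounts to something like:

import subprocess


def unit_is_failed(service: str) -> bool:
    # `systemctl is-failed <unit>` prints the unit's state; it reads "failed"
    # only when the unit actually ended up in the failed state.
    result = subprocess.run(
        ["systemctl", "is-failed", service],
        capture_output=True,
        text=True,
        check=False,
    )
    return result.stdout.strip() == "failed"


for service in [
    "cloud-final.service",
    "cloud-config.service",
    "cloud-init-network.service",
    "cloud-init-local.service",
    "cloud-init-main.service",
]:
    if unit_is_failed(service):
        print(f"{service} failed")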
2 changes: 1 addition & 1 deletion cloudinit/config/cc_mounts.py
@@ -524,7 +524,7 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
# fs_spec, fs_file, fs_vfstype, fs_mntops, fs-freq, fs_passno
uses_systemd = cloud.distro.uses_systemd()
default_mount_options = (
"defaults,nofail,x-systemd.after=cloud-init.service,_netdev"
"defaults,nofail,x-systemd.after=cloud-init-network.service,_netdev"
if uses_systemd
else "defaults,nobootwait"
)
4 changes: 2 additions & 2 deletions cloudinit/config/schemas/schema-cloud-config-v1.json
@@ -2022,12 +2022,12 @@
},
"mount_default_fields": {
"type": "array",
"description": "Default mount configuration for any mount entry with less than 6 options provided. When specified, 6 items are required and represent ``/etc/fstab`` entries. Default: ``defaults,nofail,x-systemd.after=cloud-init.service,_netdev``",
"description": "Default mount configuration for any mount entry with less than 6 options provided. When specified, 6 items are required and represent ``/etc/fstab`` entries. Default: ``defaults,nofail,x-systemd.after=cloud-init-network.service,_netdev``",
"default": [
null,
null,
"auto",
"defaults,nofail,x-systemd.after=cloud-init.service",
"defaults,nofail,x-systemd.after=cloud-init-network.service",
"0",
"2"
],
3 changes: 3 additions & 0 deletions cloudinit/log.py
@@ -152,6 +152,9 @@ def emit(self, record: logging.LogRecord):
def export_logs(self):
return copy.deepcopy(self.holder)

def clean_logs(self):
self.holder = defaultdict(list)

def flush(self):
pass
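
Paired with the status_wrapper change in cloudinit/cmd/main.py above, clean_logs() lets the same LogExporter handler be reused across stages in the single-process flow: export a snapshot, run a stage, export again, then reset. Roughly, as a usage sketch (assuming cloud-init's logging setup has already installed a LogExporter handler on the root logger):

import logging

from cloudinit import log

root_logger = logging.getLogger()
# Find the LogExporter handler installed during logging setup.
handler = next(
    h for h in root_logger.handlers if isinstance(h, log.LogExporter)
)

preexisting = handler.export_logs()   # snapshot before running the stage
# ... run init / module code here ...
new_errors = handler.export_logs()    # everything captured up to this point
handler.clean_logs()                  # reset so the next stage starts fresh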
