Skip to content

Commit

Permalink
Merge pull request #1947 from Chris-Peterson444/strict-top-level-keys
Browse files Browse the repository at this point in the history
Enforce top-level keys
  • Loading branch information
Chris-Peterson444 committed Mar 26, 2024
2 parents 5da4f9b + d812919 commit ce0683a
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 4 deletions.
12 changes: 11 additions & 1 deletion doc/reference/autoinstall-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
Autoinstall configuration reference manual
==========================================

The autoinstall file uses the YAML format. At the top level, it must be a mapping containing the keys described in this document. Unrecognised keys are ignored.
The autoinstall file uses the YAML format. At the top level, it must be a
mapping containing the keys described in this document. Unrecognised keys
are ignored in version 1, but will cause a fatal validation error in future
versions.


.. _ai-schema:

Expand All @@ -25,6 +29,12 @@ Several configuration keys are lists of commands to be executed. Each command ca
Top-level keys
--------------


.. warning::
In version 1, Subiquity will emit warnings when encountering unrecognised
keys. In later versions, a fatal validation error is thrown and the
installation will halt.

.. _ai-version:

version
Expand Down
100 changes: 98 additions & 2 deletions subiquity/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import copy
import logging
import os
import sys
import time
from typing import List, Optional
from typing import Any, List, Optional

import jsonschema
import yaml
Expand Down Expand Up @@ -553,8 +554,97 @@ async def apply_autoinstall_config(self, context):
await controller.apply_autoinstall_config()
await controller.configured()

def filter_autoinstall(
self,
autoinstall_config: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Separates autoinstall config by known and unknown keys"""

invalid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)

# Pop keys of all loaded controllers, if they exists
for controller in self.controllers.instances:
invalid_config.pop(controller.autoinstall_key, None)

# Pop server keys, if they exist
for key in self.base_schema["properties"]:
invalid_config.pop(key, None)

valid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)

# Remove the keys we isolated
for key in invalid_config:
valid_config.pop(key)

return (valid_config, invalid_config)

@with_context(name="top_level_keys")
def _enforce_top_level_keys(
self,
*,
autoinstall_config: dict[str, Any],
context: Context,
) -> dict[str, Any]:
"""Enforces usage of known top-level keys.
In Autoinstall v1, unknown top-level keys are removed from
the config and a cleaned config is returned.
In Autoinstall v2, unknown top-level keys result in a fatal
AutoinstallValidationError
Only checks for unrecognized keys, doesn't validate them.
Requires a version number, so this should be called after
validating against the schema.
"""

valid_config, invalid_config = self.filter_autoinstall(autoinstall_config)

# If no bad keys, return early
if len(invalid_config.keys()) == 0:
return autoinstall_config

# If the version is early enough, only warn
version: int = autoinstall_config["version"]

ctx = context
if version == 1:
# Warn then clean out bad keys and return

for key in invalid_config:
warning = f"Unrecognized top-level key {key!r}"
log.warning(warning)
ctx.warning(warning)

warning = (
"Unrecognized top-level keys will cause Autoinstall to "
"throw an error in future versions."
)
log.warning(warning)
ctx.warning(warning)

return valid_config

else:
for key in invalid_config:
error = f"Unrecognized top-level key {key!r}"
log.error(error)
ctx.error(error)

error = "Unrecognized top-level keys are unsupported"
log.error(error)
ctx.error(error)

raw_keys = (f"{key!r}" for key in invalid_config)
details: str = f"Unrecognized top-level key(s): {', '.join(raw_keys)}"
raise AutoinstallValidationError(
owner="top-level keys",
details=details,
)

def validate_autoinstall(self):
with self.context.child("core_validation", level="INFO"):
with self.context.child("core_validation", level="INFO") as ctx:
try:
jsonschema.validate(self.autoinstall_config, self.base_schema)
except ValidationError as original_exception:
Expand All @@ -567,6 +657,12 @@ def validate_autoinstall(self):

raise new_exception from original_exception

# Enforce top level keys after ensuring we have a version number
self.autoinstall_config = self._enforce_top_level_keys(
autoinstall_config=self.autoinstall_config,
context=ctx,
)

def load_autoinstall_config(self, *, only_early):
log.debug(
"load_autoinstall_config only_early %s file %s",
Expand Down
127 changes: 126 additions & 1 deletion subiquity/server/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import os
import shlex
from typing import Any
from unittest.mock import AsyncMock, Mock, patch

import jsonschema
Expand Down Expand Up @@ -125,7 +126,9 @@ def test_bogus_autoinstall_argument(self):
with self.assertRaises(Exception):
self.server.select_autoinstall()

def test_early_commands_changes_autoinstall(self):
# Only care about changes to autoinstall, not validity
@patch("subiquity.server.server.SubiquityServer.validate_autoinstall")
def test_early_commands_changes_autoinstall(self, validate_mock):
self.server.controllers = Mock()
self.server.controllers.instances = []
rootpath = self.path(root_autoinstall_path)
Expand Down Expand Up @@ -166,6 +169,33 @@ async def asyncSetUp(self):
}
self.server.make_apport_report = Mock()

# Pseudo Load Controllers to avoid patching the loading logic for each
# controller when we still want access to class attributes
def pseudo_load_controllers(self):
controller_classes = []
for prefix in self.server.controllers.controller_names:
controller_classes.append(
self.server.controllers._get_controller_class(prefix)
)
self.server.controllers.instances = controller_classes

def load_config_and_controllers(
self, config: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Loads an autoinstall config and controllers.
Loads the provided autoinstall config and the controllers.
Returns the valid and invalid portions of the config.
"""
# Reset base schema
self.server.base_schema = SubiquityServer.base_schema

self.server.autoinstall_config = config

self.pseudo_load_controllers()

return self.server.filter_autoinstall(config)

def test_valid_schema(self):
"""Test that the expected autoinstall JSON schema is valid"""

Expand Down Expand Up @@ -232,6 +262,101 @@ async def test_autoinstall_validation__no_error_report(self):
error = NonReportableError.from_exception(exception)
self.assertEqual(error, self.server.nonreportable_error)

@patch("subiquity.server.server.log")
async def test_autoinstall_validation__strict_top_level_keys_warn(self, log_mock):
"""Test strict top-level key enforcement warnings in v1"""

bad_ai_data = {
"version": 1,
"interactive-sections": ["identity"],
"apt": "Invalid but deferred",
"literally-anything": "lmao",
}

good, bad = self.load_config_and_controllers(bad_ai_data)

# OK in Version 1 but ensure warnings and stripped config
self.server.validate_autoinstall()
log_mock.warning.assert_called()
log_mock.error.assert_not_called()
self.assertEqual(self.server.autoinstall_config, good)
self.assertEqual(bad, {"literally-anything": "lmao"})

@patch("subiquity.server.server.log")
async def test_autoinstall_validation__strict_top_level_keys_error(self, log_mock):
"""Test strict top-level key enforcement errors in v2 or greater"""

bad_ai_data = {
"version": 2,
"interactive-sections": ["identity"],
"apt": "Invalid but deferred",
"literally-anything": "lmao",
}

self.load_config_and_controllers(bad_ai_data)

# TODO: remove once V2 is enabled
self.server.base_schema["properties"]["version"]["maximum"] = 2

# Not OK in Version >= 2
with self.assertRaises(AutoinstallValidationError) as ctx:
self.server.validate_autoinstall()

self.assertIn("top-level keys", str(ctx.exception))

log_mock.error.assert_called()
log_mock.warning.assert_not_called()

@parameterized.expand(
(
(
# Case 1: extra key "some-bad-key"
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
"some-bad-key": "...",
},
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{"some-bad-key": "..."},
),
(
# Case 2: no bad keys
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{
"version": 1,
"interactive-sections": ["identity"],
"apt": "...",
},
{},
),
(
# Case 3: no good keys
{"some-bad-key": "..."},
{},
{"some-bad-key": "..."},
),
)
)
async def test_autoinstall_validation__filter_autoinstall(self, config, good, bad):
"""Test autoinstall config filtering"""

self.server.base_schema = SubiquityServer.base_schema
self.pseudo_load_controllers()

valid, invalid = self.server.filter_autoinstall(config)

self.assertEqual(valid, good)
self.assertEqual(invalid, bad)


class TestMetaController(SubiTestCase):
async def test_interactive_sections_not_present(self):
Expand Down

0 comments on commit ce0683a

Please sign in to comment.