diff --git a/doc/reference/autoinstall-reference.rst b/doc/reference/autoinstall-reference.rst
index cfea33846..9fdca7c5f 100644
--- a/doc/reference/autoinstall-reference.rst
+++ b/doc/reference/autoinstall-reference.rst
@@ -3,7 +3,11 @@
Autoinstall configuration reference manual
==========================================
-The autoinstall file uses the YAML format. At the top level, it must be a mapping containing the keys described in this document. Unrecognised keys are ignored.
+The autoinstall file uses the YAML format. At the top level, it must be a
+mapping containing the keys described in this document. Unrecognised keys
+are ignored in version 1, but will cause a fatal validation error in future
+versions.
+
.. _ai-schema:
@@ -25,6 +29,12 @@ Several configuration keys are lists of commands to be executed. Each command ca
Top-level keys
--------------
+
+.. warning::
+ In version 1, Subiquity will emit warnings when encountering unrecognised
+ keys. In later versions, a fatal validation error is thrown and the
+ installation will halt.
+
.. _ai-version:
version
diff --git a/subiquity/server/server.py b/subiquity/server/server.py
index cdee8ff0f..826005d9f 100644
--- a/subiquity/server/server.py
+++ b/subiquity/server/server.py
@@ -14,11 +14,12 @@
# along with this program. If not, see .
import asyncio
+import copy
import logging
import os
import sys
import time
-from typing import List, Optional
+from typing import Any, List, Optional
import jsonschema
import yaml
@@ -552,8 +553,97 @@ async def apply_autoinstall_config(self, context):
await controller.apply_autoinstall_config()
await controller.configured()
+ def filter_autoinstall(
+ self,
+ autoinstall_config: dict[str, Any],
+ ) -> tuple[dict[str, Any], dict[str, Any]]:
+ """Separates autoinstall config by known and uknown keys"""
+
+ invalid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)
+
+ # Pop keys of all loaded controllers, if they exists
+ for controller in self.controllers.instances:
+ invalid_config.pop(controller.autoinstall_key, None)
+
+ # Pop server keys, if they exist
+ for key in self.base_schema["properties"]:
+ invalid_config.pop(key, None)
+
+ valid_config: dict[str, Any] = copy.deepcopy(autoinstall_config)
+
+ # Remove the keys we isolated
+ for key in invalid_config:
+ valid_config.pop(key)
+
+ return (valid_config, invalid_config)
+
+ @with_context(name="top_level_keys")
+ def _enforce_top_level_keys(
+ self,
+ *,
+ autoinstall_config: dict[str, Any],
+ context: Context,
+ ) -> dict[str, Any]:
+ """Enforces usage of known top-level keys.
+
+ In Autoinstall v1, unknown top-level keys are removed from
+ the config and a cleaned config is returned.
+
+ In Autoinstall v2, unknown top-level keys result in a fatal
+ AutoinstallValidationError
+
+ Only checks for unrecognized keys, doesn't validate them.
+ Requires a version number, so this should be called after
+ validating against the schema.
+
+ """
+
+ valid_config, invalid_config = self.filter_autoinstall(autoinstall_config)
+
+ # If no bad keys, return early
+ if len(invalid_config.keys()) == 0:
+ return autoinstall_config
+
+ # If the version is early enough, only warn
+ version: int = autoinstall_config["version"]
+
+ ctx = context
+ if version == 1:
+ # Warn then clean out bad keys and return
+
+ for key in invalid_config:
+ warning = f"Unrecognized top-level key {key!r}"
+ log.warning(warning)
+ ctx.warning(warning)
+
+ warning = (
+ "Unrecognized top-level keys will cause Autoinstall to "
+ "throw an error in future versions."
+ )
+ log.warning(warning)
+ ctx.warning(warning)
+
+ return valid_config
+
+ else:
+ for key in invalid_config:
+ error = f"Unrecognized top-level key {key!r}"
+ log.error(error)
+ ctx.error(error)
+
+ error = "Unrecognized top-level keys are unsupported"
+ log.error(error)
+ ctx.error(error)
+
+ raw_keys = (f"{key!r}" for key in invalid_config)
+ details: str = f"Unrecognized top-level key(s): {', '.join(raw_keys)}"
+ raise AutoinstallValidationError(
+ owner="top-level keys",
+ details=details,
+ )
+
def validate_autoinstall(self):
- with self.context.child("core_validation", level="INFO"):
+ with self.context.child("core_validation", level="INFO") as ctx:
try:
jsonschema.validate(self.autoinstall_config, self.base_schema)
except ValidationError as original_exception:
@@ -566,6 +656,12 @@ def validate_autoinstall(self):
raise new_exception from original_exception
+ # Enforce top level keys after ensuring we have a version number
+ self.autoinstall_config = self._enforce_top_level_keys(
+ autoinstall_config=self.autoinstall_config,
+ context=ctx,
+ )
+
def load_autoinstall_config(self, *, only_early):
log.debug(
"load_autoinstall_config only_early %s file %s",
diff --git a/subiquity/server/tests/test_server.py b/subiquity/server/tests/test_server.py
index 292980f57..17d893176 100644
--- a/subiquity/server/tests/test_server.py
+++ b/subiquity/server/tests/test_server.py
@@ -13,8 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
+import copy
import os
import shlex
+from typing import Any
from unittest.mock import AsyncMock, Mock, patch
import jsonschema
@@ -125,7 +127,9 @@ def test_bogus_autoinstall_argument(self):
with self.assertRaises(Exception):
self.server.select_autoinstall()
- def test_early_commands_changes_autoinstall(self):
+ # Only care about changes to autoinstall, not validity
+ @patch("subiquity.server.server.SubiquityServer.validate_autoinstall")
+ def test_early_commands_changes_autoinstall(self, validate_mock):
self.server.controllers = Mock()
self.server.controllers.instances = []
rootpath = self.path(root_autoinstall_path)
@@ -166,6 +170,33 @@ async def asyncSetUp(self):
}
self.server.make_apport_report = Mock()
+ # Pseudo Load Controllers to avoid patching the loading logic for each
+ # controller when we still want access to class attributes
+ def pseudo_load_controllers(self):
+ controller_classes = []
+ for prefix in self.server.controllers.controller_names:
+ controller_classes.append(
+ self.server.controllers._get_controller_class(prefix)
+ )
+ self.server.controllers.instances = controller_classes
+
+ def load_config_and_controllers(
+ self, config: dict[str, Any]
+ ) -> tuple[dict[str, Any], dict[str, Any]]:
+ """Loads an autoinstall config and controllers.
+
+ Loads the provided autoinstall config and the controllers.
+ Returns the valid and invalid portions of the config.
+ """
+ # Reset base schema
+ self.server.base_schema = SubiquityServer.base_schema
+
+ self.server.autoinstall_config = config
+
+ self.pseudo_load_controllers()
+
+ return self.server.filter_autoinstall(config)
+
def test_valid_schema(self):
"""Test that the expected autoinstall JSON schema is valid"""
@@ -232,6 +263,70 @@ async def test_autoinstall_validation__no_error_report(self):
error = NonReportableError.from_exception(exception)
self.assertEqual(error, self.server.nonreportable_error)
+ @patch("subiquity.server.server.log")
+ async def test_autoinstall_validation__strict_top_level_keys_warn(self, log_mock):
+ """Test strict top-level key enforcement warnings in v1"""
+
+ bad_ai_data = {
+ "version": 1,
+ "interactive-sections": ["identity"],
+ "apt": "Invalid but deferred",
+ "literally-anything": "lmao",
+ }
+
+ good, bad = self.load_config_and_controllers(bad_ai_data)
+
+ # OK in Version 1 but ensure warnings and stripped config
+ self.server.validate_autoinstall()
+ log_mock.warning.assert_called()
+ log_mock.error.assert_not_called()
+ self.assertEqual(self.server.autoinstall_config, good)
+
+ @patch("subiquity.server.server.log")
+ async def test_autoinstall_validation__strict_top_level_keys_error(self, log_mock):
+ """Test strict top-level key enforcement errors in v2 or greater"""
+
+ bad_ai_data = {
+ "version": 2,
+ "interactive-sections": ["identity"],
+ "apt": "Invalid but deferred",
+ "literally-anything": "lmao",
+ }
+
+ self.load_config_and_controllers(bad_ai_data)
+
+ # TODO: remove once V2 is enabled
+ self.server.base_schema["properties"]["version"]["maximum"] = 2
+
+ # Not OK in Version >= 2
+ with self.assertRaises(AutoinstallValidationError) as ctx:
+ self.server.validate_autoinstall()
+
+ self.assertIn("top-level keys", str(ctx.exception))
+
+ log_mock.error.assert_called()
+ log_mock.warning.assert_not_called()
+
+ async def test_autoinstall_validation__filter_autoinstall(self):
+ """Test autoinstall config filtering"""
+
+ # Mixed data: has base sections, controller section, and a bad key
+ autoinstall_config = {
+ "version": 1, # stay
+ "interactive-sections": ["identity"], # stay
+ "apt": "...", # strip
+ "some-bad-key": "...", # stay
+ }
+
+ self.server.base_schema = SubiquityServer.base_schema
+ self.pseudo_load_controllers()
+
+ valid, invalid = self.server.filter_autoinstall(autoinstall_config)
+
+ real_valid_keys = set(("version", "interactive-sections", "apt"))
+ self.assertEqual(set(valid.keys()), real_valid_keys)
+ self.assertEqual(set(invalid.keys()), set(("some-bad-key",)))
+
class TestMetaController(SubiTestCase):
async def test_interactive_sections_not_present(self):