Skip to content

Commit

Permalink
Overwrite extension heartbeat with policy error (#3321)
Browse files Browse the repository at this point in the history
* Overwrite heartbeat and add UT

* Add policy to history folder

* Save policy to history in policy_engine.py

* Fix UT failures

* Add UT

* Address review comments

* Skip AzureSecurityLinuxAgent on flatcar

* Address review comments 2

* Cleanup whitespace

* Update handler status message

* Address comment
  • Loading branch information
mgunnala authored Mar 4, 2025
1 parent 0d7c8a7 commit e2caad2
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 24 deletions.
4 changes: 4 additions & 0 deletions azurelinuxagent/common/protocol/goal_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def remote_access(self):
else:
return self._remote_access

@property
def history(self):
    """
    Read-only accessor for the object stored in self._history.

    The value is returned as-is; no copy or validation is performed.
    """
    return self._history

def fetch_agent_manifest(self, family_name, uris):
"""
This is a convenience method that wraps WireClient.fetch_manifest(), but adds the required 'use_verify_header' parameter and saves
Expand Down
28 changes: 24 additions & 4 deletions azurelinuxagent/ga/exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,14 @@ def handle_ext_handlers(self, goal_state_id):

# Instantiate policy engine, and use same engine to handle all extension handlers. If an error is thrown during
# policy engine initialization, we block all extensions and report the error via handler status for each extension.
# Save policy to history folder.
policy_error = None
try:
gs_history = self.protocol.get_goal_state().history
policy_engine = ExtensionPolicyEngine()
if policy_engine is not None and policy_engine.policy_file_contents is not None and gs_history is not None:
gs_history.save(policy_engine.policy_file_contents, "waagent_policy.json")

except Exception as ex:
policy_error = ex

Expand Down Expand Up @@ -776,7 +781,6 @@ def __handle_ext_disallowed_error(self, ext_handler_i, error_code, report_op, me
# Only report extension status for install errors of extensions with settings. Disable/uninstall errors are
# reported at the handler status level only.
if extension is not None and ext_handler_i.ext_handler.state == ExtensionRequestedState.Enabled:
# TODO: if extension is reporting heartbeat, it overwrites status. Consider overwriting heartbeat here.
# Overwrite any existing status file to reflect the failure accurately.
ext_handler_i.create_status_file(extension, status=ExtensionStatusValue.error, code=error_code,
operation=ext_handler_i.operation, message=message, overwrite=True)
Expand Down Expand Up @@ -1089,14 +1093,30 @@ def report_ext_handler_status(self, vm_status, ext_handler, goal_state_changed):

# Since we require reading the Manifest for reading the heartbeat, this would fail if HandlerManifest not found.
# Only try to read heartbeat if HandlerState != NotInstalled.
# If extension is disallowed, concatenate the heartbeat message to the existing handler status message, and
# do not override handler error code or status with heartbeat.
if handler_state != ExtHandlerState.NotInstalled:
# Heartbeat is a handler level thing only, so we dont need to modify this
# Heartbeat is a handler level thing only, so we don't need to modify this
try:
heartbeat = ext_handler_i.collect_heartbeat()
if heartbeat is not None:
handler_status.status = heartbeat.get('status')
if ext_disallowed:
pass # The status already specifies that the extension is disallowed ('NotReady')
else:
handler_status.status = heartbeat.get('status')

if 'formattedMessage' in heartbeat:
handler_status.message = parse_formatted_message(heartbeat.get('formattedMessage'))
heartbeat_message = parse_formatted_message(heartbeat.get('formattedMessage'))
if ext_disallowed:
# If extension is disallowed, the agent should set the handler status message on behalf of the
# extension, so handler_status.message should not be None.
if handler_status.message is None:
handler_status.message = "Extension was not executed, but it was previously enabled and reported the following heartbeat:\n{0}".format(heartbeat_message)
else:
handler_status.message += " Extension was previously enabled and reported the following heartbeat:\n{0}".format(heartbeat_message)
else:
handler_status.message = heartbeat_message

except ExtensionError as e:
ext_handler_i.set_handler_status(message=ustr(e), code=e.code)

Expand Down
33 changes: 22 additions & 11 deletions azurelinuxagent/ga/policy/policy_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,27 @@ def __init__(self, msg, inner=None):
msg = "Customer-provided policy file ('{0}') is invalid, please correct the following error: {1}".format(conf.get_policy_file_path(), msg)
super(InvalidPolicyError, self).__init__(msg, inner)


class _PolicyEngine(object):
"""
Implements base policy engine API.
"""
def __init__(self):
    """
    Set up the policy engine.

    Records whether policy enforcement is enabled. Only when it is enabled, the policy file is read and
    the parsed result is stored in self._policy; otherwise self._policy is not assigned here.
    """
    self._policy_enforcement_enabled = self.__get_policy_enforcement_enabled()
    # Remains None unless __read_policy() runs and stores the raw text of the policy file.
    self._policy_file_contents = None

    if self.policy_enforcement_enabled:
        _PolicyEngine._log_policy_event("Policy enforcement is enabled.")
        raw_policy = self.__read_policy()
        self._policy = self._parse_policy(raw_policy)

@property
def policy_file_contents(self):
    """
    Read-only accessor for the value stored in self._policy_file_contents.

    The value is returned as-is and may be None.
    """
    return self._policy_file_contents

@staticmethod
def _log_policy_event(msg, is_success=True, op=WALAEventOperation.Policy, send_event=True):
"""
Expand All @@ -80,30 +88,33 @@ def __get_policy_enforcement_enabled():
def policy_enforcement_enabled(self):
return self._policy_enforcement_enabled

def __read_policy(self):
    """
    Read the customer-provided policy JSON file, parse it, and return the parsed result.

    The file is read from conf.get_policy_file_path(). Its raw text is stored in self._policy_file_contents
    as soon as it has been read, so it is available even if parsing fails afterwards.
    NOTE(review): callers are expected to have verified that the file exists before calling this method
    (presumably via the policy-enforcement check in __init__) - confirm.

    Raise InvalidPolicyError if the file cannot be opened or read, or if its contents are not valid JSON.
    When the raw contents were read successfully, they are appended to the error message.
    """
    policy_file_path = conf.get_policy_file_path()
    try:
        # open() is inside the try block on purpose: a failure to open the file (e.g. permissions, or the
        # file was removed) must be reported as InvalidPolicyError, like any other read failure.
        with open(policy_file_path, 'r') as f:
            self._policy_file_contents = f.read()

        _PolicyEngine._log_policy_event(
            "Enforcing policy using policy file found at '{0}'.".format(policy_file_path))

        # json.loads will raise error if file contents are not a valid json (including empty file).
        custom_policy = json.loads(self._policy_file_contents)

    except ValueError as ex:
        msg = "policy file does not conform to valid json syntax."
        if self._policy_file_contents is not None:
            msg += " File contents: {0}".format(self._policy_file_contents)
        raise InvalidPolicyError(msg=msg, inner=ex)
    except Exception as ex:
        msg = "unable to read or load policy file."
        # Contents are still None when the failure happened before the file could be read.
        if self._policy_file_contents is not None:
            msg += " File contents: {0}".format(self._policy_file_contents)
        raise InvalidPolicyError(msg=msg, inner=ex)

    return custom_policy
Expand Down
86 changes: 85 additions & 1 deletion tests/ga/test_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@
from azurelinuxagent.common.protocol import wire
from azurelinuxagent.common.protocol.wire import WireProtocol, InVMArtifactsProfile
from azurelinuxagent.common.utils.restutil import KNOWN_WIRESERVER_IP
from azurelinuxagent.common.utils.archive import ARCHIVE_DIRECTORY_NAME

from azurelinuxagent.ga.exthandlers import ExtHandlerInstance, migrate_handler_state, \
get_exthandlers_handler, ExtCommandEnvVariable, HandlerManifest, NOT_RUN, \
ExtensionStatusValue, HANDLER_COMPLETE_NAME_PATTERN, HandlerEnvironment, GoalStateStatus
ExtensionStatusValue, HANDLER_COMPLETE_NAME_PATTERN, HandlerEnvironment, GoalStateStatus, ExtHandlerState

from tests.lib import wire_protocol_data
from tests.lib.mock_wire_protocol import mock_wire_protocol, MockHttpResponse
Expand Down Expand Up @@ -3715,6 +3716,89 @@ def test_uninstall_should_succeed_if_extension_allowed(self):
vm_status = args[0]
self.assertEqual(0, len(vm_status.vmAgent.extensionHandlers))

def test_should_report_both_policy_failure_and_heartbeat_in_status(self):
"""
If an extension reporting heartbeat is blocked by policy, the agent should report policy failure status and
concatenate the extension heartbeat.
"""

# Mock collect_heartbeat() to return heartbeat in test file
test_file = os.path.join(self.tmp_dir, "ext_heartbeat.json")
def mock_collect_heartbeat():
# Return the 'heartbeat' object of the first entry in the JSON list stored in test_file.
heartbeat_json = fileutil.read_file(test_file)
heartbeat = json.loads(heartbeat_json)[0]['heartbeat']
return heartbeat

# Create a mock heartbeat file reported by an extension
extension_heartbeat = [
{
"version": 1.0,
"heartbeat": {
"status": "ready",
"code": 0,
"formattedMessage": {
"lang": "en-US",
"message": "This is a heartbeat message"
}
}
}
]
with open(test_file, 'w') as out_file:
out_file.write(json.dumps(extension_heartbeat))

# Try to install a disallowed extension, then assert that the policy failure status is reported and that the
# heartbeat message is appended to it (the heartbeat must not replace the policy failure status or code).
with patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.get_heartbeat_file", return_value=test_file):
with patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.collect_heartbeat", side_effect=mock_collect_heartbeat):
# Report the handler as Enabled.
# NOTE(review): presumably needed so that status reporting reads the heartbeat - confirm.
with patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.get_handler_state", return_value=ExtHandlerState.Enabled):
# Allow-list-only policy that lists no extensions.
policy = \
{
"policyVersion": "0.1.0",
"extensionPolicies": {
"allowListedExtensionsOnly": True
}
}

with mock_wire_protocol(wire_protocol_data.DATA_FILE) as protocol:
exthandlers_handler = get_exthandlers_handler(protocol)
self._create_policy_file(policy)
exthandlers_handler.run()

# Assert that agent is reporting the expected handler status, heartbeat should be concatenated to policy failure message
vm_status = exthandlers_handler.report_ext_handlers_status()
ext_handler = vm_status.vmAgent.extensionHandlers[0]
self.assertTrue("failed to run extension 'OSTCExtensions.ExampleHandlerLinux' because it is not specified as an allowed extension" in ext_handler.message,
"Should have reported policy failure")
self.assertTrue("Extension was previously enabled and reported the following heartbeat:" in ext_handler.message,
"Should have concatenated heartbeat to handler status message")
# The heartbeat written above reports status "ready" and code 0; neither may replace the policy failure values.
self.assertEqual(ext_handler.status, "NotReady", "Heartbeat should not have overridden policy failure status")
self.assertEqual(ext_handler.code, ExtensionErrorCodes.PluginEnableProcessingFailed, "Heartbeat should not have overridden policy failure code")

def test_should_save_policy_file_to_history_directory(self):
# Verifies that, after extensions are processed, a copy of the policy file named "waagent_policy.json" exists
# in the goal state history folder and that its JSON content equals the policy written by this test.
policy_file_name = "waagent_policy.json"
# Minimal policy: only the version field is set.
policy = {
"policyVersion": "0.1.0"
}
self._create_policy_file(policy)

# Policy file should be written to history folder when extensions are processed
with mock_wire_protocol(wire_protocol_data.DATA_FILE_VM_SETTINGS, save_to_history=True) as protocol:
# Update goal state with incarnation and etag, so we can search for the correct history folder
protocol.mock_wire_data.set_incarnation(999)
protocol.mock_wire_data.set_etag(888)
protocol.client.update_goal_state()
exthandlers_handler = get_exthandlers_handler(protocol)
# NOTE(review): overprovisioning is presumably disabled so that extension processing is not skipped - confirm.
with patch('azurelinuxagent.ga.policy.policy_engine.conf.get_enable_overprovisioning', return_value=False):
exthandlers_handler.run()

# Assert that policy file was copied to history folder.
root_dir = os.path.join(conf.get_lib_dir(), ARCHIVE_DIRECTORY_NAME)
# The glob suffix uses the incarnation (999) and etag (888) set above; exactly one folder must match.
matches = glob.glob(os.path.join(root_dir, "*_999-888"))
self.assertTrue(len(matches) == 1)
history_dir = matches[0]
file_path = os.path.join(history_dir, policy_file_name)
self.assertTrue(os.path.exists(file_path), "Policy file was not copied to history folder")
# Compare parsed JSON rather than raw text.
with open(file_path, mode='r') as f:
self.assertEqual(policy, json.load(f))

if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/ga/test_policy_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
# Requires Python 2.4+ and Openssl 1.0+
#

import os
import json
import os

from tests.lib.tools import AgentTestCase
from azurelinuxagent.ga.policy.policy_engine import ExtensionPolicyEngine, InvalidPolicyError, \
_PolicyEngine, _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY, _DEFAULT_SIGNATURE_REQUIRED
from tests.lib.tools import AgentTestCase
from tests.lib.tools import patch

TEST_EXTENSION_NAME = "Microsoft.Azure.ActiveDirectory.AADSSHLoginForLinux"
Expand Down
5 changes: 1 addition & 4 deletions tests_e2e/orchestrator/scripts/collect-logs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ echo "Collecting logs to $logs_file_name ..."
PYTHON=$(get-agent-python)
waagent_conf=$($PYTHON -c 'from azurelinuxagent.common.osutil import get_osutil; print(get_osutil().agent_conf_file_path)')

# TODO: instead of collecting /etc/waagent_policy.json here, consider adding it to goal state history, since it can
# change per goal state.
tar --exclude='journal/*' --exclude='omsbundle' --exclude='omsagent' --exclude='mdsd' --exclude='scx*' \
--exclude='*.so' --exclude='*__LinuxDiagnostic__*' --exclude='*.zip' --exclude='*.deb' --exclude='*.rpm' \
--warning=no-file-changed \
-czf "$logs_file_name" \
/var/log \
/var/lib/waagent/ \
$waagent_conf \
/etc/waagent_policy.json
$waagent_conf

set -euxo pipefail

Expand Down
18 changes: 17 additions & 1 deletion tests_e2e/tests/ext_policy/ext_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def run(self):
None
)

# AzureSecurityLinuxAgent is an extension that reports heartbeat.
azure_security = ExtPolicy.TestCase(
VirtualMachineExtensionClient(self._context.vm, VmExtensionIds.AzureSecurityLinuxAgent,
resource_name="AzureSecurityLinuxAgent"),
{}
)

# An earlier test suite may have left behind extensions; cleanup any leftovers to test a "fresh" installation
# for each extension in this suite.
log.info("")
Expand Down Expand Up @@ -336,11 +343,13 @@ def run(self):
self._create_policy_file(policy)
self._operation_should_fail("enable", custom_script)

# This policy tests the following scenario:
# This policy tests the following scenarios:
# - allow and enable an extension that reports heartbeat (AzureSecurityLinuxAgent) -> should succeed
# - allow a previously-disallowed single-config extension (CustomScript), then enable again -> should succeed
log.info("")
log.info("*** Begin test case 4")
log.info("This policy tests the following scenario:")
log.info(" - allow and enable an extension that reports heartbeat (AzureSecurityLinuxAgent) -> should succeed")
log.info(" - allow a previously-disallowed single-config extension (CustomScript), then enable again -> should succeed")
policy = \
{
Expand All @@ -350,19 +359,24 @@ def run(self):
"signatureRequired": False,
"extensions": {
"Microsoft.Azure.Extensions.CustomScript": {},
"Microsoft.Azure.Security.Monitoring.AzureSecurityLinuxAgent": {},
# GuestConfiguration is added to all VMs for security requirements, so we always allow it.
"Microsoft.GuestConfiguration.ConfigurationforLinux": {}
}
}
}
self._create_policy_file(policy)
if VmExtensionIds.AzureSecurityLinuxAgent.supports_distro(distro):
self._operation_should_succeed("enable", azure_security)
self._operation_should_succeed("enable", custom_script)

# This policy tests the following scenarios:
# - disallow a previously-enabled extension that reports heartbeat (AzureSecurityLinuxAgent), then try to enable again -> should fail
# - disallow a previously-enabled single-config extension (CustomScript), then try to delete -> should reach timeout
log.info("")
log.info("*** Begin test case 5")
log.info("This policy tests the following scenarios:")
log.info(" - disallow a previously-enabled extension that reports heartbeat (AzureSecurityLinuxAgent), then try to enable again -> should fail")
log.info(" - disallow a previously-enabled single-config extension (CustomScript), then try to delete -> should reach timeout")
policy = \
{
Expand All @@ -377,6 +391,8 @@ def run(self):
}
}
self._create_policy_file(policy)
if VmExtensionIds.AzureSecurityLinuxAgent.supports_distro(distro):
self._operation_should_fail("enable", azure_security)
# Because this request marks CSE for deletion, the next operation must be a delete retry (enable will fail).
self._operation_should_fail("delete", custom_script)

Expand Down
4 changes: 3 additions & 1 deletion tests_e2e/tests/lib/vm_extension_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def __init__(self, publisher: str, ext_type: str, version: str):
unsupported_distros: Dict[str, List[str]] = {
"Microsoft.OSTCExtensions.VMAccessForLinux": ["flatcar"],
"Microsoft.Azure.Monitor.AzureMonitorLinuxAgent": ["flatcar", "mariner_1", "ubuntu_2404", "sles_15"],
"Microsoft.GuestConfiguration.ConfigurationforLinux": ["flatcar"]
"Microsoft.GuestConfiguration.ConfigurationforLinux": ["flatcar"],
"Microsoft.Azure.Security.Monitoring.AzureSecurityLinuxAgent": ["flatcar"]
}

def supports_distro(self, system_info: str) -> bool:
Expand Down Expand Up @@ -68,3 +69,4 @@ class VmExtensionIds(object):
AzureMonitorLinuxAgent: VmExtensionIdentifier = VmExtensionIdentifier(publisher='Microsoft.Azure.Monitor', ext_type='AzureMonitorLinuxAgent', version="1.5")
GATestExtension: VmExtensionIdentifier = VmExtensionIdentifier(publisher='Microsoft.Azure.Extensions.Edp', ext_type='GATestExtGo', version="1.2")
GuestConfig: VmExtensionIdentifier = VmExtensionIdentifier(publisher='Microsoft.GuestConfiguration', ext_type='ConfigurationforLinux', version="1.0")
AzureSecurityLinuxAgent: VmExtensionIdentifier = VmExtensionIdentifier(publisher='Microsoft.Azure.Security.Monitoring', ext_type='AzureSecurityLinuxAgent', version="2.0")

0 comments on commit e2caad2

Please sign in to comment.