-
-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add
default_context
to secret engines
- Loading branch information
Showing
5 changed files
with
305 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from unittest.mock import patch, Mock | ||
from django.test import SimpleTestCase | ||
from zentral.conf.config import ConfigDict | ||
from zentral.core.exceptions import ImproperlyConfigured | ||
from zentral.core.secret_engines import decrypt_str, encrypt_str, secret_engines | ||
from zentral.core.secret_engines.backends.aws_kms import SecretEngine | ||
|
||
|
||
class AWSKMSSecretEngineTestCase(SimpleTestCase): | ||
def test_init_key_id_uuid(self): | ||
secret_engines.load_config(ConfigDict({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1", | ||
"default_context": {"un": "1"}} | ||
})) | ||
secret_engine = secret_engines.default_secret_engine | ||
self.assertIsInstance(secret_engine, SecretEngine) | ||
self.assertEqual(secret_engine.client_kwargs["region_name"], "us-east-1") | ||
self.assertEqual(secret_engine.default_context, {"un": "1"}) | ||
|
||
def test_init_key_id_arn(self): | ||
secret_engines.load_config({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "arn:aws:kms:eu-central-1:000000000000:key/8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1"} # ignored | ||
}) | ||
secret_engine = secret_engines.default_secret_engine | ||
self.assertIsInstance(secret_engine, SecretEngine) | ||
self.assertEqual(secret_engine.client_kwargs["region_name"], "eu-central-1") | ||
self.assertEqual(secret_engine.default_context, {}) | ||
|
||
def test_init_default_context_not_a_dict(self): | ||
with self.assertRaises(ImproperlyConfigured) as cm: | ||
secret_engines.load_config({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1", | ||
"default_context": "un"} | ||
}) | ||
self.assertEqual(cm.exception.args[0], "Default context is not a dict") | ||
|
||
def test_init_default_context_not_a_dict_str_str(self): | ||
with self.assertRaises(ImproperlyConfigured) as cm: | ||
secret_engines.load_config({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1", | ||
"default_context": {"un": 1}} | ||
}) | ||
self.assertEqual(cm.exception.args[0], "Default context is not a dict[str, str]") | ||
|
||
def test_prepared_context(self): | ||
secret_engines.load_config(ConfigDict({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1", | ||
"default_context": {"un": "1"}} | ||
})) | ||
secret_engine = secret_engines.default_secret_engine | ||
self.assertEqual( | ||
secret_engine._prepared_context({"un": "0", "deux": 2}), | ||
{"un": "0", "deux": "2"} | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.aws_kms.boto3.client") | ||
def test_encrypt_str(self, boto3_client): | ||
mocked_client = Mock() | ||
mocked_client.encrypt.return_value = {"CiphertextBlob": b"fomo"} | ||
boto3_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
"region_name": "us-east-1", | ||
"default_context": {"un": "1"}} | ||
}) | ||
self.assertEqual(encrypt_str("yolo"), "aws$Zm9tbw==") | ||
mocked_client.encrypt.assert_called_once_with( | ||
KeyId="8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
Plaintext=b"yolo", | ||
EncryptionContext={"un": "1"}, | ||
EncryptionAlgorithm='SYMMETRIC_DEFAULT' | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.aws_kms.boto3.client") | ||
def test_decrypt_str(self, boto3_client): | ||
mocked_client = Mock() | ||
mocked_client.decrypt.return_value = {"Plaintext": b"yolo"} | ||
boto3_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"aws": {"backend": "zentral.core.secret_engines.backends.aws_kms", | ||
"key_id": "arn:aws:kms:eu-central-1:000000000000:key/8ee44b97-8475-475c-bb1e-ac9198b2451d"} | ||
}) | ||
self.assertEqual(decrypt_str("aws$Zm9tbw=="), "yolo") | ||
mocked_client.decrypt.assert_called_once_with( | ||
KeyId="arn:aws:kms:eu-central-1:000000000000:key/8ee44b97-8475-475c-bb1e-ac9198b2451d", | ||
CiphertextBlob=b"fomo", | ||
EncryptionContext={}, | ||
EncryptionAlgorithm='SYMMETRIC_DEFAULT' | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
from unittest.mock import patch, Mock | ||
from django.test import SimpleTestCase | ||
from zentral.conf.config import ConfigDict | ||
from zentral.core.exceptions import ImproperlyConfigured | ||
from zentral.core.secret_engines import (decrypt_str, encrypt_str, secret_engines, | ||
DecryptionError, EncryptionError) | ||
from zentral.core.secret_engines.backends.gcp_kms import SecretEngine | ||
|
||
|
||
class GCPKMSSecretEngineTestCase(SimpleTestCase): | ||
def test_init(self): | ||
secret_engines.load_config(ConfigDict({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
})) | ||
secret_engine = secret_engines.default_secret_engine | ||
self.assertIsInstance(secret_engine, SecretEngine) | ||
self.assertEqual( | ||
secret_engine.key_name, | ||
"projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY_NAME" | ||
) | ||
self.assertEqual(secret_engine.default_context, {"un": "1"}) | ||
self.assertIsNone(secret_engine.credentials_file) | ||
|
||
def test_init_default_context_not_a_dict(self): | ||
with self.assertRaises(ImproperlyConfigured) as cm: | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": "un"} | ||
}) | ||
self.assertEqual(cm.exception.args[0], "Default context is not a dict") | ||
|
||
def test_init_default_context_not_a_dict_str_str(self): | ||
with self.assertRaises(ImproperlyConfigured) as cm: | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": 1}} | ||
}) | ||
self.assertEqual(cm.exception.args[0], "Default context is not a dict[str, str]") | ||
|
||
def test_prepared_context(self): | ||
secret_engines.load_config(ConfigDict({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
})) | ||
secret_engine = secret_engines.default_secret_engine | ||
self.assertEqual( | ||
secret_engine._prepared_context({"un": "0", "deux": 2}), | ||
b'{"deux": "2", "un": "0"}' | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.gcp_kms.kms.KeyManagementServiceClient") | ||
def test_encrypt_str_corrupted_request(self, gcp_client): | ||
mocked_response = Mock() | ||
mocked_response.verified_plaintext_crc32c = True | ||
mocked_response.verified_additional_authenticated_data_crc32c = False | ||
mocked_response.ciphertext = b"fomo" | ||
mocked_response.ciphertext_crc32c = 3847521259 | ||
mocked_client = Mock() | ||
mocked_client.encrypt.return_value = mocked_response | ||
gcp_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
}) | ||
with self.assertRaises(EncryptionError) as cm: | ||
encrypt_str("yolo") | ||
self.assertEqual( | ||
cm.exception.__context__.args[0], | ||
"The request sent to the server was corrupted in-transit." | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.gcp_kms.kms.KeyManagementServiceClient") | ||
def test_encrypt_str_corrupted_response(self, gcp_client): | ||
mocked_response = Mock() | ||
mocked_response.verified_plaintext_crc32c = True | ||
mocked_response.verified_additional_authenticated_data_crc32c = True | ||
mocked_response.ciphertext = b"fomo" | ||
mocked_response.ciphertext_crc32c = 0 # not 3847521259 | ||
mocked_client = Mock() | ||
mocked_client.encrypt.return_value = mocked_response | ||
gcp_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
}) | ||
with self.assertRaises(EncryptionError) as cm: | ||
encrypt_str("yolo") | ||
self.assertEqual( | ||
cm.exception.__context__.args[0], | ||
"The response received from the server was corrupted in-transit." | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.gcp_kms.kms.KeyManagementServiceClient") | ||
def test_encrypt_str(self, gcp_client): | ||
mocked_response = Mock() | ||
mocked_response.verified_plaintext_crc32c = True | ||
mocked_response.verified_additional_authenticated_data_crc32c = True | ||
mocked_response.ciphertext = b"fomo" | ||
mocked_response.ciphertext_crc32c = 3847521259 | ||
mocked_client = Mock() | ||
mocked_client.encrypt.return_value = mocked_response | ||
gcp_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
}) | ||
self.assertEqual(encrypt_str("yolo"), "gcp$Zm9tbw==") | ||
|
||
@patch("zentral.core.secret_engines.backends.gcp_kms.kms.KeyManagementServiceClient") | ||
def test_decrypt_str_corrupted_response(self, gcp_client): | ||
mocked_response = Mock() | ||
mocked_response.plaintext = b"yolo" | ||
mocked_response.plaintext_crc32c = 0 # not 4040585613 | ||
mocked_client = Mock() | ||
mocked_client.decrypt.return_value = mocked_response | ||
gcp_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
}) | ||
with self.assertRaises(DecryptionError) as cm: | ||
decrypt_str("gcp$Zm9tbw==") | ||
self.assertEqual( | ||
cm.exception.__context__.args[0], | ||
"The response received from the server was corrupted in-transit." | ||
) | ||
|
||
@patch("zentral.core.secret_engines.backends.gcp_kms.kms.KeyManagementServiceClient") | ||
def test_decrypt_str(self, gcp_client): | ||
mocked_response = Mock() | ||
mocked_response.plaintext = b"yolo" | ||
mocked_response.plaintext_crc32c = 4040585613 | ||
mocked_client = Mock() | ||
mocked_client.decrypt.return_value = mocked_response | ||
gcp_client.return_value = mocked_client | ||
secret_engines.load_config({ | ||
"gcp": {"backend": "zentral.core.secret_engines.backends.gcp_kms", | ||
"project_id": "PROJECT_ID", | ||
"location_id": "LOCATION", | ||
"key_ring_id": "KEY_RING", | ||
"key_id": "KEY_NAME", | ||
"default_context": {"un": "1"}} | ||
}) | ||
self.assertEqual(decrypt_str("gcp$Zm9tbw=="), "yolo") | ||
mocked_client.decrypt.assert_called_once() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters