Skip to content

Commit

Permalink
Remove dependency on AWS Encryption SDK for HOOK types (#260)
Browse files Browse the repository at this point in the history
* Remove dependency on AWS Encryption SDK for HOOK types
  • Loading branch information
Brianwithay21 authored May 18, 2023
1 parent 775584e commit be9da28
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 337 deletions.
111 changes: 0 additions & 111 deletions src/cloudformation_cli_python_lib/cipher.py

This file was deleted.

4 changes: 0 additions & 4 deletions src/cloudformation_cli_python_lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,3 @@ def __init__(self, hook_type_name: str, target_type_name: str):

class Unknown(_HandlerError):
pass


class _EncryptionError(Exception):
pass
30 changes: 4 additions & 26 deletions src/cloudformation_cli_python_lib/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@
from typing import Any, Callable, MutableMapping, Optional, Tuple, Type, Union

from .boto3_proxy import SessionProxy, _get_boto_session
from .cipher import Cipher, KmsCipher
from .exceptions import (
AccessDenied,
InternalFailure,
InvalidRequest,
_EncryptionError,
_HandlerError,
)
from .exceptions import InternalFailure, InvalidRequest, _HandlerError
from .interface import (
BaseHookHandlerRequest,
HandlerErrorCode,
Expand Down Expand Up @@ -173,31 +166,16 @@ def _parse_request(
]:
try:
event = HookInvocationRequest.deserialize(event_data)
cipher: Cipher = KmsCipher(
event.requestData.hookEncryptionKeyArn,
event.requestData.hookEncryptionKeyRole,
)

caller_credentials = cipher.decrypt_credentials(
event.requestData.callerCredentials
)
provider_credentials = cipher.decrypt_credentials(
event.requestData.providerCredentials
)

caller_sess = _get_boto_session(caller_credentials)
provider_sess = _get_boto_session(provider_credentials)
caller_sess = _get_boto_session(event.requestData.callerCredentials)
provider_sess = _get_boto_session(event.requestData.providerCredentials)
# credentials are used when rescheduling, so can't zero them out (for now)
invocation_point = HookInvocationPoint[event.actionInvocationPoint]
callback_context = event.requestContext.callbackContext or {}
except _EncryptionError as e:
LOG.exception("Failed to decrypt credentials")
raise AccessDenied(f"{e} ({type(e).__name__})") from e
except Exception as e:
LOG.exception("Invalid request")
raise InvalidRequest(f"{e} ({type(e).__name__})") from e

return ((caller_sess, provider_sess)), invocation_point, callback_context, event
return (caller_sess, provider_sess), invocation_point, callback_context, event

def _cast_hook_request(
self, request: HookInvocationRequest
Expand Down
45 changes: 40 additions & 5 deletions src/cloudformation_cli_python_lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,48 @@ def deserialize(cls, json_data: MutableMapping[str, Any]) -> "HookRequestContext
return HookRequestContext()
return HookRequestContext(**json_data)

def serialize(self) -> Mapping[str, Any]:
return {key: value for key, value in self.__dict__.items() if value is not None}


@dataclass
class HookRequestData:
targetName: str
targetType: str
targetLogicalId: str
targetModel: Mapping[str, Any]
callerCredentials: Optional[str] = None
providerCredentials: Optional[str] = None
callerCredentials: Optional[Credentials] = None
providerCredentials: Optional[Credentials] = None
providerLogGroupName: Optional[str] = None
hookEncryptionKeyArn: Optional[str] = None
hookEncryptionKeyRole: Optional[str] = None

def __init__(self, **kwargs: Any) -> None:
dataclass_fields = {f.name for f in fields(self)}
for k, v in kwargs.items():
if k in dataclass_fields:
setattr(self, k, v)

@classmethod
def deserialize(cls, json_data: MutableMapping[str, Any]) -> "HookRequestData":
return HookRequestData(**json_data)
req_data = HookRequestData(**json_data)
for key in json_data:
if not key.endswith("Credentials"):
continue
creds = json_data.get(key)
if creds:
cred_data = json.loads(creds)
setattr(req_data, key, Credentials(**cred_data))
return req_data

def serialize(self) -> Mapping[str, Any]:
return {
key: {k: v for k, v in value.items() if v is not None}
if key == "targetModel"
else value.__dict__.copy()
if key.endswith("Credentials")
else value
for key, value in self.__dict__.items()
if value is not None
}


@dataclass
Expand Down Expand Up @@ -252,6 +278,15 @@ def deserialize(cls, json_data: MutableMapping[str, Any]) -> Any:
)
return event

def serialize(self) -> Mapping[str, Any]:
return {
key: value.serialize()
if key in ("requestData", "requestContext")
else value
for key, value in self.__dict__.items()
if value is not None
}


@dataclass
class UnmodelledHookRequest:
Expand Down
1 change: 0 additions & 1 deletion src/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
python_requires=">=3.6",
install_requires=[
"boto3>=1.10.20",
"aws-encryption-sdk==3.1.1",
'dataclasses;python_version<"3.7"',
],
license="Apache License 2.0",
Expand Down
134 changes: 0 additions & 134 deletions tests/lib/cipher_test.py

This file was deleted.

Loading

0 comments on commit be9da28

Please sign in to comment.