diff --git a/samcli/commands/remote/remote_invoke_context.py b/samcli/commands/remote/remote_invoke_context.py index e86cc0515a..483c096d11 100644 --- a/samcli/commands/remote/remote_invoke_context.py +++ b/samcli/commands/remote/remote_invoke_context.py @@ -35,7 +35,7 @@ get_resource_summary, get_resource_summary_from_physical_id, ) -from samcli.lib.utils.resources import AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE +from samcli.lib.utils.resources import AWS_KINESIS_STREAM, AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE from samcli.lib.utils.stream_writer import StreamWriter LOG = logging.getLogger(__name__) @@ -215,6 +215,11 @@ def _get_from_physical_resource_id(self) -> CloudFormationResourceSummary: sqs_client = self._boto_client_provider("sqs") resource_id = get_queue_url_from_arn(sqs_client, resource_arn.resource_id) + if SUPPORTED_SERVICES.get(service_from_arn) == AWS_KINESIS_STREAM: + # Note (hnnasit): Add unit test after AWS_KINESIS_STREAM is added to SUPPORTED_SERVICES + # StreamName extracted from arn is used as resource_id. + resource_id = resource_arn.resource_id + return CloudFormationResourceSummary( cast(str, SUPPORTED_SERVICES.get(service_from_arn)), resource_id, diff --git a/samcli/lib/remote_invoke/kinesis_invoke_executors.py b/samcli/lib/remote_invoke/kinesis_invoke_executors.py new file mode 100644 index 0000000000..d26c51850f --- /dev/null +++ b/samcli/lib/remote_invoke/kinesis_invoke_executors.py @@ -0,0 +1,132 @@ +""" +Remote invoke executor implementation for Kinesis streams +""" +import logging +import uuid +from dataclasses import asdict, dataclass +from typing import cast + +from botocore.exceptions import ClientError, ParamValidationError +from mypy_boto3_kinesis import KinesisClient + +from samcli.lib.remote_invoke.exceptions import ( + ErrorBotoApiCallException, + InvalidResourceBotoParameterException, +) +from samcli.lib.remote_invoke.remote_invoke_executors import ( + BotoActionExecutor, + RemoteInvokeIterableResponseType, + RemoteInvokeOutputFormat, + RemoteInvokeResponse, +) + +LOG = logging.getLogger(__name__) +STREAM_NAME = "StreamName" +DATA = "Data" +PARTITION_KEY = "PartitionKey" + + +@dataclass +class KinesisStreamPutRecordTextOutput: + """ + Dataclass that stores put_record boto3 API fields used to create + text output. + """ + + ShardId: str + SequenceNumber: str + + def get_output_response_dict(self) -> dict: + """ + Returns a dict of existing dataclass fields. + + Returns + ------- + dict + Returns the dict of the fields that will be used as the output response for + text format output. + """ + return asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None}) + + +class KinesisPutDataExecutor(BotoActionExecutor): + """ + Calls "put_record" method of "Kinesis stream" service with given input. + If a file location provided, the file handle will be passed as input object. + """ + + _kinesis_client: KinesisClient + _stream_name: str + _remote_output_format: RemoteInvokeOutputFormat + request_parameters: dict + + def __init__(self, kinesis_client: KinesisClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat): + self._kinesis_client = kinesis_client + self._remote_output_format = remote_output_format + self._stream_name = physical_id + self.request_parameters = {} + + def validate_action_parameters(self, parameters: dict) -> None: + """ + Validates the input boto parameters and prepares the parameters for calling the API. + + Parameters + ---------- + parameters: dict + Boto parameters provided as input + """ + for parameter_key, parameter_value in parameters.items(): + if parameter_key == STREAM_NAME: + LOG.warning("StreamName is defined using the value provided for resource_id argument.") + elif parameter_key == DATA: + LOG.warning("Data is defined using the value provided for either --event or --event-file options.") + else: + self.request_parameters[parameter_key] = parameter_value + + if PARTITION_KEY not in self.request_parameters: + self.request_parameters[PARTITION_KEY] = str(uuid.uuid4()) + + def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType: + """ + Calls "put_record" method to write single data record to Kinesis data stream. + + Parameters + ---------- + payload: str + The Data record which will be sent to the Kinesis stream + + Yields + ------ + RemoteInvokeIterableResponseType + Response that is consumed by remote invoke consumers after execution + """ + if payload: + self.request_parameters[DATA] = payload + else: + self.request_parameters[DATA] = "{}" + LOG.debug("Input event not found, putting a record with Data {}") + self.request_parameters[STREAM_NAME] = self._stream_name + LOG.debug( + "Calling kinesis_client.put_record with StreamName:%s, Data:%s", + self.request_parameters[STREAM_NAME], + payload, + ) + try: + put_record_response = cast(dict, self._kinesis_client.put_record(**self.request_parameters)) + + if self._remote_output_format == RemoteInvokeOutputFormat.JSON: + yield RemoteInvokeResponse(put_record_response) + if self._remote_output_format == RemoteInvokeOutputFormat.TEXT: + put_record_text_output = KinesisStreamPutRecordTextOutput( + ShardId=put_record_response["ShardId"], + SequenceNumber=put_record_response["SequenceNumber"], + ) + output_data = put_record_text_output.get_output_response_dict() + yield RemoteInvokeResponse(output_data) + except ParamValidationError as param_val_ex: + raise InvalidResourceBotoParameterException( + f"Invalid parameter key provided." + f" {str(param_val_ex).replace(f'{STREAM_NAME}, ', '').replace(f'{DATA}, ', '')}" + ) + except ClientError as client_ex: + raise ErrorBotoApiCallException(client_ex) from client_ex diff --git a/samcli/lib/remote_invoke/remote_invoke_executor_factory.py b/samcli/lib/remote_invoke/remote_invoke_executor_factory.py index cb4c716a24..e739a639ff 100644 --- a/samcli/lib/remote_invoke/remote_invoke_executor_factory.py +++ b/samcli/lib/remote_invoke/remote_invoke_executor_factory.py @@ -4,6 +4,7 @@ import logging from typing import Any, Callable, Dict, Optional +from samcli.lib.remote_invoke.kinesis_invoke_executors import KinesisPutDataExecutor from samcli.lib.remote_invoke.lambda_invoke_executors import ( DefaultConvertToJSON, LambdaInvokeExecutor, @@ -224,6 +225,44 @@ def _create_sqs_boto_executor( log_consumer=log_consumer, ) + def _create_kinesis_boto_executor( + self, + cfn_resource_summary: CloudFormationResourceSummary, + remote_invoke_output_format: RemoteInvokeOutputFormat, + response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse], + log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput], + ) -> RemoteInvokeExecutor: + """Creates a remote invoke executor for Kinesis resource type based on + the boto action being called. + + Parameters + ---------- + cfn_resource_summary: CloudFormationResourceSummary + Information about the Kinesis stream resource + remote_invoke_output_format: RemoteInvokeOutputFormat + Response output format that will be used for remote invoke execution + response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse] + Consumer instance which can process RemoteInvokeResponse events + log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput] + Consumer instance which can process RemoteInvokeLogOutput events + + Returns + ------- + RemoteInvokeExecutor + Returns the Executor created for Kinesis stream + """ + LOG.info("Putting record to Kinesis data stream %s", cfn_resource_summary.logical_resource_id) + kinesis_client = self._boto_client_provider("kinesis") + return RemoteInvokeExecutor( + request_mappers=[DefaultConvertToJSON()], + response_mappers=[ResponseObjectToJsonStringMapper()], + boto_action_executor=KinesisPutDataExecutor( + kinesis_client, cfn_resource_summary.physical_resource_id, remote_invoke_output_format + ), + response_consumer=response_consumer, + log_consumer=log_consumer, + ) + # mapping definition for each supported resource type REMOTE_INVOKE_EXECUTOR_MAPPING: Dict[ str, diff --git a/tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py b/tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py new file mode 100644 index 0000000000..50793ff25e --- /dev/null +++ b/tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py @@ -0,0 +1,142 @@ +from unittest import TestCase +from unittest.mock import patch, Mock + +from parameterized import parameterized, parameterized_class +from samcli.lib.remote_invoke.kinesis_invoke_executors import ( + RemoteInvokeOutputFormat, + KinesisPutDataExecutor, + ParamValidationError, + InvalidResourceBotoParameterException, + ErrorBotoApiCallException, + ClientError, + KinesisStreamPutRecordTextOutput, +) +from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeResponse + + +class TestKinesisStreamPutRecordTextOutput(TestCase): + @parameterized.expand( + [ + ("mock-shard-id", "mock-sequence-number"), + ] + ) + def test_kinesis_put_record_text_output(self, shard_id, sequence_number): + text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number) + self.assertEqual(text_output.ShardId, shard_id) + self.assertEqual(text_output.SequenceNumber, sequence_number) + + @parameterized.expand( + [ + ( + "mock-shard-id", + "mock-sequence-number", + { + "ShardId": "mock-shard-id", + "SequenceNumber": "mock-sequence-number", + }, + ), + ] + ) + def test_get_output_response_dict(self, shard_id, sequence_number, expected_output): + text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number) + output_response_dict = text_output.get_output_response_dict() + self.assertEqual(output_response_dict, expected_output) + + +@parameterized_class( + "output", + [[RemoteInvokeOutputFormat.TEXT], [RemoteInvokeOutputFormat.JSON]], +) +class TestKinesisPutDataExecutor(TestCase): + output: RemoteInvokeOutputFormat + + def setUp(self) -> None: + self.kinesis_client = Mock() + self.stream_name = "mock-kinesis-stream" + self.kinesis_put_data_executor = KinesisPutDataExecutor(self.kinesis_client, self.stream_name, self.output) + + @patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid") + def test_execute_action_successful(self, patched_uuid): + mock_uuid_value = "patched-uuid-value" + patched_uuid.uuid4.return_value = mock_uuid_value + given_input_data = "hello world" + mock_shard_id = "shardId-000000000000" + mock_sequence_number = "2941492a-5847-4ebb-a8a3-58c07ce9f198" + mock_text_response = { + "ShardId": mock_shard_id, + "SequenceNumber": mock_sequence_number, + } + + mock_json_response = { + "ShardId": mock_shard_id, + "SequenceNumber": mock_sequence_number, + "ResponseMetadata": {}, + } + self.kinesis_client.put_record.return_value = { + "ShardId": mock_shard_id, + "SequenceNumber": mock_sequence_number, + "ResponseMetadata": {}, + } + self.kinesis_put_data_executor.validate_action_parameters({}) + result = self.kinesis_put_data_executor._execute_action(given_input_data) + if self.output == RemoteInvokeOutputFormat.JSON: + self.assertEqual(list(result), [RemoteInvokeResponse(mock_json_response)]) + else: + self.assertEqual(list(result), [RemoteInvokeResponse(mock_text_response)]) + + self.kinesis_client.put_record.assert_called_with( + Data=given_input_data, StreamName=self.stream_name, PartitionKey=mock_uuid_value + ) + + @parameterized.expand( + [ + ({}, {"PartitionKey": "mock-uuid-value"}), + ( + {"ExplicitHashKey": "mock-explicit-hash-key", "SequenceNumberForOrdering": "1"}, + { + "PartitionKey": "mock-uuid-value", + "ExplicitHashKey": "mock-explicit-hash-key", + "SequenceNumberForOrdering": "1", + }, + ), + ( + { + "PartitionKey": "override-partition-key", + }, + { + "PartitionKey": "override-partition-key", + }, + ), + ( + {"StreamName": "mock-stream-name", "Data": "mock-data"}, + {"PartitionKey": "mock-uuid-value"}, + ), + ( + {"invalidParameterKey": "invalidParameterValue"}, + {"invalidParameterKey": "invalidParameterValue", "PartitionKey": "mock-uuid-value"}, + ), + ] + ) + @patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid") + def test_validate_action_parameters(self, parameters, expected_boto_parameters, patched_uuid): + mock_uuid_value = "mock-uuid-value" + patched_uuid.uuid4.return_value = mock_uuid_value + self.kinesis_put_data_executor.validate_action_parameters(parameters) + self.assertEqual(self.kinesis_put_data_executor.request_parameters, expected_boto_parameters) + + @parameterized.expand( + [ + (ParamValidationError(report="Invalid parameters"), InvalidResourceBotoParameterException), + ( + ClientError(error_response={"Error": {"Code": "MockException"}}, operation_name="send_message"), + ErrorBotoApiCallException, + ), + ] + ) + def test_execute_action_put_record_throws_boto_errors(self, boto_error, expected_error_thrown): + given_input_message = "hello world" + self.kinesis_client.put_record.side_effect = boto_error + with self.assertRaises(expected_error_thrown): + self.kinesis_put_data_executor.validate_action_parameters({}) + for _ in self.kinesis_put_data_executor._execute_action(given_input_message): + pass diff --git a/tests/unit/lib/remote_invoke/test_remote_invoke_executor_factory.py b/tests/unit/lib/remote_invoke/test_remote_invoke_executor_factory.py index a3b4c96bbe..9d0843643a 100644 --- a/tests/unit/lib/remote_invoke/test_remote_invoke_executor_factory.py +++ b/tests/unit/lib/remote_invoke/test_remote_invoke_executor_factory.py @@ -231,3 +231,48 @@ def test_create_sqs_boto_executor( response_consumer=given_response_consumer, log_consumer=given_log_consumer, ) + + @parameterized.expand(itertools.product([RemoteInvokeOutputFormat.JSON, RemoteInvokeOutputFormat.TEXT])) + @patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.KinesisPutDataExecutor") + @patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.DefaultConvertToJSON") + @patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.ResponseObjectToJsonStringMapper") + @patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.RemoteInvokeExecutor") + def test_create_kinesis_boto_executor( + self, + remote_invoke_output_format, + patched_remote_invoke_executor, + patched_object_to_json_converter, + patched_convert_to_default_json, + patched_kinesis_invoke_executor, + ): + given_physical_resource_id = "mock-stream-name" + given_cfn_resource_summary = Mock(physical_resource_id=given_physical_resource_id) + + given_kinesis_client = Mock() + self.boto_client_provider_mock.return_value = given_kinesis_client + + given_remote_invoke_executor = Mock() + patched_remote_invoke_executor.return_value = given_remote_invoke_executor + + given_response_consumer = Mock() + given_log_consumer = Mock() + kinesis_executor = self.remote_invoke_executor_factory._create_kinesis_boto_executor( + given_cfn_resource_summary, remote_invoke_output_format, given_response_consumer, given_log_consumer + ) + + self.assertEqual(kinesis_executor, given_remote_invoke_executor) + self.boto_client_provider_mock.assert_called_with("kinesis") + patched_convert_to_default_json.assert_called_once() + + patched_object_to_json_converter.assert_called_once() + patched_kinesis_invoke_executor.assert_called_with( + given_kinesis_client, given_physical_resource_id, remote_invoke_output_format + ) + + patched_remote_invoke_executor.assert_called_with( + request_mappers=[patched_convert_to_default_json()], + response_mappers=[patched_object_to_json_converter()], + boto_action_executor=patched_kinesis_invoke_executor(), + response_consumer=given_response_consumer, + log_consumer=given_log_consumer, + )