Skip to content

Commit

Permalink
feat: Add remote invoke implementation for Kinesis stream service for…
Browse files Browse the repository at this point in the history
… put_record API (#6063)

* Add mypy-boto3 stubs for kinesis

* Add remote invoke implementation for kinesis service

* unit tests

* fix typo in comment

Co-authored-by: Wing Fung Lau <[email protected]>

---------

Co-authored-by: Wing Fung Lau <[email protected]>
  • Loading branch information
hnnasit and hawflau authored Oct 12, 2023
1 parent 9f182f0 commit 6ccd872
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 1 deletion.
7 changes: 6 additions & 1 deletion samcli/commands/remote/remote_invoke_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_resource_summary,
get_resource_summary_from_physical_id,
)
from samcli.lib.utils.resources import AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.resources import AWS_KINESIS_STREAM, AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.stream_writer import StreamWriter

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -215,6 +215,11 @@ def _get_from_physical_resource_id(self) -> CloudFormationResourceSummary:
sqs_client = self._boto_client_provider("sqs")
resource_id = get_queue_url_from_arn(sqs_client, resource_arn.resource_id)

if SUPPORTED_SERVICES.get(service_from_arn) == AWS_KINESIS_STREAM:
# Note (hnnasit): Add unit test after AWS_KINESIS_STREAM is added to SUPPORTED_SERVICES
# StreamName extracted from arn is used as resource_id.
resource_id = resource_arn.resource_id

return CloudFormationResourceSummary(
cast(str, SUPPORTED_SERVICES.get(service_from_arn)),
resource_id,
Expand Down
132 changes: 132 additions & 0 deletions samcli/lib/remote_invoke/kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Remote invoke executor implementation for Kinesis streams
"""
import logging
import uuid
from dataclasses import asdict, dataclass
from typing import cast

from botocore.exceptions import ClientError, ParamValidationError
from mypy_boto3_kinesis import KinesisClient

from samcli.lib.remote_invoke.exceptions import (
ErrorBotoApiCallException,
InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
BotoActionExecutor,
RemoteInvokeIterableResponseType,
RemoteInvokeOutputFormat,
RemoteInvokeResponse,
)

LOG = logging.getLogger(__name__)
STREAM_NAME = "StreamName"
DATA = "Data"
PARTITION_KEY = "PartitionKey"


@dataclass
class KinesisStreamPutRecordTextOutput:
"""
Dataclass that stores put_record boto3 API fields used to create
text output.
"""

ShardId: str
SequenceNumber: str

def get_output_response_dict(self) -> dict:
"""
Returns a dict of existing dataclass fields.
Returns
-------
dict
Returns the dict of the fields that will be used as the output response for
text format output.
"""
return asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})


class KinesisPutDataExecutor(BotoActionExecutor):
"""
Calls "put_record" method of "Kinesis stream" service with given input.
If a file location provided, the file handle will be passed as input object.
"""

_kinesis_client: KinesisClient
_stream_name: str
_remote_output_format: RemoteInvokeOutputFormat
request_parameters: dict

def __init__(self, kinesis_client: KinesisClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat):
self._kinesis_client = kinesis_client
self._remote_output_format = remote_output_format
self._stream_name = physical_id
self.request_parameters = {}

def validate_action_parameters(self, parameters: dict) -> None:
"""
Validates the input boto parameters and prepares the parameters for calling the API.
Parameters
----------
parameters: dict
Boto parameters provided as input
"""
for parameter_key, parameter_value in parameters.items():
if parameter_key == STREAM_NAME:
LOG.warning("StreamName is defined using the value provided for resource_id argument.")
elif parameter_key == DATA:
LOG.warning("Data is defined using the value provided for either --event or --event-file options.")
else:
self.request_parameters[parameter_key] = parameter_value

if PARTITION_KEY not in self.request_parameters:
self.request_parameters[PARTITION_KEY] = str(uuid.uuid4())

def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
"""
Calls "put_record" method to write single data record to Kinesis data stream.
Parameters
----------
payload: str
The Data record which will be sent to the Kinesis stream
Yields
------
RemoteInvokeIterableResponseType
Response that is consumed by remote invoke consumers after execution
"""
if payload:
self.request_parameters[DATA] = payload
else:
self.request_parameters[DATA] = "{}"
LOG.debug("Input event not found, putting a record with Data {}")
self.request_parameters[STREAM_NAME] = self._stream_name
LOG.debug(
"Calling kinesis_client.put_record with StreamName:%s, Data:%s",
self.request_parameters[STREAM_NAME],
payload,
)
try:
put_record_response = cast(dict, self._kinesis_client.put_record(**self.request_parameters))

if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
yield RemoteInvokeResponse(put_record_response)
if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
put_record_text_output = KinesisStreamPutRecordTextOutput(
ShardId=put_record_response["ShardId"],
SequenceNumber=put_record_response["SequenceNumber"],
)
output_data = put_record_text_output.get_output_response_dict()
yield RemoteInvokeResponse(output_data)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
f" {str(param_val_ex).replace(f'{STREAM_NAME}, ', '').replace(f'{DATA}, ', '')}"
)
except ClientError as client_ex:
raise ErrorBotoApiCallException(client_ex) from client_ex
39 changes: 39 additions & 0 deletions samcli/lib/remote_invoke/remote_invoke_executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from typing import Any, Callable, Dict, Optional

from samcli.lib.remote_invoke.kinesis_invoke_executors import KinesisPutDataExecutor
from samcli.lib.remote_invoke.lambda_invoke_executors import (
DefaultConvertToJSON,
LambdaInvokeExecutor,
Expand Down Expand Up @@ -224,6 +225,44 @@ def _create_sqs_boto_executor(
log_consumer=log_consumer,
)

def _create_kinesis_boto_executor(
self,
cfn_resource_summary: CloudFormationResourceSummary,
remote_invoke_output_format: RemoteInvokeOutputFormat,
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse],
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput],
) -> RemoteInvokeExecutor:
"""Creates a remote invoke executor for Kinesis resource type based on
the boto action being called.
Parameters
----------
cfn_resource_summary: CloudFormationResourceSummary
Information about the Kinesis stream resource
remote_invoke_output_format: RemoteInvokeOutputFormat
Response output format that will be used for remote invoke execution
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse]
Consumer instance which can process RemoteInvokeResponse events
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput]
Consumer instance which can process RemoteInvokeLogOutput events
Returns
-------
RemoteInvokeExecutor
Returns the Executor created for Kinesis stream
"""
LOG.info("Putting record to Kinesis data stream %s", cfn_resource_summary.logical_resource_id)
kinesis_client = self._boto_client_provider("kinesis")
return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[ResponseObjectToJsonStringMapper()],
boto_action_executor=KinesisPutDataExecutor(
kinesis_client, cfn_resource_summary.physical_resource_id, remote_invoke_output_format
),
response_consumer=response_consumer,
log_consumer=log_consumer,
)

# mapping definition for each supported resource type
REMOTE_INVOKE_EXECUTOR_MAPPING: Dict[
str,
Expand Down
142 changes: 142 additions & 0 deletions tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from unittest import TestCase
from unittest.mock import patch, Mock

from parameterized import parameterized, parameterized_class
from samcli.lib.remote_invoke.kinesis_invoke_executors import (
RemoteInvokeOutputFormat,
KinesisPutDataExecutor,
ParamValidationError,
InvalidResourceBotoParameterException,
ErrorBotoApiCallException,
ClientError,
KinesisStreamPutRecordTextOutput,
)
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeResponse


class TestKinesisStreamPutRecordTextOutput(TestCase):
@parameterized.expand(
[
("mock-shard-id", "mock-sequence-number"),
]
)
def test_kinesis_put_record_text_output(self, shard_id, sequence_number):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
self.assertEqual(text_output.ShardId, shard_id)
self.assertEqual(text_output.SequenceNumber, sequence_number)

@parameterized.expand(
[
(
"mock-shard-id",
"mock-sequence-number",
{
"ShardId": "mock-shard-id",
"SequenceNumber": "mock-sequence-number",
},
),
]
)
def test_get_output_response_dict(self, shard_id, sequence_number, expected_output):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
output_response_dict = text_output.get_output_response_dict()
self.assertEqual(output_response_dict, expected_output)


@parameterized_class(
"output",
[[RemoteInvokeOutputFormat.TEXT], [RemoteInvokeOutputFormat.JSON]],
)
class TestKinesisPutDataExecutor(TestCase):
output: RemoteInvokeOutputFormat

def setUp(self) -> None:
self.kinesis_client = Mock()
self.stream_name = "mock-kinesis-stream"
self.kinesis_put_data_executor = KinesisPutDataExecutor(self.kinesis_client, self.stream_name, self.output)

@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_execute_action_successful(self, patched_uuid):
mock_uuid_value = "patched-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
given_input_data = "hello world"
mock_shard_id = "shardId-000000000000"
mock_sequence_number = "2941492a-5847-4ebb-a8a3-58c07ce9f198"
mock_text_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
}

mock_json_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_client.put_record.return_value = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_put_data_executor.validate_action_parameters({})
result = self.kinesis_put_data_executor._execute_action(given_input_data)
if self.output == RemoteInvokeOutputFormat.JSON:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_json_response)])
else:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_text_response)])

self.kinesis_client.put_record.assert_called_with(
Data=given_input_data, StreamName=self.stream_name, PartitionKey=mock_uuid_value
)

@parameterized.expand(
[
({}, {"PartitionKey": "mock-uuid-value"}),
(
{"ExplicitHashKey": "mock-explicit-hash-key", "SequenceNumberForOrdering": "1"},
{
"PartitionKey": "mock-uuid-value",
"ExplicitHashKey": "mock-explicit-hash-key",
"SequenceNumberForOrdering": "1",
},
),
(
{
"PartitionKey": "override-partition-key",
},
{
"PartitionKey": "override-partition-key",
},
),
(
{"StreamName": "mock-stream-name", "Data": "mock-data"},
{"PartitionKey": "mock-uuid-value"},
),
(
{"invalidParameterKey": "invalidParameterValue"},
{"invalidParameterKey": "invalidParameterValue", "PartitionKey": "mock-uuid-value"},
),
]
)
@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_validate_action_parameters(self, parameters, expected_boto_parameters, patched_uuid):
mock_uuid_value = "mock-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
self.kinesis_put_data_executor.validate_action_parameters(parameters)
self.assertEqual(self.kinesis_put_data_executor.request_parameters, expected_boto_parameters)

@parameterized.expand(
[
(ParamValidationError(report="Invalid parameters"), InvalidResourceBotoParameterException),
(
ClientError(error_response={"Error": {"Code": "MockException"}}, operation_name="send_message"),
ErrorBotoApiCallException,
),
]
)
def test_execute_action_put_record_throws_boto_errors(self, boto_error, expected_error_thrown):
given_input_message = "hello world"
self.kinesis_client.put_record.side_effect = boto_error
with self.assertRaises(expected_error_thrown):
self.kinesis_put_data_executor.validate_action_parameters({})
for _ in self.kinesis_put_data_executor._execute_action(given_input_message):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,48 @@ def test_create_sqs_boto_executor(
response_consumer=given_response_consumer,
log_consumer=given_log_consumer,
)

@parameterized.expand(itertools.product([RemoteInvokeOutputFormat.JSON, RemoteInvokeOutputFormat.TEXT]))
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.KinesisPutDataExecutor")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.DefaultConvertToJSON")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.ResponseObjectToJsonStringMapper")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.RemoteInvokeExecutor")
def test_create_kinesis_boto_executor(
self,
remote_invoke_output_format,
patched_remote_invoke_executor,
patched_object_to_json_converter,
patched_convert_to_default_json,
patched_kinesis_invoke_executor,
):
given_physical_resource_id = "mock-stream-name"
given_cfn_resource_summary = Mock(physical_resource_id=given_physical_resource_id)

given_kinesis_client = Mock()
self.boto_client_provider_mock.return_value = given_kinesis_client

given_remote_invoke_executor = Mock()
patched_remote_invoke_executor.return_value = given_remote_invoke_executor

given_response_consumer = Mock()
given_log_consumer = Mock()
kinesis_executor = self.remote_invoke_executor_factory._create_kinesis_boto_executor(
given_cfn_resource_summary, remote_invoke_output_format, given_response_consumer, given_log_consumer
)

self.assertEqual(kinesis_executor, given_remote_invoke_executor)
self.boto_client_provider_mock.assert_called_with("kinesis")
patched_convert_to_default_json.assert_called_once()

patched_object_to_json_converter.assert_called_once()
patched_kinesis_invoke_executor.assert_called_with(
given_kinesis_client, given_physical_resource_id, remote_invoke_output_format
)

patched_remote_invoke_executor.assert_called_with(
request_mappers=[patched_convert_to_default_json()],
response_mappers=[patched_object_to_json_converter()],
boto_action_executor=patched_kinesis_invoke_executor(),
response_consumer=given_response_consumer,
log_consumer=given_log_consumer,
)

0 comments on commit 6ccd872

Please sign in to comment.