Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add remote invoke implementation for Kinesis stream service for put_record API #6063

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion samcli/commands/remote/remote_invoke_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_resource_summary,
get_resource_summary_from_physical_id,
)
from samcli.lib.utils.resources import AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.resources import AWS_KINESIS_STREAM, AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.stream_writer import StreamWriter

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -215,6 +215,11 @@ def _get_from_physical_resource_id(self) -> CloudFormationResourceSummary:
sqs_client = self._boto_client_provider("sqs")
resource_id = get_queue_url_from_arn(sqs_client, resource_arn.resource_id)

if SUPPORTED_SERVICES.get(service_from_arn) == AWS_KINESIS_STREAM:
# Note (hnnasit): Add unit test after AWS_KINESIS_STREAM is added to SUPPORTED_SERVICES
# StreamName extracted from arn is used as resource_id.
resource_id = resource_arn.resource_id

return CloudFormationResourceSummary(
cast(str, SUPPORTED_SERVICES.get(service_from_arn)),
resource_id,
Expand Down
132 changes: 132 additions & 0 deletions samcli/lib/remote_invoke/kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Remote invoke executor implementation for Kinesis streams
"""
import logging
import uuid
from dataclasses import asdict, dataclass
from typing import cast

from botocore.exceptions import ClientError, ParamValidationError
from mypy_boto3_kinesis import KinesisClient

from samcli.lib.remote_invoke.exceptions import (
ErrorBotoApiCallException,
InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
BotoActionExecutor,
RemoteInvokeIterableResponseType,
RemoteInvokeOutputFormat,
RemoteInvokeResponse,
)

LOG = logging.getLogger(__name__)
STREAM_NAME = "StreamName"
DATA = "Data"
PARTITION_KEY = "PartitionKey"


@dataclass
class KinesisStreamPutRecordTextOutput:
"""
Dataclass that stores put_record boto3 API fields used to create
text output.
"""

ShardId: str
SequenceNumber: str

mndeveci marked this conversation as resolved.
Show resolved Hide resolved
def get_output_response_dict(self) -> dict:
"""
Returns a dict of existing dataclass fields.
Returns
-------
dict
Returns the dict of the fields that will be used as the output response for
text format output.
"""
return asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})


class KinesisPutDataExecutor(BotoActionExecutor):
"""
Calls "put_record" method of "Kinesis stream" service with given input.
If a file location provided, the file handle will be passed as input object.
"""

_kinesis_client: KinesisClient
_stream_name: str
_remote_output_format: RemoteInvokeOutputFormat
request_parameters: dict

def __init__(self, kinesis_client: KinesisClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat):
self._kinesis_client = kinesis_client
self._remote_output_format = remote_output_format
self._stream_name = physical_id
self.request_parameters = {}

def validate_action_parameters(self, parameters: dict) -> None:
"""
Validates the input boto parameters and prepares the parameters for calling the API.
Parameters
----------
parameters: dict
Boto parameters provided as input
"""
for parameter_key, parameter_value in parameters.items():
if parameter_key == STREAM_NAME:
LOG.warning("StreamName is defined using the value provided for resource_id argument.")
elif parameter_key == DATA:
LOG.warning("Data is defined using the value provided for either --event or --event-file options.")
else:
self.request_parameters[parameter_key] = parameter_value

if PARTITION_KEY not in self.request_parameters:
self.request_parameters[PARTITION_KEY] = str(uuid.uuid4())

def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
"""
Calls "put_record" method to write single data record to Kinesis data stream.
Parameters
----------
payload: str
The Data record which will be sent to the Kinesis stream
Yields
------
RemoteInvokeIterableResponseType
Response that is consumed by remote invoke consumers after execution
"""
if payload:
self.request_parameters[DATA] = payload
else:
self.request_parameters[DATA] = "{}"
LOG.debug("Input event not found, putting a record with Data {}")
self.request_parameters[STREAM_NAME] = self._stream_name
LOG.debug(
"Calling kinesis_client.put_record with StreamName:%s, Data:%s",
self.request_parameters[STREAM_NAME],
payload,
)
try:
put_record_response = cast(dict, self._kinesis_client.put_record(**self.request_parameters))

if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
yield RemoteInvokeResponse(put_record_response)
if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
put_record_text_output = KinesisStreamPutRecordTextOutput(
ShardId=put_record_response["ShardId"],
SequenceNumber=put_record_response["SequenceNumber"],
)
output_data = put_record_text_output.get_output_response_dict()
yield RemoteInvokeResponse(output_data)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
f" {str(param_val_ex).replace(f'{STREAM_NAME}, ', '').replace(f'{DATA}, ', '')}"
)
except ClientError as client_ex:
raise ErrorBotoApiCallException(client_ex) from client_ex
39 changes: 39 additions & 0 deletions samcli/lib/remote_invoke/remote_invoke_executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from typing import Any, Callable, Dict, Optional

from samcli.lib.remote_invoke.kinesis_invoke_executors import KinesisPutDataExecutor
from samcli.lib.remote_invoke.lambda_invoke_executors import (
DefaultConvertToJSON,
LambdaInvokeExecutor,
Expand Down Expand Up @@ -224,6 +225,44 @@ def _create_sqs_boto_executor(
log_consumer=log_consumer,
)

def _create_kinesis_boto_executor(
self,
cfn_resource_summary: CloudFormationResourceSummary,
remote_invoke_output_format: RemoteInvokeOutputFormat,
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse],
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput],
) -> RemoteInvokeExecutor:
"""Creates a remote invoke executor for Kinesis resource type based on
the boto action being called.

Parameters
----------
cfn_resource_summary: CloudFormationResourceSummary
Information about the Kinesis stream resource
remote_invoke_output_format: RemoteInvokeOutputFormat
Response output format that will be used for remote invoke execution
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse]
Consumer instance which can process RemoteInvokeResponse events
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput]
Consumer instance which can process RemoteInvokeLogOutput events

Returns
-------
RemoteInvokeExecutor
Returns the Executor created for Kinesis stream
"""
LOG.info("Putting record to Kinesis data stream %s", cfn_resource_summary.logical_resource_id)
kinesis_client = self._boto_client_provider("kinesis")
return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[ResponseObjectToJsonStringMapper()],
boto_action_executor=KinesisPutDataExecutor(
kinesis_client, cfn_resource_summary.physical_resource_id, remote_invoke_output_format
),
response_consumer=response_consumer,
log_consumer=log_consumer,
)

# mapping definition for each supported resource type
REMOTE_INVOKE_EXECUTOR_MAPPING: Dict[
str,
Expand Down
142 changes: 142 additions & 0 deletions tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from unittest import TestCase
from unittest.mock import patch, Mock

from parameterized import parameterized, parameterized_class
from samcli.lib.remote_invoke.kinesis_invoke_executors import (
RemoteInvokeOutputFormat,
KinesisPutDataExecutor,
ParamValidationError,
InvalidResourceBotoParameterException,
ErrorBotoApiCallException,
ClientError,
KinesisStreamPutRecordTextOutput,
)
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeResponse


class TestKinesisStreamPutRecordTextOutput(TestCase):
@parameterized.expand(
[
("mock-shard-id", "mock-sequence-number"),
]
)
def test_kinesis_put_record_text_output(self, shard_id, sequence_number):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
self.assertEqual(text_output.ShardId, shard_id)
self.assertEqual(text_output.SequenceNumber, sequence_number)

@parameterized.expand(
[
(
"mock-shard-id",
"mock-sequence-number",
{
"ShardId": "mock-shard-id",
"SequenceNumber": "mock-sequence-number",
},
),
]
)
def test_get_output_response_dict(self, shard_id, sequence_number, expected_output):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
output_response_dict = text_output.get_output_response_dict()
self.assertEqual(output_response_dict, expected_output)


@parameterized_class(
"output",
[[RemoteInvokeOutputFormat.TEXT], [RemoteInvokeOutputFormat.JSON]],
)
class TestKinesisPutDataExecutor(TestCase):
output: RemoteInvokeOutputFormat

def setUp(self) -> None:
self.kinesis_client = Mock()
self.stream_name = "mock-kinesis-stream"
self.kinesis_put_data_executor = KinesisPutDataExecutor(self.kinesis_client, self.stream_name, self.output)

@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_execute_action_successful(self, patched_uuid):
mock_uuid_value = "patched-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
given_input_data = "hello world"
mock_shard_id = "shardId-000000000000"
mock_sequence_number = "2941492a-5847-4ebb-a8a3-58c07ce9f198"
mock_text_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
}

mock_json_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_client.put_record.return_value = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_put_data_executor.validate_action_parameters({})
result = self.kinesis_put_data_executor._execute_action(given_input_data)
if self.output == RemoteInvokeOutputFormat.JSON:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_json_response)])
else:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_text_response)])

self.kinesis_client.put_record.assert_called_with(
Data=given_input_data, StreamName=self.stream_name, PartitionKey=mock_uuid_value
)

@parameterized.expand(
[
({}, {"PartitionKey": "mock-uuid-value"}),
(
{"ExplicitHashKey": "mock-explicit-hash-key", "SequenceNumberForOrdering": "1"},
{
"PartitionKey": "mock-uuid-value",
"ExplicitHashKey": "mock-explicit-hash-key",
"SequenceNumberForOrdering": "1",
},
),
(
{
"PartitionKey": "override-partition-key",
},
{
"PartitionKey": "override-partition-key",
},
),
(
{"StreamName": "mock-stream-name", "Data": "mock-data"},
{"PartitionKey": "mock-uuid-value"},
),
(
{"invalidParameterKey": "invalidParameterValue"},
{"invalidParameterKey": "invalidParameterValue", "PartitionKey": "mock-uuid-value"},
),
]
)
@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_validate_action_parameters(self, parameters, expected_boto_parameters, patched_uuid):
mock_uuid_value = "mock-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
self.kinesis_put_data_executor.validate_action_parameters(parameters)
self.assertEqual(self.kinesis_put_data_executor.request_parameters, expected_boto_parameters)

@parameterized.expand(
[
(ParamValidationError(report="Invalid parameters"), InvalidResourceBotoParameterException),
(
ClientError(error_response={"Error": {"Code": "MockException"}}, operation_name="send_message"),
ErrorBotoApiCallException,
),
]
)
def test_execute_action_put_record_throws_boto_errors(self, boto_error, expected_error_thrown):
given_input_message = "hello world"
self.kinesis_client.put_record.side_effect = boto_error
with self.assertRaises(expected_error_thrown):
self.kinesis_put_data_executor.validate_action_parameters({})
for _ in self.kinesis_put_data_executor._execute_action(given_input_message):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,48 @@ def test_create_sqs_boto_executor(
response_consumer=given_response_consumer,
log_consumer=given_log_consumer,
)

@parameterized.expand(itertools.product([RemoteInvokeOutputFormat.JSON, RemoteInvokeOutputFormat.TEXT]))
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.KinesisPutDataExecutor")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.DefaultConvertToJSON")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.ResponseObjectToJsonStringMapper")
@patch("samcli.lib.remote_invoke.remote_invoke_executor_factory.RemoteInvokeExecutor")
def test_create_kinesis_boto_executor(
self,
remote_invoke_output_format,
patched_remote_invoke_executor,
patched_object_to_json_converter,
patched_convert_to_default_json,
patched_kinesis_invoke_executor,
):
given_physical_resource_id = "mock-stream-name"
given_cfn_resource_summary = Mock(physical_resource_id=given_physical_resource_id)

given_kinesis_client = Mock()
self.boto_client_provider_mock.return_value = given_kinesis_client

given_remote_invoke_executor = Mock()
patched_remote_invoke_executor.return_value = given_remote_invoke_executor

given_response_consumer = Mock()
given_log_consumer = Mock()
kinesis_executor = self.remote_invoke_executor_factory._create_kinesis_boto_executor(
given_cfn_resource_summary, remote_invoke_output_format, given_response_consumer, given_log_consumer
)

self.assertEqual(kinesis_executor, given_remote_invoke_executor)
self.boto_client_provider_mock.assert_called_with("kinesis")
patched_convert_to_default_json.assert_called_once()

patched_object_to_json_converter.assert_called_once()
patched_kinesis_invoke_executor.assert_called_with(
given_kinesis_client, given_physical_resource_id, remote_invoke_output_format
)

patched_remote_invoke_executor.assert_called_with(
request_mappers=[patched_convert_to_default_json()],
response_mappers=[patched_object_to_json_converter()],
boto_action_executor=patched_kinesis_invoke_executor(),
response_consumer=given_response_consumer,
log_consumer=given_log_consumer,
)