This project demonstrates below aspects
- Setting up AWS Lambda to consume messages from Amazon MSK (Kafka), all running locally using LocalStack.
- Contract Test the Lambda based on AsyncAPI 3.0 spec using Specmatic Kafka Support (#NOCODE #LOWCODE approach)
Please make sure you have Docker Desktop installed on your machine.
If you don’t have the AWS CLI installed, install it from: AWS CLI installation
You can install LocalStack via pip:
brew install localstack
Signup with localstack to get an Auth Token (choose an appropriate license - Example: Trial or Hobby licence).
Since LocalStack is a mock AWS environment, configure a fake profile:
aws configure --profile localstack
- AWS Access Key:
test
- AWS Secret Access Key:
test
- Region:
us-east-1
- Output Format:
json
localstack auth set-token <your-auth-token>
LOCALSTACK_PERSISTENCE=1 localstack start
Please refer to GitHub comment.
aws kafka create-cluster \
--cluster-name my-kafka-cluster \
--kafka-version 2.8.1 \
--broker-node-group-info file://broker-config.json \
--number-of-broker-nodes 1 \
--region us-east-1 \
--profile localstack \
--endpoint-url=http://localhost:4566
aws kafka list-clusters \
--region us-east-1 \
--profile localstack \
--endpoint-url=http://localhost:4566
Please save the value of the ClusterArn field in the response.
You will need to use this in some of the next steps where you see <YOUR_CLUSTER_ARN>
.
aws kafka get-bootstrap-brokers \
--cluster-arn "<YOUR_CLUSTER_ARN>" \
--profile localstack \
--endpoint-url=http://localhost:4566
Example Response:
{
"BootstrapBrokerString": "localhost.localstack.cloud:4511"
}
Pre-requisite: Install Kafka on your local machine to use the kafka-topics.sh
command.
kafka-topics --create \
--bootstrap-server localhost.localstack.cloud:4511 \
--replication-factor 1 \
--partitions 1 \
--topic io.specmatic.json.request
kafka-topics --create \
--bootstrap-server localhost.localstack.cloud:4511 \
--replication-factor 1 \
--partitions 1 \
--topic io.specmatic.json.reply
Pre-requisite: Use JDK 17, for example if you are using jenv, please run: jenv local 17
From the project root folder:
Build the project and create a fat jar
./gradlew clean shadowJar
Deploy the fat jar as a lambda function
aws lambda create-function \
--function-name LambdaToKafka \
--runtime java17 \
--role arn:aws:iam::000000000000:role/lambda-role \
--handler com.example.XsdMessageHandler \
--zip-file fileb://build/libs/aws-lambda-kafka.jar \
--timeout 30 \
--memory-size 512 \
--region us-east-1 \
--profile localstack \
--endpoint-url=http://localhost:4566
Use
q to quit.
aws lambda list-functions --profile localstack --endpoint-url=http://localhost:4566
Search for the function LambdaToKafka
using /
and q
to quit.
aws lambda create-event-source-mapping \
--function-name LambdaToKafka \
--event-source-arn "<YOUR_CLUSTER_ARN>" \
--topics "io.specmatic.json.request" \
--starting-position LATEST \
--region us-east-1 \
--profile localstack \
--endpoint-url=http://localhost:4566
Use
q to quit.
kafka-console-producer --broker-list localhost:4511 --topic io.specmatic.json.request
Copy and paste the following json object and press enter:
{"id": 1, "xsd": "xsd 1"}
Press Ctrl+D
.
kafka-console-consumer --bootstrap-server localhost:4511 --topic io.specmatic.json.reply --from-beginning
You should see the following message :
{"id": 1, "json": "Converted from XSD"}
If you don't see this message, check the logs for your lambda function:
aws logs tail /aws/lambda/LambdaToKafka --follow \
--profile localstack \
--endpoint-url=http://localhost:4566
This step now uses Specmatic Kafka Support to leverage AsyncAPI 3.0 spec to contract test the above Lambda setup. The AsyncAPI 3.0 spec models the Event Driven Architecture, the topics and the schema of messages sent / received on those topics.
Please keep your Local Stack running for this next step.
./gradlew test
You should now see the interactive Specmatic Kafka HTML test report here - build/reports/index.html
.
The report has drill down details on the messages sent and received on the Kafka topics and if the messages are as per the schema in AsyncAPI spec.
localstack stop