diff --git a/README.md b/README.md
index 04e85e7..a9994bd 100644
--- a/README.md
+++ b/README.md
@@ -113,6 +113,32 @@ ledger, not for updates. Otherwise, an excessive number of events may end up in
![Amazon EventBridge](img/eventbridge.png)
+### AWS Database Migration Service (DMS)
+
+This framework can be used with [AWS Database Migration Service (DMS)](https://docs.aws.amazon.com/dms/latest/userguide/Welcome.html)
+to migrate data from a [DMS-supported source database](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.html)
+into the target ledger, using an Amazon Kinesis Data Stream as the delivery channel. The framework supports both full loads
+and Change Data Capture (CDC), enabling one-time migrations, gradual cut-overs, and ongoing replication from a source
+database into a QLDB ledger acting as an audit database. DDL changes are not supported by the framework.
+
+![AWS Database Migration Service](img/dms.png)
+
+Understand how DMS works with your source database and with Kinesis Data Streams target endpoints before using this
+approach.
+
+It is possible to change the value of the primary key or unique identifier for a record in the source table. To
+propagate this change through to the ledger, the framework needs to know the previous value of the identifier. To have
+DMS provide the previous identifier, add the following configuration to the migration task's settings:
+
+ "BeforeImageSettings": {
+ "EnableBeforeImage": true,
+ "ColumnFilter": "pk-only",
+ "FieldName": "before-image"
+ }
+
+See the [DMS task settings documentation](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.BeforeImage.html)
+for more details on this setting.
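+
+For illustration only, an update event arriving on the stream with these settings might look roughly like the following.
+The table and column names here are hypothetical, and the exact message layout can vary with the DMS version and task
+configuration; the key point is that the prior key value travels under the configured `before-image` field:
+
+    {
+      "data": { "vehicle_id": "KM8-1002", "owner_name": "R. Bolton" },
+      "before-image": { "vehicle_id": "KM8-1001" },
+      "metadata": {
+        "operation": "update",
+        "record-type": "data",
+        "schema-name": "public",
+        "table-name": "vehicle_registration"
+      }
+    }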
+
## Requirements
1. **Sign up for AWS** — Before you begin, you need an AWS account. For more information about creating an AWS
@@ -134,15 +160,15 @@ the file to an S3 bucket, where it will be used by one of the CloudFormation tem
Deploy the loader project by creating a stack with one of the templates in the `src/main/cfn` folder of the project. Select
one of the following:
-| Template | Description |
-|-------------------------------------|---------|
-| qldb-ledger-load-via-sqs.yml | SQS |
-| qldb-ledger-load-via-sns.yml | SNS |
-| qldb-ledger-load-via-sns-to-sqs.yml | SNS to SQS |
-| qldb-ledger-load-via-kinesis.yml | Kinesis |
-| qldb-ledger-load-via-msk.yml | MSK |
+| Template | Description |
+|--------------------------------------|--------------------|
+| qldb-ledger-load-via-sqs.yml | SQS |
+| qldb-ledger-load-via-sns.yml | SNS |
+| qldb-ledger-load-via-sns-to-sqs.yml | SNS to SQS |
+| qldb-ledger-load-via-kinesis.yml | Kinesis |
+| qldb-ledger-load-via-msk.yml | MSK |
| qldb-ledger-load-via-eventbridge.yml | Amazon EventBridge |
-
+| qldb-ledger-load-via-dms-kinesis.yml | DMS                |
+
## Getting Help
diff --git a/img/dms.png b/img/dms.png
new file mode 100644
index 0000000..76c7548
Binary files /dev/null and b/img/dms.png differ
diff --git a/pom.xml b/pom.xml
index 2d5875c..6228567 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
software.amazon.qldb
amazon-qldb-ledger-load
- 1.0.1
+ 1.1.0
jar
diff --git a/src/main/cfn/qldb-ledger-load-via-dms-kinesis.yml b/src/main/cfn/qldb-ledger-load-via-dms-kinesis.yml
new file mode 100644
index 0000000..8d3a636
--- /dev/null
+++ b/src/main/cfn/qldb-ledger-load-via-dms-kinesis.yml
@@ -0,0 +1,346 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: MIT-0
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of this
+# software and associated documentation files (the "Software"), to deal in the Software
+# without restriction, including without limitation the rights to use, copy, modify,
+# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+---
+AWSTemplateFormatVersion: "2010-09-09"
+Description: >
+ Creates the components for asynchronous data load into an Amazon QLDB ledger via AWS Database Migration Service (DMS) through
+  an Amazon Kinesis Data Stream. This template creates the Kinesis Data Stream, an SQS dead-letter queue to capture failed events,
+ and the DMS target endpoint for the stream. It does not create the DMS task to perform the migration.
+
+Metadata:
+ AWS::CloudFormation::Interface:
+ ParameterGroups:
+ - Label:
+ default: "Ledger"
+ Parameters:
+ - LedgerName
+ - LedgerRegion
+ - Label:
+ default: "Lambda Loader"
+ Parameters:
+ - LambdaCodeS3Bucket
+ - LambdaCodeS3Key
+ - LoaderHandlerName
+ - RevisionWriterClass
+ - LoadEventMapperClass
+ - LambdaMemory
+ - LoaderProvisionedConcurrency
+ - MaxOccRetries
+ - MaxLedgerSessionsPerLambda
+ - LoaderStrictMode
+ - BeforeImageFieldName
+ - Label:
+ default: "Delivery Channel Settings"
+ Parameters:
+ - ShardCount
+ - StreamMode
+ - RetentionPeriodHours
+ - StreamConsumerBatchSize
+ - ParallelizationFactor
+ - MaximumRetries
+ - MaximumEventAgeSeconds
+ - MaximumBatchingWindowInSeconds
+
+Parameters:
+ LedgerName:
+ Description: "Name of the QLDB ledger"
+ Type: "String"
+
+ LedgerRegion:
+ Description: "The region where the QLDB ledger is located. Leave blank to use the local region."
+ Type: "String"
+
+ LambdaCodeS3Bucket:
+ Description: "The S3 bucket containing this stack's Lambda code"
+ Type: "String"
+
+ LambdaCodeS3Key:
+ Description: "The S3 key of the zip file containing this stack's Lambda code"
+ Type: "String"
+
+ LoaderHandlerName:
+ Description: "The fully-qualified class and method names of the loader Lambda function"
+ Type: "String"
+ Default: "software.amazon.qldb.load.receiver.DmsToKinesisDataStreamEventReceiver::handleRequest"
+
+ RevisionWriterClass:
+ Description: "The fully-qualified name of the RevisionWriter class the function should use to write events to the ledger."
+ Type: "String"
+ Default: "software.amazon.qldb.load.writer.TableMapperRevisionWriter"
+
+ LoadEventMapperClass:
+ Description: "The fully-qualified name of the LoadEventMapper class the function should use to map data from the source table to the target table."
+ Type: "String"
+ Default: "software.amazon.qldb.load.eventmap.MappingFileLoadEventMapper"
+
+ BeforeImageFieldName:
+ Description: "The name of the 'before image' field in the DMS stream event. This corresponds to the value of the 'FieldName' setting in the 'BeforeImageSettings' section of the DMS task settings."
+ Type: "String"
+ Default: "before-image"
+
+ LambdaMemory:
+    Description: "The amount of memory (MB) to allocate to the Lambda function"
+ Type: Number
+ Default: 2048
+
+ LoaderProvisionedConcurrency:
+ Description: "The number of Lambda instances to initialize for loading"
+ Type: Number
+ MinValue: 1
+ Default: 10
+
+ MaxOccRetries:
+ Description: "The maximum number of times the QLDB driver will retry a transaction on OCC conflicts"
+ Type: "Number"
+ Default: 3
+
+ MaxLedgerSessionsPerLambda:
+    Description: "The maximum number of concurrent QLDB sessions per Lambda function instance"
+ Type: "Number"
+ Default: 1
+
+ LoaderStrictMode:
+ Description: "Strict Mode setting for the loader function"
+ Type: "String"
+ AllowedValues:
+ - "True"
+ - "False"
+ Default: "True"
+
+ StreamConsumerBatchSize:
+    Description: "The maximum number of records in each batch that Lambda pulls from the stream and sends to the function."
+ Type: "Number"
+ Default: 100
+
+ ParallelizationFactor:
+    Description: "The number of batches to process concurrently from each shard."
+ Type: "Number"
+ Default: 1
+
+ RetentionPeriodHours:
+    Description: "The number of hours that data records stored in the stream's shards remain accessible."
+ Type: "Number"
+ Default: 24
+
+ ShardCount:
+    Description: "The number of shards that the stream uses. For greater provisioned throughput, increase the number of shards."
+ Type: "Number"
+ Default: 1
+
+ StreamMode:
+    Description: "Specifies the capacity mode for the data stream."
+ Type: "String"
+ AllowedValues:
+ - "ON_DEMAND"
+ - "PROVISIONED"
+ Default: "PROVISIONED"
+
+ MaximumRetries:
+ Description: "Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When retries is infinite, Lambda retries failed records until the record expires in the stream."
+ Type: "Number"
+ Default: -1
+
+ MaximumEventAgeSeconds:
+ Description: "Tells the Lambda function to discard records older than the specified age (in seconds). The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, Lambda never discards old records and records expire based on the stream's setting (configured via RetentionPeriodHours)."
+ Type: "Number"
+ Default: -1
+
+ MaximumBatchingWindowInSeconds:
+ Description: "The maximum amount of time, in seconds, that Lambda spends gathering records from the stream before invoking the function. A value of zero tells Lambda not to wait to buffer records."
+ Type: "Number"
+ Default: 0
+
+
+Conditions:
+ RegionSet: !Not [!Equals ["", !Ref LedgerRegion]]
+
+
+Resources:
+ DeadLetterQueue:
+ Type: AWS::SQS::Queue
+
+ Stream:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ RetentionPeriodHours: !Ref RetentionPeriodHours
+ ShardCount: !Ref ShardCount
+ StreamModeDetails:
+ StreamMode: !Ref StreamMode
+
+ DmsTargetEndpointAccessRole:
+ Type: "AWS::IAM::Role"
+ Properties:
+ Policies:
+ - PolicyName: !Join [ "-", [ !Ref AWS::StackName, "DmsTargetEndpointServiceAccess" ] ]
+ PolicyDocument:
+ Version: "2012-10-17"
+ Statement:
+ - Effect: "Allow"
+ Action:
+ - "kinesis:DescribeStream"
+ - "kinesis:DescribeStreamSummary"
+ - "kinesis:ListShards"
+ - "kinesis:PutRecord"
+ - "kinesis:PutRecords"
+ Resource:
+ - !GetAtt Stream.Arn
+ AssumeRolePolicyDocument:
+ Version: "2012-10-17"
+ Statement:
+ - Effect: "Allow"
+ Principal:
+ Service:
+ - "dms.amazonaws.com"
+ Action:
+ - "sts:AssumeRole"
+
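+  # Target endpoint for the DMS migration task (the task itself is not created by this stack).
+  # DMS writes change records from the source database to the Kinesis stream as JSON messages.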
+ DmsKinesisTargetEndpoint:
+ Type: "AWS::DMS::Endpoint"
+ Properties:
+ EndpointIdentifier: !Join ["-", ["kinesis-to-qldb", !Ref LedgerName]]
+ EndpointType: "target"
+ EngineName: "kinesis"
+ KinesisSettings:
+ StreamArn: !GetAtt Stream.Arn
+ ServiceAccessRoleArn: !GetAtt DmsTargetEndpointAccessRole.Arn
+ MessageFormat: "json"
+
+ LedgerLoadFunctionRole:
+ Type: "AWS::IAM::Role"
+ Properties:
+ ManagedPolicyArns:
+ - "arn:aws:iam::aws:policy/AmazonQLDBReadOnly"
+ - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
+ Policies:
+ - PolicyName: !Join [ "-", [ !Ref AWS::StackName, !Ref LedgerName, "LoaderLedgerAccess" ] ]
+ PolicyDocument:
+ Version: "2012-10-17"
+ Statement:
+ - Effect: "Allow"
+ Action:
+ - "qldb:SendCommand"
+ Resource:
+ - !Join [ "", [ "arn:aws:qldb:", !If [RegionSet, !Ref LedgerRegion, !Ref AWS::Region], ":", !Ref "AWS::AccountId", ":ledger/", !Ref LedgerName ] ]
+ - !Join [ "", [ "arn:aws:qldb:", !If [RegionSet, !Ref LedgerRegion, !Ref AWS::Region], ":", !Ref "AWS::AccountId", ":ledger/", !Ref LedgerName, "/*" ] ]
+ - Effect: "Allow"
+ Action:
+ - "qldb:PartiQLDelete"
+ - "qldb:PartiQLInsert"
+ - "qldb:PartiQLUpdate"
+ - "qldb:PartiQLSelect"
+ Resource:
+ - !Join [ "", [ "arn:aws:qldb:", !If [RegionSet, !Ref LedgerRegion, !Ref AWS::Region], ":", !Ref "AWS::AccountId", ":ledger/", !Ref LedgerName ] ]
+ - !Join [ "", [ "arn:aws:qldb:", !If [RegionSet, !Ref LedgerRegion, !Ref AWS::Region], ":", !Ref "AWS::AccountId", ":ledger/", !Ref LedgerName, "/*" ] ]
+ - Effect: "Allow"
+ Action:
+ - "kinesis:DescribeStream"
+ - "kinesis:DescribeStreamSummary"
+ - "kinesis:GetRecords"
+ - "kinesis:GetShardIterator"
+ - "kinesis:ListShards"
+ - "kinesis:ListStreams"
+ - "kinesis:SubscribeToShard"
+ Resource:
+ - !GetAtt Stream.Arn
+ - Effect: "Allow"
+ Action:
+ - "sqs:SendMessage"
+ Resource:
+ - !GetAtt DeadLetterQueue.Arn
+ AssumeRolePolicyDocument:
+ Version: "2012-10-17"
+ Statement:
+ - Effect: "Allow"
+ Principal:
+ Service:
+ - "lambda.amazonaws.com"
+ Action:
+ - "sts:AssumeRole"
+
+ LedgerLoadFunction:
+ Type: AWS::Lambda::Function
+ Properties:
+ Description: "Loads documents from Kinesis into a QLDB ledger"
+ Runtime: java11
+ Handler: !Ref LoaderHandlerName
+ MemorySize: !Ref LambdaMemory
+ Timeout: 30
+ Role: !GetAtt LedgerLoadFunctionRole.Arn
+ Environment:
+ Variables:
+ LEDGER_NAME: !Ref LedgerName
+ LEDGER_REGION: !If [RegionSet, !Ref LedgerRegion, !Ref AWS::Region]
+ MAX_SESSIONS_PER_LAMBDA: !Ref MaxLedgerSessionsPerLambda
+ MAX_OCC_RETRIES: !Ref MaxOccRetries
+ STRICT_MODE: !Ref LoaderStrictMode
+ REVISION_WRITER: !Ref RevisionWriterClass
+ LOAD_EVENT_MAPPER: !Ref LoadEventMapperClass
+ BEFORE_IMAGE_FIELD_NAME: !Ref BeforeImageFieldName
+ Code:
+ S3Bucket: !Ref LambdaCodeS3Bucket
+ S3Key: !Ref LambdaCodeS3Key
+
+ LedgerLoadFunctionVersion:
+ Type: AWS::Lambda::Version
+ Properties:
+ FunctionName: !Ref LedgerLoadFunction
+
+ LedgerLoadFunctionAlias:
+ Type: AWS::Lambda::Alias
+ Properties:
+ FunctionName: !Ref LedgerLoadFunction
+ FunctionVersion: !GetAtt LedgerLoadFunctionVersion.Version
+ Name: "LedgerLoaderLatest"
+ ProvisionedConcurrencyConfig:
+ ProvisionedConcurrentExecutions: !Ref LoaderProvisionedConcurrency
+
+ LambdaFunctionMapping:
+ Type: AWS::Lambda::EventSourceMapping
+ Properties:
+ EventSourceArn: !GetAtt Stream.Arn
+ BatchSize: !Ref StreamConsumerBatchSize
+ BisectBatchOnFunctionError: True
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+ FunctionName: !Ref LedgerLoadFunctionAlias
+ ParallelizationFactor: !Ref ParallelizationFactor
+ StartingPosition: TRIM_HORIZON
+ MaximumRetryAttempts: !Ref MaximumRetries
+ MaximumRecordAgeInSeconds: !Ref MaximumEventAgeSeconds
+ MaximumBatchingWindowInSeconds: !Ref MaximumBatchingWindowInSeconds
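+      # Batches that exhaust their retries are reported to the dead-letter queue for inspection.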
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt DeadLetterQueue.Arn
+
+
+Outputs:
+ StreamName:
+ Description: "The name of the Kinesis data stream"
+ Value: !Ref Stream
+
+ StreamArn:
+ Description: "The ARN of the Kinesis data stream"
+ Value: !GetAtt Stream.Arn
+
+ DeadLetterQueueUrl:
+ Description: "The URL of the SQS dead-letter queue created in this stack"
+ Value: !Ref DeadLetterQueue
+
+ DeadLetterQueueName:
+ Description: "The name of the SQS dead-letter queue created in this stack"
+ Value: !GetAtt DeadLetterQueue.QueueName
\ No newline at end of file
diff --git a/src/main/java/software/amazon/qldb/load/eventmap/LoadEventMapper.java b/src/main/java/software/amazon/qldb/load/eventmap/LoadEventMapper.java
index 310bd9c..f39bf14 100644
--- a/src/main/java/software/amazon/qldb/load/eventmap/LoadEventMapper.java
+++ b/src/main/java/software/amazon/qldb/load/eventmap/LoadEventMapper.java
@@ -25,11 +25,39 @@
*/
public interface LoadEventMapper {
+ /**
+ * Maps a table name from the source event to the corresponding table in the target database.
+ *
+ * @param sourceTable The name of the table in the source database to map.
+ * @return The name of the corresponding table in the target database or null if none was found.
+ */
public String mapTableName(String sourceTable);
- public IonStruct mapDataRecord(IonStruct sourceRecord, String sourceTable);
+ /**
+ * Maps a data record from a source database into a data record for the target database, handling all field name
+ * translation, data formatting and filtering, restructuring, data type conversion, etc.
+ *
+ * @param sourceRecord The data record from the source database
+ * @param beforeImage The "before image" of the record. This can be useful for updates where the new data record
+ * may need to consider the before state when setting its values. This parameter is optional.
+ * @param sourceTable The name of the table in the source database that the source record belongs to.
+ * @return A data record mapped to the target table or null if there is no mapping for the given table. It is
+ * possible for this method to return an empty IonStruct if a table mapping exists, but no fields in the
+ * source data can be mapped.
+ */
+ public IonStruct mapDataRecord(IonStruct sourceRecord, IonStruct beforeImage, String sourceTable);
- public IonValue mapPrimaryKey(IonStruct sourceRecord, String sourceTable);
+ /**
+     * Extracts the unique identifier (primary key) from the source record.
+ *
+ * @param sourceRecord The data record from the source database
+ * @param beforeImage The "before image" of the record. This can be useful for updates where the value of the unique
+ * identifier field is changed in the source database. Without the before image, it would not be
+ * possible to identify the corresponding record in the target database for update.
+ * @param sourceTable The name of the table in the source database that the source record belongs to.
+ * @return The value of the unique identifier (primary key) for the record.
+ */
+ public IonValue mapPrimaryKey(IonStruct sourceRecord, IonStruct beforeImage, String sourceTable);
}
diff --git a/src/main/java/software/amazon/qldb/load/eventmap/MappingFileLoadEventMapper.java b/src/main/java/software/amazon/qldb/load/eventmap/MappingFileLoadEventMapper.java
index 11d6253..1dd5ac0 100644
--- a/src/main/java/software/amazon/qldb/load/eventmap/MappingFileLoadEventMapper.java
+++ b/src/main/java/software/amazon/qldb/load/eventmap/MappingFileLoadEventMapper.java
@@ -55,7 +55,7 @@ public String mapTableName(String sourceTable) {
}
@Override
- public IonStruct mapDataRecord(IonStruct sourceRecord, String sourceTable) {
+ public IonStruct mapDataRecord(IonStruct sourceRecord, IonStruct beforeImage, String sourceTable) {
TableConfig config = tableMap.get(sourceTable);
if (config == null)
return null;
@@ -73,11 +73,14 @@ public IonStruct mapDataRecord(IonStruct sourceRecord, String sourceTable) {
}
@Override
- public IonValue mapPrimaryKey(IonStruct sourceRecord, String sourceTable) {
+ public IonValue mapPrimaryKey(IonStruct sourceRecord, IonStruct beforeImage, String sourceTable) {
TableConfig config = tableMap.get(sourceTable);
if (config == null)
return null;
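+
+        // When DMS supplies a before image, it holds the record's prior key value; prefer it so that
+        // key changes in the source still resolve to the existing document in the ledger.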
+ if (beforeImage != null && beforeImage.containsKey(config.primaryKeyField))
+ return beforeImage.get(config.primaryKeyField);
+
return sourceRecord.get(config.primaryKeyField);
}
diff --git a/src/main/java/software/amazon/qldb/load/receiver/DmsToKinesisDataStreamEventReceiver.java b/src/main/java/software/amazon/qldb/load/receiver/DmsToKinesisDataStreamEventReceiver.java
index e65ac0d..4427bb9 100644
--- a/src/main/java/software/amazon/qldb/load/receiver/DmsToKinesisDataStreamEventReceiver.java
+++ b/src/main/java/software/amazon/qldb/load/receiver/DmsToKinesisDataStreamEventReceiver.java
@@ -30,10 +30,11 @@
import software.amazon.qldb.load.eventmap.LoadEventMapperFactory;
import software.amazon.qldb.load.writer.RevisionWriter;
import software.amazon.qldb.load.writer.RevisionWriterFactory;
+import software.amazon.qldb.load.writer.ValidationResult;
/**
- * AWS Lambda function that consumes AWS Database Migration Service (DMS) events from a Kinesis Data Stream and writes
+ * AWS Lambda function that consumes AWS Database Migration Service (DMS) events from an Amazon Kinesis Data Stream and writes
* them into a QLDB ledger, enabling automated migration and CDC replication of data from relational databases,
* mainframes, and any other data source that DMS supports. The behavior of this function with respect to writing events
* into the ledger is primarily controlled by the RevisionWriter, whose type is determined through an environment variable.
@@ -47,6 +48,15 @@ public class DmsToKinesisDataStreamEventReceiver implements RequestHandler