Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parser): DynamoDBMarshalled helper to parse DynamoDB data structure #3442

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ If you want to extend a schema and transform a JSON stringified payload to an ob
--8<-- "examples/snippets/parser/samples/exampleSqsPayload.json"
```

### DynamoDB Stream event parsing

If you want to parse a DynamoDB stream event with unmarshalling, you can use the helper function `DynamoDBMarshalled`:

=== "DynamoDBStreamSchema with DynamoDBMarshalled"
```typescript hl_lines="17"
--8<-- "examples/snippets/parser/extendDynamoDBStreamSchema.ts"
```

=== "DynamoDBStream event payload"

```json hl_lines="13-20 49-56"
--8<-- "examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json"
```

## Envelopes

When trying to parse your payload you might encounter the following situations:
Expand Down
23 changes: 23 additions & 0 deletions examples/snippets/parser/extendDynamoDBStreamSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
DynamoDBStreamRecord,
DynamoDBStreamSchema,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import { z } from 'zod';

const customSchema = z.object({
id: z.string(),
message: z.string(),
});

const extendedSchema = DynamoDBStreamSchema.extend({
Records: z.array(
DynamoDBStreamRecord.extend({
dynamodb: z.object({
NewImage: DynamoDBMarshalled(customSchema).optional(),
}),
})
),
});

type ExtendedDynamoDBStreamEvent = z.infer<typeof extendedSchema>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1693997155.0,
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "eventsource_arn",
"eventSource": "aws:dynamodb"
},
{
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"OldImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "222",
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 59,
"NewImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "MODIFY",
"eventSourceARN": "source_arn",
"eventSource": "aws:dynamodb"
}
]
}

7 changes: 6 additions & 1 deletion packages/parser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@
"require": "./lib/cjs/helpers.js",
"import": "./lib/esm/helpers.js"
},
"./helpers/dynamodb": {
"require": "./lib/cjs/helpers/dynamodb.js",
"import": "./lib/esm/helpers/dynamodb.js"
},
"./types": {
"require": "./lib/cjs/types/index.js",
"import": "./lib/esm/types/index.js"
Expand Down Expand Up @@ -363,7 +367,8 @@
],
"peerDependencies": {
"@middy/core": "4.x || 5.x || 6.x",
"zod": ">=3.x"
"zod": ">=3.x",
"@aws-sdk/util-dynamodb": ">=3.x"
},
"peerDependenciesMeta": {
"zod": {
Expand Down
82 changes: 82 additions & 0 deletions packages/parser/src/helpers/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { AttributeValue } from '@aws-sdk/client-dynamodb';
import { unmarshall } from '@aws-sdk/util-dynamodb';
import { type ZodTypeAny, z } from 'zod';

/**
* A helper function to unmarshall DynamoDB stream events and validate them against a schema.
*
* @example
* ```typescript
* const mySchema = z.object({
* id: z.string(),
* name: z.string(),
* });
* const eventSchema = DynamoDBStreamSchema.extend({
* Records: z.array(
* DynamoDBStreamRecord.extend({
* dynamodb: z.object({
* NewImage: DynamoDBMarshalled(mySchema).optional(),
* }),
* })
* ),
* });
* type eventSchema = z.infer<typeof extendedSchema>;
* ```
* For example, if you have a DynamoDB stream event like the following:
*
* ```json
* {
* "Records": [
* {
* "dynamodb": {
* "NewImage": {
* "id": {
* "S": "12345"
* },
* "name": {
* "S": "John Doe"
* }
* }
* }
* }
* ]
* }
* ```
* Resulting in:
*
* ```json
* {
* "Records": [
* {
* "dynamodb": {
* "NewImage": {
* "id": "12345",
* "name": "John Doe"
* }
* }
* }
* ]
* }
* ```
*
* @param schema - The schema to validate the JSON string against
*/
const DynamoDBMarshalled = <T extends ZodTypeAny>(schema: T) =>
z
.record(z.string(), z.custom<AttributeValue>())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would appreciate suggestions on using this type.

Setting the type to unknown causes a TypeScript error. The @aws-sdk/util-dynamodb package has a peer dependency on @aws-sdk/client-dynamodb and we are using the AttributeValue type from @aws-sdk/client-dynamodb. Should we also declare @aws-sdk/client-dynamodb as a peer dependency?

Alternatively, we could set the type to unknown and suppress the TypeScript error using biome-ignore, though I would prefer to avoid this approach.

.transform((str, ctx) => {
try {
return unmarshall(str);
} catch (err) {
ctx.addIssue({
code: 'custom',
message: 'Could not unmarshall DynamoDB stream record',
fatal: true,
});

return z.NEVER;
}
})
.pipe(schema);

export { DynamoDBMarshalled };
153 changes: 152 additions & 1 deletion packages/parser/tests/unit/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { describe, expect, it } from 'vitest';
import { z } from 'zod';
import { JSONStringified } from '../../src/helpers.js';
import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js';
import { AlbSchema } from '../../src/schemas/alb.js';
import {
DynamoDBStreamRecord,
DynamoDBStreamSchema,
} from '../../src/schemas/dynamodb';
import {
SnsNotificationSchema,
SnsRecordSchema,
} from '../../src/schemas/sns.js';
import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js';
import type { SnsEvent, SqsEvent } from '../../src/types/schema.js';
import type {
DynamoDBStreamEvent,
SnsEvent,
SqsEvent,
} from '../../src/types/schema.js';
import { getTestEvent } from './schema/utils.js';

const bodySchema = z.object({
Expand Down Expand Up @@ -152,3 +161,145 @@ describe('JSONStringified', () => {
});
});
});

describe('DynamoDBMarshalled', () => {
// Prepare
const schema = z.object({
Message: z.string(),
Id: z.number(),
});

const extendedSchema = DynamoDBStreamSchema.extend({
Records: z.array(
DynamoDBStreamRecord.extend({
dynamodb: z.object({
NewImage: DynamoDBMarshalled(schema).optional(),
}),
})
),
});

it('should correctly unmarshall and validate a valid DynamoDB stream record', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
N: '101',
},
},
{
Message: {
S: 'This item has changed',
},
Id: {
N: '101',
},
},
];
const expectedOutput = [
{
Id: 101,
Message: 'New item!',
},
{
Id: 101,
Message: 'This item has changed',
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(extendedSchema.parse(testEvent)).toStrictEqual({
Records: [
{
...testEvent.Records[0],
dynamodb: {
NewImage: expectedOutput[0],
},
},
{
...testEvent.Records[1],
dynamodb: {
NewImage: expectedOutput[1],
},
},
],
});
});

it('should throw an error if the DynamoDB stream record cannot be unmarshalled', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
NNN: '101', //unknown type
},
},
{
Message: {
S: 'This item has changed',
},
Id: {
N: '101',
},
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(() => extendedSchema.parse(testEvent)).toThrow(
'Could not unmarshall DynamoDB stream record'
);
});

it('should throw a validation error if the unmarshalled record does not match the schema', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
N: '101',
},
},
{
Message: {
S: 'This item has changed',
},
// Id is missing
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(() => extendedSchema.parse(testEvent)).toThrow();
});
});
Loading
Loading