Skip to content

Commit

Permalink
Merge branch 'main' of github.com:umccr/orcabus into feat/reorder-events
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Feb 15, 2024
2 parents f9847b8 + a3e74d9 commit 528e90c
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 4 deletions.
20 changes: 16 additions & 4 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { OrcaBusStatefulConfig } from '../lib/workload/orcabus-stateful-stack';
import { AuroraPostgresEngineVersion } from 'aws-cdk-lib/aws-rds';
import { OrcaBusStatelessConfig } from '../lib/workload/orcabus-stateless-stack';
import { Duration, aws_lambda, RemovalPolicy } from 'aws-cdk-lib';
import { EventSourceProps } from '../lib/workload/stateful/event_source/component';

const regName = 'OrcaBusSchemaRegistry';
const eventBusName = 'OrcaBusMain';
Expand Down Expand Up @@ -62,6 +63,16 @@ const orcaBusStatelessConfig = {
rdsMasterSecretName: rdsMasterSecretName,
};

const eventSourceConfig: EventSourceProps = {
queueName: 'orcabus-event-source-queue',
maxReceiveCount: 3,
rules: [
{
bucket: 'umccr-temp-dev',
},
],
};

interface EnvironmentConfig {
name: string;
accountId: string;
Expand All @@ -83,7 +94,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand All @@ -99,8 +109,12 @@ export const getEnvironmentConfig = (
securityGroupProps: {
...orcaBusStatefulConfig.securityGroupProps,
},
eventSourceProps: eventSourceConfig,
},
orcaBusStatelessConfig: {
...orcaBusStatelessConfig,
eventSourceQueueName: eventSourceConfig.queueName,
},
orcaBusStatelessConfig: orcaBusStatelessConfig,
},
};

Expand All @@ -113,7 +127,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand Down Expand Up @@ -143,7 +156,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand Down
7 changes: 7 additions & 0 deletions lib/workload/orcabus-stateful-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import { EventBusConstruct, EventBusProps } from './stateful/eventbridge/compone
import { DatabaseConstruct, DatabaseProps } from './stateful/database/component';
import { SecurityGroupConstruct, SecurityGroupProps } from './stateful/securitygroup/component';
import { SchemaRegistryConstruct, SchemaRegistryProps } from './stateful/schemaregistry/component';
import { EventSource, EventSourceProps } from './stateful/event_source/component';

export interface OrcaBusStatefulConfig {
schemaRegistryProps: SchemaRegistryProps;
eventBusProps: EventBusProps;
databaseProps: DatabaseProps;
securityGroupProps: SecurityGroupProps;
eventSourceProps?: EventSourceProps;
}

export class OrcaBusStatefulStack extends cdk.Stack {
readonly eventBus: EventBusConstruct;
readonly database: DatabaseConstruct;
readonly securityGroup: SecurityGroupConstruct;
readonly schemaRegistry: SchemaRegistryConstruct;
readonly eventSource?: EventSource;

constructor(scope: Construct, id: string, props: cdk.StackProps & OrcaBusStatefulConfig) {
super(scope, id, props);
Expand Down Expand Up @@ -46,5 +49,9 @@ export class OrcaBusStatefulStack extends cdk.Stack {
'SchemaRegistryConstruct',
props.schemaRegistryProps
);

if (props.eventSourceProps) {
this.eventSource = new EventSource(this, 'EventSourceConstruct', props.eventSourceProps);
}
}
}
1 change: 1 addition & 0 deletions lib/workload/orcabus-stateless-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface OrcaBusStatelessConfig {
lambdaRuntimePythonVersion: aws_lambda.Runtime;
bclConvertFunctionName: string;
rdsMasterSecretName: string;
eventSourceQueueName?: string;
}

export class OrcaBusStatelessStack extends cdk.Stack {
Expand Down
120 changes: 120 additions & 0 deletions lib/workload/stateful/event_source/component.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { Construct } from 'constructs';
import { Rule } from 'aws-cdk-lib/aws-events';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsQueue } from 'aws-cdk-lib/aws-events-targets';
import { Alarm, ComparisonOperator, MathExpression } from 'aws-cdk-lib/aws-cloudwatch';
import { ServicePrincipal } from 'aws-cdk-lib/aws-iam';

/**
* Properties for defining an S3 EventBridge rule.
*/
export type EventSourceRule = {
/**
* Bucket to receive events from. If not specified, captures events from all buckets.
*/
bucket?: string;
/**
* The types of events to capture for the bucket. If not specified, captures all events.
* This should be from the list S3 EventBridge events:
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
*/
eventTypes?: string[];
/**
* A prefix of the objects that are matched when receiving events from the buckets.
*/
prefix?: string;
};

/**
* Properties for the EventSource construct.
*/
export type EventSourceProps = {
/**
* The name of the queue to construct.
*/
queueName: string;
/**
* The maximum number of times a message can be unsuccessfully received before
* pushing it to the DLQ.
*/
maxReceiveCount: number;
/**
* A set of EventBridge rules to define..
*/
rules: EventSourceRule[];
};

/**
* A construct that defines an SQS S3 event source, along with a DLQ and CloudWatch alarms.
*/
export class EventSource extends Construct {
readonly queue: Queue;
readonly deadLetterQueue: Queue;
readonly alarm: Alarm;

constructor(scope: Construct, id: string, props: EventSourceProps) {
super(scope, id);

this.deadLetterQueue = new Queue(this, 'DeadLetterQueue');
this.queue = new Queue(this, 'Queue', {
queueName: props.queueName,
deadLetterQueue: {
maxReceiveCount: props.maxReceiveCount,
queue: this.deadLetterQueue,
},
});

for (const prop of props.rules) {
const rule = new Rule(scope, 'Rule', {
eventPattern: {
source: ['aws.s3'],
detailType: prop.eventTypes,
detail: {
...(prop.bucket && {
bucket: {
name: prop.bucket,
},
}),
...(prop.prefix && {
object: {
key: [
{
prefix: prop.prefix,
},
],
},
}),
},
},
});

rule.addTarget(new SqsQueue(this.queue));
}

this.queue.grantSendMessages(new ServicePrincipal('events.amazonaws.com'));

const rateOfMessages = new MathExpression({
expression: 'RATE(visible + notVisible)',
usingMetrics: {
visible: this.deadLetterQueue.metricApproximateNumberOfMessagesVisible(),
notVisible: this.deadLetterQueue.metricApproximateNumberOfMessagesVisible(),
},
});

this.alarm = new Alarm(this, 'Alarm', {
metric: rateOfMessages,
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
threshold: 0,
evaluationPeriods: 1,
alarmName: 'Orcabus EventSource Alarm',
alarmDescription: 'An event has been received in the dead letter queue.',
});
}

/**
* Get the SQS queue ARN.
*/
get queueArn(): string {
return this.queue.queueArn;
}
}
121 changes: 121 additions & 0 deletions test/stateful/eventSourceConstruct.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import * as cdk from 'aws-cdk-lib';
import { Match, Template } from 'aws-cdk-lib/assertions';
import { EventSource } from '../../lib/workload/stateful/event_source/component';

let stack: cdk.Stack;

function assert_common(template: Template) {
template.resourceCountIs('AWS::SQS::Queue', 2);

template.hasResourceProperties('AWS::SQS::Queue', {
QueueName: 'queue',
RedrivePolicy: {
deadLetterTargetArn: Match.anyValue(),
maxReceiveCount: 100,
},
});

template.hasResourceProperties('AWS::CloudWatch::Alarm', {
ComparisonOperator: 'GreaterThanThreshold',
EvaluationPeriods: 1,
Threshold: 0,
});

template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
source: ['aws.s3'],
detail: {
bucket: {
name: 'bucket',
},
},
},
});
}

beforeEach(() => {
stack = new cdk.Stack();
});

test('Test EventSource created props', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
},
],
});
const template = Template.fromStack(stack);

console.log(JSON.stringify(template, undefined, 2));

assert_common(template);
});

test('Test EventSource created props with event types', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
eventTypes: ['Object Created'],
},
],
});
const template = Template.fromStack(stack);

assert_common(template);
template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
'detail-type': ['Object Created'],
},
});
});

test('Test EventSource created props with prefix', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
prefix: 'prefix',
},
],
});
const template = Template.fromStack(stack);

assert_common(template);
template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
detail: {
object: {
key: [
{
prefix: 'prefix',
},
],
},
},
},
});
});

test('Test EventSource created props with rules matching any bucket', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [{}],
});
const template = Template.fromStack(stack);

template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
source: ['aws.s3'],
detail: {},
},
});
});

0 comments on commit 528e90c

Please sign in to comment.