Skip to content

Commit

Permalink
new: EventBridge Pipes support 🎉
Browse files Browse the repository at this point in the history
  • Loading branch information
ljacobsson committed Dec 14, 2022
1 parent c5ed5dc commit b97a907
Show file tree
Hide file tree
Showing 9 changed files with 5,500 additions and 6,753 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ For AWS events, such as `aws.codepipeline` it's already enabled, but for custom

![Demo](https://github.com/mhlabs/evb-cli/raw/master/images/demo.gif)

### Generate EventBridge Pipes connections
[EventBridge Pipes](https://aws.amazon.com/eventbridge/pipes/) was one of the more exciting serverless announcements at re:Invent 2022. It lets you create a one-to-one mapping between a source and a target service so you can build event driven applications with less Lambda glue functions.

`evb pipes` helps you create pipes between resources in your CloudFormation/SAM template. Although the [AWS::Pipes::Pipe](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html) resource is easy to compose, the IAM role that goes with it isn't that straight forward.

Use the `--guided` flag to get prompted for all optional parameters.

![Demo](https://github.com/mhlabs/evb-cli/raw/master/images/demo-pipes.gif)

### To generate an EventBridge InputTransformer object:
[Input transformers](https://docs.aws.amazon.com/eventbridge/latest/userguide/eventbridge-input-transformer-tutorial.html) are useful when you only want a small portion of the event sent to your target. This command helps you navigate the JSON payload and generate the [InputTransformer CloudFormation object](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-events-rule-inputtransformer.html)

Expand Down
Binary file added images/demo-pipes.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require("./src/commands/replay-dead-letter");
require("./src/commands/local");
require("./src/commands/code-binding");
require("./src/commands/api-destination");
require("./src/commands/pipes");

program.version(package.version, "-v, --vers", "output the current version");

Expand Down
10,173 changes: 3,423 additions & 6,750 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mhlabs/evb-cli",
"version": "1.1.46",
"version": "1.1.47",
"description": "A package for building EventBridge/CloudWatch Events patterns",
"main": "index.js",
"scripts": {
Expand All @@ -14,7 +14,7 @@
"license": "ISC",
"dependencies": {
"@mhlabs/aws-sdk-sso": "^0.0.16",
"aws-sdk": "^2.853.0",
"aws-sdk": "^2.1268.0",
"axios": "^0.21.4",
"cli-spinner": "^0.2.10",
"commander": "^4.1.1",
Expand All @@ -37,7 +37,7 @@
},
"homepage": "https://github.com/mhlabs/evb-cli#readme",
"devDependencies": {
"jest": "^25.1.0",
"jest": "^29.3.1",
"y18n": ">=4.0.1"
},
"bin": {
Expand Down
19 changes: 19 additions & 0 deletions src/commands/pipes/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const program = require("commander");
const pipeBuilder = require("./pipe-builder");

program
.command("pipes")
.alias("pi")
.option("-p, --profile [profile]", "AWS profile to use")
.option("-t, --template [template]", "Path to template file", "template.yaml")
.option("-g, --guided", "Run in guided mode - prompts for all optional parameters")
.option(
"--region [region]",
"The AWS region to use. Falls back on AWS_REGION environment variable if not specified"
)
.description("Connects two compatible resources in your template via EventBridge Pipes")
.action(async (cmd) => {
await pipeBuilder.build(
cmd
);
});
194 changes: 194 additions & 0 deletions src/commands/pipes/pipe-builder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
const supportedTypes = require("./pipes-config.json");
const pipesSchema = require("./pipes-cfn-schema.json")
const templateParser = require("../shared/template-parser");
const inputUtil = require("../shared/input-util")
let cmd;
async function build(command) {
cmd = command;
const templateName = cmd.template;
const template = templateParser.load(templateName, true);
if (!template) {
throw new Error(`Template "${templateName}" does not exist.`);
}
const compatibleSources = await getSources(template);
const sourceChoices = compatibleSources.map(p => { return { name: `[${template.Resources[p].Type}] ${p}`, value: { name: p, type: template.Resources[p].Type } } }).sort((a, b) => a.name > b.name ? 1 : -1);
let source = await inputUtil.selectFrom([...sourceChoices, "Not templated"], "Select source", true);

if (source === "Not templated") {
const allTypes = supportedTypes.filter(p => !p.Type.includes("Serverless") && p.Source).map(p => p.Type)
const type = await inputUtil.selectFrom(allTypes, "Select resource type", true);
arn = await inputUtil.text("Enter ARN")
source = { type: type, arn: arn, name: type.split(":")[1] }
}
const sourceConfig = supportedTypes.find(p => p.Type === source.type);
const sourceObj = await buildParametersForSide(sourceConfig.SourceSchemaName);

const compatibleTargets = await getTargets(template, source);
const targetChoices = compatibleTargets.map(p => { return { name: `[${template.Resources[p].Type}] ${p}`, value: { name: p, type: template.Resources[p].Type } } }).sort((a, b) => a.name > b.name ? 1 : -1);
let target = await inputUtil.selectFrom([...targetChoices, "Not templated"], "Select target", true);
if (target === "Not templated") {
const allTypes = supportedTypes.filter(p => !p.Type.includes("Serverless") && p.Target).map(p => p.Type)
const type = await inputUtil.selectFrom(allTypes, "Select resource type", true);
arn = await inputUtil.text("Enter ARN")
target = { type: type, arn: arn, name: type.split(":")[1] }
}
const targetConfig = supportedTypes.find(p => p.Type === target.type);
const targetObj = await buildParametersForSide(targetConfig.TargetSchemaName);
const sourcePropertyName = sourceConfig.SourceSchemaName.replace("PipeSource", "");
const targetPropertyName = targetConfig.TargetSchemaName.replace("PipeTarget", "");
const pipeName = `${source.name}To${target.name}Pipe`;
template.Resources[pipeName] = {
Type: "AWS::Pipes::Pipe",
Properties: {
Name: {
"Fn::Sub": "${AWS::StackName}-" + pipeName
},
RoleArn: { "Fn::GetAtt": [`${pipeName}Role`, "Arn"] },
Source: source.arn || JSON.parse(JSON.stringify(sourceConfig.ArnGetter).replace("#RESOURCE_NAME#", source.name)),
Target: target.arn || JSON.parse(JSON.stringify(targetConfig.ArnGetter).replace("#RESOURCE_NAME#", target.name))
}
}
if (Object.keys(sourceObj).length)
template.Resources[pipeName].Properties["SourceParameters"] = { [sourcePropertyName]: sourceObj };

if (Object.keys(targetObj).length)
template.Resources[pipeName].Properties["TargetParameters"] = { [targetPropertyName]: targetObj };


const role = {
Type: "AWS::IAM::Role",
Properties: {
AssumeRolePolicyDocument: {
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Principal: {
Service: [
"pipes.amazonaws.com"
]
},
Action: [
"sts:AssumeRole"
]
}
]
},
Policies: [
{
PolicyName: "LogsPolicy",
PolicyDocument: {
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"],
Resource: "*"
},
],
}
}
]
}
};
sourceConfig.SourcePolicy.Statement[0].Resource = source.arn || JSON.parse(JSON.stringify((sourceConfig.SourcePolicy.Statement[0].Resource || sourceConfig.ArnGetter)).replace(/#RESOURCE_NAME#/g, source.name));
targetConfig.TargetPolicy.Statement[0].Resource = target.arn || JSON.parse(JSON.stringify((targetConfig.TargetPolicy.Statement[0].Resource || targetConfig.ArnGetter)).replace(/#RESOURCE_NAME#/g, target.name));
role.Properties.Policies.push({
PolicyName: "SourcePolicy",
PolicyDocument: sourceConfig.SourcePolicy
}, {
PolicyName: "TargetPolicy",
PolicyDocument: targetConfig.TargetPolicy
})
template.Resources[`${pipeName}Role`] = role
templateParser.saveTemplate();
}

async function buildParametersForSide(definitionName) {
const schema = pipesSchema.definitions[definitionName];
const obj = {};
if (schema) {
await buildParameters(obj, schema);
}

return obj;
}

async function buildParameters(obj, schema, propName, prefix, isRequired) {
prefix = prefix || "";
let settings = [];
if (schema.type === "object") {
settings.push(...Object.keys(schema.properties));
} else {
settings = [schema];
}
for (const setting of settings) {
if (!propName) propName = setting;
isRequired = isRequired || schema.required && schema.required.includes(setting);
let optionalityString = "(leave blank to skip)"
if (isRequired) {
optionalityString = "(required)";
} else if (!cmd.guided) {
continue;
}
let validationString = "";
const property = schema.properties && schema.properties[setting] || setting;
if (property.maximum && property.minimum) {
validationString += ` (${property.minimum} - ${property.maximum})`;
}

if (property["$ref"]) {
const name = property.$ref.split("/").slice(-1)[0];
obj[setting] = obj[setting] || {};
const type = await buildParameters(obj[setting], pipesSchema.definitions[name], setting, prefix + setting + ".", isRequired);
if (type === "enum") {
obj[setting] = obj[setting][setting];
}
} else if (property.enum) {
if (!isRequired) {
property.enum.push("Skip");
}

const input = await inputUtil.selectFrom(property.enum, `Select value for ${propName}`, true)
if (input === "Skip") {
continue;
}
obj[propName] = input;
return "enum";
} else if (property.type === "array") {
const input = await inputUtil.text(`Enter values for ${prefix}${setting}${validationString}. Seperate with comma. ${optionalityString}`);
if (input) {
obj[setting] = input.split(",").map(x => x.trim());
}
}
else {
let input = await inputUtil.text(`Enter value for ${prefix}${setting}${validationString} ${optionalityString}`)
if (input) {
if (property.type === "integer") {
input = parseInt(input);
} else if (property.type === "boolean") {
input = input.toLowerCase() === "true";
}
obj[setting] = input;
}
}
}
}

async function getSources(template) {
const types = supportedTypes.map(p => p.Type)
return Object.keys(template.Resources).filter(p => types.includes(template.Resources[p].Type) && supportedTypes.find(q => q.Type === template.Resources[p].Type).Source)
}

async function getTargets(template, source) {
const types = supportedTypes.map(p => p.Type)
return Object.keys(template.Resources).filter(p => types.includes(template.Resources[p].Type) && supportedTypes.find(q => q.Type === template.Resources[p].Type).Target && p !== source)
}


module.exports = {
build,
};
Loading

0 comments on commit b97a907

Please sign in to comment.