Skip to content

Commit

Permalink
feat: Add kafka module (#75)
Browse files Browse the repository at this point in the history
* feat: add kafka module

* feat(kafka): add kafka users

* doc(kafka): Change kafka module docs

* doc(kafka): Add kafka plugins

* feat(kafka): Add kafka backups

* feat(kafka): Add outputs
  • Loading branch information
elsebasan authored Jul 17, 2024
1 parent dbd0022 commit d32d99b
Show file tree
Hide file tree
Showing 7 changed files with 736 additions and 0 deletions.
75 changes: 75 additions & 0 deletions aws/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<!-- BEGIN_TF_DOCS -->
## Requirements

No requirements.

## Providers

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | n/a |
| <a name="provider_random"></a> [random](#provider\_random) | n/a |
| <a name="provider_terraform"></a> [terraform](#provider\_terraform) | n/a |

## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_kms"></a> [kms](#module\_kms) | github.com/TradrApi/terraform-modules//aws/kms | v1 |
| <a name="module_msk_s3_bkp"></a> [msk\_s3\_bkp](#module\_msk\_s3\_bkp) | github.com/TradrApi/terraform-modules//aws/s3 | v1 |
| <a name="module_mskconnect_custom_plugins"></a> [mskconnect\_custom\_plugins](#module\_mskconnect\_custom\_plugins) | github.com/TradrApi/terraform-modules//aws/s3 | v1 |

## Resources

| Name | Type |
|------|------|
| [aws_cloudwatch_log_group.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_cloudwatch_log_group.s3_sink_connector](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_iam_policy.msk_s3_bkp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_role.msk_s3_bkp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.msk_s3_bkp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_msk_cluster.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/msk_cluster) | resource |
| [aws_msk_configuration.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/msk_configuration) | resource |
| [aws_msk_scram_secret_association.users](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/msk_scram_secret_association) | resource |
| [aws_mskconnect_connector.backup_msk_to_s3](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_connector) | resource |
| [aws_mskconnect_custom_plugin.plugins](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_custom_plugin) | resource |
| [aws_s3_bucket_server_side_encryption_configuration.msk_s3_bkp](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_server_side_encryption_configuration) | resource |
| [aws_s3_object.plugins](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_secretsmanager_secret.msk_users](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/secretsmanager_secret) | resource |
| [aws_secretsmanager_secret_policy.msk_users](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/secretsmanager_secret_policy) | resource |
| [aws_secretsmanager_secret_version.msk_users](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/secretsmanager_secret_version) | resource |
| [aws_security_group.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group) | resource |
| [aws_security_group.s3_sink_connector](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group) | resource |
| [aws_security_group_rule.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group_rule) | resource |
| [aws_security_group_rule.s3_sink_connector_outbound](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group_rule) | resource |
| [random_password.msk](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/password) | resource |
| [terraform_data.plugins](https://registry.terraform.io/providers/hashicorp/terraform/latest/docs/resources/data) | resource |
| [aws_caller_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_iam_policy_document.msk_users](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_allowed_cidr_blocks"></a> [allowed\_cidr\_blocks](#input\_allowed\_cidr\_blocks) | Allowed CIDR blocks. | `list(string)` | `[]` | no |
| <a name="input_client_subnets"></a> [client\_subnets](#input\_client\_subnets) | Client subnets. | `list(string)` | `[]` | no |
| <a name="input_cluster_name"></a> [cluster\_name](#input\_cluster\_name) | msk cluster name. | `string` | `null` | no |
| <a name="input_ebs_volume_size"></a> [ebs\_volume\_size](#input\_ebs\_volume\_size) | EBS volume size. | `number` | `10` | no |
| <a name="input_environment"></a> [environment](#input\_environment) | Environment name. | `string` | `null` | no |
| <a name="input_kafka_version"></a> [kafka\_version](#input\_kafka\_version) | Kafka version. | `string` | `"3.4.0"` | no |
| <a name="input_msk_instance_type"></a> [msk\_instance\_type](#input\_msk\_instance\_type) | MSK instance type. | `string` | `"kafka.t3.small"` | no |
| <a name="input_number_of_broker_nodes"></a> [number\_of\_broker\_nodes](#input\_number\_of\_broker\_nodes) | Number of broker nodes. | `number` | `3` | no |
| <a name="input_platform"></a> [platform](#input\_platform) | Platform name. | `string` | `null` | no |
| <a name="input_private_subnets"></a> [private\_subnets](#input\_private\_subnets) | Private subnets. | `list(string)` | `[]` | no |
| <a name="input_region"></a> [region](#input\_region) | AWS region. | `string` | `null` | no |
| <a name="input_security_groups"></a> [security\_groups](#input\_security\_groups) | Security groups. | `list(string)` | `[]` | no |
| <a name="input_users"></a> [users](#input\_users) | MSK users. | `list(string)` | `[]` | no |
| <a name="input_vpc_id"></a> [vpc\_id](#input\_vpc\_id) | VPC id. where the MSK cluster will be created. | `string` | `null` | no |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_msk_brokers_by_auth_method"></a> [msk\_brokers\_by\_auth\_method](#output\_msk\_brokers\_by\_auth\_method) | n/a |
| <a name="output_msk_sasl_scram_users"></a> [msk\_sasl\_scram\_users](#output\_msk\_sasl\_scram\_users) | n/a |
<!-- END_TF_DOCS -->
255 changes: 255 additions & 0 deletions aws/kafka/backups.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
module "msk_s3_bkp" {
source = "github.com/TradrApi/terraform-modules//aws/s3?ref=v1"

bucket_name = "${var.environment}-${var.platform}-msk-backup"

block_public_access = true
create_bucket_acl = false
}



resource "aws_security_group" "s3_sink_connector" {
name = "${var.platform}-${var.environment}-amazon-s3-sink-connector"
vpc_id = var.vpc_id
}


resource "aws_cloudwatch_log_group" "s3_sink_connector" {
name = "${var.platform}-${var.environment}/amazon-msk-s3-sink-connector"
retention_in_days = 3
}

resource "aws_security_group_rule" "s3_sink_connector_outbound" {
for_each = {
all = {
port = 0
description = "ALL"
}
}

security_group_id = aws_security_group.s3_sink_connector.id
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]

description = each.value.description
from_port = each.value.port
to_port = each.value.port

type = "egress"
}




resource "aws_s3_bucket_server_side_encryption_configuration" "msk_s3_bkp" {
bucket = module.msk_s3_bkp.bucket.id

rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}


data "aws_caller_identity" "this" {}

resource "aws_iam_role" "msk_s3_bkp" {
name = replace(title("${var.platform}-msk-bkp-role"), "-", "")

assume_role_policy = jsonencode({
"Version" : "2012-10-17",
"Statement" : [
{
"Effect" : "Allow",
"Principal" : {
"Service" : "kafkaconnect.amazonaws.com"
},
"Action" : "sts:AssumeRole"
"Condition" : {
"StringEquals" : {
"aws:SourceAccount" : data.aws_caller_identity.this.account_id
}
}
}
]
})
}

resource "aws_iam_policy" "msk_s3_bkp" {
name = replace(title("${var.platform}-msk-bkp-policy"), "-", "")

policy = jsonencode({
"Version" : "2012-10-17",
"Statement" : [
{
"Effect" : "Allow",
"Action" : [
"s3:ListAllMyBuckets"
],
"Resource" : "arn:aws:s3:::*"
},
{
"Effect" : "Allow",
"Action" : [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource" : "arn:aws:s3:::${module.msk_s3_bkp.bucket.id}"
},
{
"Effect" : "Allow",
"Action" : [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObjectTagging"
],
"Resource" : "arn:aws:s3:::${module.msk_s3_bkp.bucket.id}/*"
},
# See https://repost.aws/knowledge-center/msk-connector-connect-errors for the below rules
{
"Effect" : "Allow",
"Action" : [
"kafka-cluster:Connect",
"kafka-cluster:DescribeCluster"
],
"Resource" : [
aws_msk_cluster.this.arn
]
},
{
"Effect" : "Allow",
"Action" : [
"kafka-cluster:ReadData",
"kafka-cluster:DescribeTopic"
],
"Resource" : [
"*"
]
},
{
"Effect" : "Allow",
"Action" : [
"kafka-cluster:WriteData",
"kafka-cluster:DescribeTopic"
],
"Resource" : [
"*"
]
},
{
"Effect" : "Allow",
"Action" : [
"kafka-cluster:CreateTopic",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData",
"kafka-cluster:DescribeTopic"
],
"Resource" : [
"*"
]
},
{
"Effect" : "Allow",
"Action" : [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource" : [
"*"
]
}
]
})
}




resource "aws_iam_role_policy_attachment" "msk_s3_bkp" {
policy_arn = aws_iam_policy.msk_s3_bkp.arn
role = aws_iam_role.msk_s3_bkp.name
}



resource "aws_mskconnect_connector" "backup_msk_to_s3" {
name = "${var.platform}-msk-bkp-to-s3"
kafkaconnect_version = "2.7.1"

capacity {
autoscaling {
mcu_count = 1
max_worker_count = 2
min_worker_count = 1

scale_in_policy {
cpu_utilization_percentage = 10
}

scale_out_policy {
cpu_utilization_percentage = 80
}
}
}

connector_configuration = {
"s3.region" = var.region
"s3.bucket.name" = module.msk_s3_bkp.bucket_id
"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"format.class" = "io.confluent.connect.s3.format.json.JsonFormat"
"flush.size" = "1"
"schema.compatibility" = "NONE"
"tasks.max" = "2"
"partitioner.class" = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"storage.class" = "io.confluent.connect.s3.storage.S3Storage"
"topics.dir" = "topics"
"topics.regex" = "^(?!__).*"
"behavior.on.null.values" = "ignore"
"value.converter" = "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable" = "false"
}

kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = aws_msk_cluster.this.bootstrap_brokers_sasl_iam

vpc {
security_groups = [aws_security_group.s3_sink_connector.id]
subnets = var.private_subnets
}
}

}

kafka_cluster_client_authentication {
authentication_type = "IAM"
}

kafka_cluster_encryption_in_transit {
encryption_type = "TLS"
}

plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.plugins["amazon-s3-sink-connector"].arn
revision = aws_mskconnect_custom_plugin.plugins["amazon-s3-sink-connector"].latest_revision
}
}

log_delivery {
worker_log_delivery {
cloudwatch_logs {
log_group = aws_cloudwatch_log_group.s3_sink_connector.name
enabled = true
}
}
}

service_execution_role_arn = aws_iam_role.msk_s3_bkp.arn
}



Loading

0 comments on commit d32d99b

Please sign in to comment.