Skip to content

Commit

Permalink
Merge pull request #106 from dasmeta/DMVP-4474-kafka-connect
Browse files Browse the repository at this point in the history
feat(DMVP-4474): Added new chart for kafka connect
  • Loading branch information
viktoryathegreat authored Jul 25, 2024
2 parents 0520399 + dbb1ea8 commit 85f589f
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 0 deletions.
6 changes: 6 additions & 0 deletions charts/kafka-connect/Chart.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# NOTE: this file is generated by `helm dependency update` — do not edit by hand.
dependencies:
- name: base
  repository: https://dasmeta.github.io/helm
  version: 0.2.1
digest: sha256:badb156e94af6ded7f736a245eff08854254c3217aa21dfe0ac2c7acd0cd9029
generated: "2024-07-24T09:40:37.569858+04:00"
29 changes: 29 additions & 0 deletions charts/kafka-connect/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: v2
name: kafka-connect
description: A Helm chart for Kafka Connect

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.0.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
# NOTE(review): "2.14.5" matches neither the default image tag (1.0.1 in
# values.yaml) nor the Confluent base image (7.2.2 in the Dockerfile) —
# confirm which application version this is meant to track.
appVersion: "2.14.5"

# Must stay in sync with Chart.lock (regenerate via `helm dependency update`).
dependencies:
- name: base
  version: 0.2.1
  repository: https://dasmeta.github.io/helm
10 changes: 10 additions & 0 deletions charts/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Build a Kafka Connect image with the Aiven GCS sink connector pre-installed.
# Use the official Confluent Kafka Connect base image
FROM confluentinc/cp-kafka-connect:7.2.2

# Install Aiven GCS Connector into the Confluent plugin path.
# Download, unpack, and delete the archive in a single RUN instruction so the
# ~temporary tarball is not baked into an intermediate image layer.
RUN wget -O /tmp/aiven-kafka-connect-gcs-0.13.0.tar https://github.com/Aiven-Open/gcs-connector-for-apache-kafka/releases/download/v0.13.0/gcs-connector-for-apache-kafka-0.13.0.tar && \
    mkdir -p /usr/share/confluent-hub-components/aiven-kafka-connect-gcs && \
    tar -xvf /tmp/aiven-kafka-connect-gcs-0.13.0.tar --directory /usr/share/confluent-hub-components/aiven-kafka-connect-gcs && \
    rm -f /tmp/aiven-kafka-connect-gcs-0.13.0.tar

# Start Kafka Connect by default
CMD ["/etc/confluent/docker/run"]
31 changes: 31 additions & 0 deletions charts/kafka-connect/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kafka Connect Helm Chart

This Helm chart deploys Kafka Connect on Kubernetes, currently configured to work with Google Cloud Storage (GCS) for sink functionality. It includes setups like a Kafka Connect deployment integrated with a GCS Sink Connector and a Schema Registry, providing flexible environment configurations.

## Prerequisites

- Kubernetes 1.18+
- Helm 3.0+
- Configured `kubectl` connected to your Kubernetes cluster
- Google Cloud credentials configured if GCS functionality is utilized (`gcs-credentials` secret has to exist in the namespace where the chart will be deployed and should include the json of GCP Service Account credentials).
For example:
```
apiVersion: v1
data:
gcs-credentials.json: "your-gcs-sa-credentials"
kind: Secret
metadata:
name: gcs-credentials
namespace: kafka
type: Opaque
```

## Installation

### Add Helm Repository

If the chart is hosted in a Helm repository:

```bash
helm repo add dasmeta https://dasmeta.github.io/helm/
helm install kafka-connect dasmeta/kafka-connect -f values.yaml --version 1.0.0
```
Binary file added charts/kafka-connect/charts/base-0.2.1.tgz
Binary file not shown.
12 changes: 12 additions & 0 deletions charts/kafka-connect/templates/configmap-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# ConfigMap holding the helper script that the post-install/post-upgrade job
# (values: job.command) runs to register the GCS sink connector.
apiVersion: v1
kind: ConfigMap
metadata:
  name: deploy-gcs-connector
  labels:
    app: deploy-gcs-connector
data:
  # Shell script executed by the curl job container.
  # NOTE(review): assumes a Service named after the Helm release exposes the
  # Kafka Connect REST API on port 8083 — confirm the base chart creates it.
  # NOTE(review): POST /connectors is not idempotent; if the connector already
  # exists (e.g. on a post-upgrade re-run) the API returns HTTP 409. Consider
  # PUT /connectors/<name>/config for create-or-update semantics.
  deploy-gcs-connector.sh: |
    echo "===> Waiting for Kafka Connect to start"
    while ! curl -s http://{{ .Release.Name }}:8083/; do sleep 5; done
    echo "===> Deploying GCS Sink Connector"
    curl -X POST -H "Content-Type: application/json" --data @/config/gcs-sink-connector.json http://{{ .Release.Name }}:8083/connectors
41 changes: 41 additions & 0 deletions charts/kafka-connect/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# ConfigMap with the GCS sink connector definition; mounted at /config by both
# the Kafka Connect deployment and the deploy job, which POSTs it to the
# Connect REST API (see templates/configmap-job.yaml).
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-connect-config
  labels:
    app: kafka-connect-config
data:
  # Connector definition in the JSON shape expected by POST /connectors
  # ({"name": ..., "config": {...}}). JSON allows no comments, so notes live here:
  # - Keys prefixed with "_" / "__" are kept as documented alternatives of the
  #   active key; they are not recognized connector settings and are not applied.
  # - NOTE(review): the dead-letter-queue settings (topic name
  #   "connect.sink.dlt.gcs.users", replication factor 1) and the output
  #   format/file-name options are hard-coded rather than values-driven —
  #   confirm that is intended before reusing this chart for other topics.
  gcs-sink-connector.json: |
    {
      "name": "{{ .Values.config.connector.name }}",
      "config": {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": "{{ .Values.config.connector.gcs.tasksMax }}",
        "topics": "{{ .Values.config.connector.gcs.topics }}",
        "gcs.bucket.name": "{{ .Values.config.connector.gcs.bucketName }}",
        "gcs.part.size": "{{ .Values.config.connector.gcs.partSize }}",
        "flush.size": "{{ .Values.config.connector.gcs.flushSize }}",
        "gcs.credentials.path": "{{ .Values.config.connector.gcs.credentialsPath }}",
        "format.output.type": "jsonl",
        "format.output.envelope": false,
        "format.output.fields": "value",
        "_format.output.fields": "key,value,offset,timestamp,headers",
        "file.name.prefix": "raw/",
        "file.compression.type": "gzip",
        "file.name.timestamp.timezone": "UTC",
        "file.name.timestamp.source": "wallclock",
        "file.name.template": "{{ printf `y={{{{timestamp:unit=yyyy}}}}/m={{{{timestamp:unit=MM}}}}/d={{{{timestamp:unit=dd}}}}/H={{{{timestamp:unit=HH}}}}/{{topic}}-{{partition:padding=true}}-{{start_offset:padding=true}}.gz` }}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "_key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "_value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "__value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": false,
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "connect.sink.dlt.gcs.users",
        "errors.log.include.messages": true,
        "errors.deadletterqueue.context.headers.enable": true,
        "errors.deadletterqueue.topic.replication.factor": 1
      }
    }
28 changes: 28 additions & 0 deletions charts/kafka-connect/templates/deployment-schema-registry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Standalone Confluent Schema Registry, backing the AvroConverter configured in
# values.yaml (CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL points at this
# service name on port 8081).
# NOTE(review): values.yaml declares a schemaRegistry.enabled toggle, but this
# template renders unconditionally — consider guarding the manifest with it.
# NOTE(review): the image uses the floating "latest" tag and no resource
# requests/limits or probes are set — pin a version before production use.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  labels:
    app: schema-registry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: schema-registry
          image: confluentinc/cp-schema-registry:latest
          ports:
            - containerPort: 8081
          env:
            # Kafka cluster backing the registry's _schemas store.
            - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              value: "{{ .Values.config.kafka.bootstrapServers }}"
            - name: SCHEMA_REGISTRY_HOST_NAME
              value: "schema-registry"
            - name: SCHEMA_REGISTRY_LISTENERS
              value: "http://0.0.0.0:8081"
101 changes: 101 additions & 0 deletions charts/kafka-connect/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Values for the kafka-connect chart. Everything under `base:` is passed to the
# dasmeta "base" dependency (see Chart.yaml); `config:` holds chart-specific
# connector settings consumed by templates/configmap.yaml.
base:
  image:
    # Custom image built from this chart's Dockerfile
    # (Confluent Kafka Connect + Aiven GCS sink connector).
    repository: dasmeta/kafka-connect
    tag: 1.0.1
    pullPolicy: IfNotPresent

  replicaCount: 1

  service:
    type: ClusterIP
    port: 8083  # Kafka Connect REST API

  resources:
    limits:
      cpu: 2000m
      memory: 2048Mi
    requests:
      cpu: 1500m
      memory: 2048Mi

  secretsDefaultEngine: disabled

  containerPort: 8083

  # Optional base64-encoded GCP service-account JSON; empty by default because
  # the chart expects a pre-existing `gcs-credentials` secret (see README).
  encodedGcsCredentials: ""

  # NOTE(review): templates/deployment-schema-registry.yaml does not currently
  # read this toggle — the registry is deployed unconditionally.
  schemaRegistry:
    enabled: true
    url: "http://schema-registry:8081"

  deployment:
    volumes:
      # Pre-existing secret with the GCP service-account JSON (see README).
      - mountPath: /opt/confluent/secrets/credential
        name: gcs-credentials
        secret:
          secretName: gcs-credentials
        readOnly: true
      # Connector JSON definition (templates/configmap.yaml).
      - name: config
        mountPath: /config
        configMap:
          name: kafka-connect-config
      # Deploy helper script (templates/configmap-job.yaml).
      - name: deploy-config
        mountPath: /config/deploy
        configMap:
          name: deploy-gcs-connector

  # Environment for the Kafka Connect container (Confluent image convention:
  # CONNECT_* vars map to connect-distributed.properties keys).
  extraEnv:
    # NOTE(review): empty by default and duplicated by
    # config.kafka.bootstrapServers below — must be set per installation.
    CONNECT_BOOTSTRAP_SERVERS: ""
    CONNECT_REST_PORT: "8083"
    CONNECT_GROUP_ID: "kafka-connect"
    CONNECT_CONFIG_STORAGE_TOPIC: "_kafka-connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "_kafka-connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "_kafka-connect-status"
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
    CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    # NOTE(review): duplicates base.schemaRegistry.url — keep them in sync.
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "WARN"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
    # Replication factor 1 suits single-broker/dev clusters only.
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_PLUGIN_PATH: "/data/connect-jars,/usr/share/java,/usr/share/confluent-hub-components/"
    # NOTE(review): hard-codes the "kafka" namespace — confirm it matches the
    # release namespace.
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect.kafka.svc.cluster.local"
    CONNECT_REST_LISTENERS: "http://0.0.0.0:8083"

  # Post-install/post-upgrade hook job that registers the connector by running
  # the script from templates/configmap-job.yaml.
  job:
    name: deploy-gcs-connector
    image:
      repository: curlimages/curl
      # NOTE(review): floating "latest" tag — consider pinning.
      tag: latest
      pullPolicy: IfNotPresent
    annotations:
      "helm.sh/hook": post-install,post-upgrade
    volumes:
      - name: config
        mountPath: /config
      - name: deploy-config
        mountPath: /config/deploy
    command:
      - "/bin/sh"
      - "-c"
      - "sh /config/deploy/deploy-gcs-connector.sh"

config:
  kafka:
    bootstrapServers: "broker:9092"

  # this is going to be a configmap where configs of connector will be stored
  # (rendered into templates/configmap.yaml as gcs-sink-connector.json)
  connector:
    name: "gcs-sink-connector"
    gcs:
      bucketName: "your-gcs-bucket-name"
      partSize: 5242880  # bytes per GCS multipart upload part
      flushSize: 3       # records per output object
      credentialsPath: /opt/confluent/secrets/credential/gcs-credentials.json
      # NOTE(review): `format` is not referenced by any template in this chart
      # (output type is hard-coded to "jsonl" in the connector JSON).
      format: json
      topics: "your-kafka-topic"
      tasksMax: "1"

0 comments on commit 85f589f

Please sign in to comment.