Replicator

Confluent Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying messages, Replicator creates topics as needed, preserving the topic configuration from the source cluster: the number of partitions, the replication factor, and any configuration overrides specified for individual topics. Replicator is implemented as a Kafka Connect connector.

In this scenario example, you'll deploy two Confluent Platform clusters: a source cluster and a destination cluster. You'll deploy Confluent Replicator on the destination cluster, where it reads topic messages from the source cluster and writes them to the destination cluster.

Note: You can deploy Replicator near either the source or the destination cluster, and it will work either way. However, a best practice is to deploy Replicator closer to the destination cluster for better reliability and performance over networks.

This scenario example document describes how to configure and deploy Confluent Replicator in one configuration scenario through Confluent for Kubernetes. To read more about use cases, architectures, and other configuration scenarios, see the Confluent Replicator documentation.

Set up prerequisites

Set the tutorial directory for this tutorial, under the directory where you downloaded the tutorial files:

export TUTORIAL_HOME=<Tutorial directory>/hybrid/replicator

Create two namespaces, one for the source cluster and one for the destination cluster.

kubectl create ns source
kubectl create ns destination

Deploy Confluent for Kubernetes (CFK) in cluster mode, so that one CFK instance can manage Confluent deployments in multiple namespaces. Here, CFK is deployed to the default namespace.

helm upgrade --install confluent-operator \
  confluentinc/confluent-for-kubernetes \
  --namespace default --set namespaced=false
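Before moving on, you can confirm that the operator pod is up:

# Expect one confluent-operator pod in the Running state
kubectl get pods -n default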

Confluent for Kubernetes provides auto-generated certificates for Confluent Platform components to use for inter-component TLS. You'll need to generate and provide a root Certificate Authority (CA).

Generate a CA pair to use in this tutorial:

openssl genrsa -out $TUTORIAL_HOME/ca-key.pem 2048
openssl req -new -key $TUTORIAL_HOME/ca-key.pem -x509 \
  -days 1000 \
  -out $TUTORIAL_HOME/ca.pem \
  -subj "/C=US/ST=CA/L=MountainView/O=Confluent/OU=Operator/CN=TestCA"

Then, provide the CA pair as a Kubernetes secret named ca-pair-sslcerts, which CFK uses to auto-generate the component certificates, in both the source and destination namespaces:

kubectl create secret tls ca-pair-sslcerts \
  --cert=$TUTORIAL_HOME/ca.pem \
  --key=$TUTORIAL_HOME/ca-key.pem -n source

kubectl create secret tls ca-pair-sslcerts \
  --cert=$TUTORIAL_HOME/ca.pem \
  --key=$TUTORIAL_HOME/ca-key.pem -n destination
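Verify that the secret exists in both namespaces:

kubectl get secret ca-pair-sslcerts -n source
kubectl get secret ca-pair-sslcerts -n destination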

Deploy source and destination clusters, including Replicator

Deploy the source and destination clusters.

# Specify the credentials required by the source and destination cluster. To understand how these
# credentials are configured, see 
# https://github.com/confluentinc/confluent-kubernetes-examples/tree/master/security/secure-authn-encrypt-deploy

kubectl create secret generic credential -n source \
--from-file=plain-users.json=$TUTORIAL_HOME/creds-kafka-sasl-users.json \
--from-file=plain.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt \
--from-file=basic.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt

kubectl create secret generic credential -n destination \
--from-file=plain-users.json=$TUTORIAL_HOME/creds-kafka-sasl-users.json \
--from-file=plain.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt

# Deploy Zookeeper and Kafka to `source` namespace, to represent the source cluster
kubectl apply -f $TUTORIAL_HOME/components-source.yaml

# Deploy Zookeeper, Kafka, Replicator, Control Center to `destination` namespace, 
# to represent the destination cluster
kubectl apply -f $TUTORIAL_HOME/components-destination.yaml
kubectl apply -f $TUTORIAL_HOME/controlcenter.yaml
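It can take a few minutes for all components to come up. Watch the pods in both namespaces until they are Running and Ready:

# Press Ctrl+C to stop watching once everything is Running
kubectl get pods -n source -w
kubectl get pods -n destination -w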

In $TUTORIAL_HOME/components-destination.yaml, note that the Connect CRD is used to define a custom resource for Confluent Replicator.

From $TUTORIAL_HOME/components-destination.yaml
---
apiVersion: platform.confluent.io/v1beta1
# Confluent Replicator is built as a connector, and so will use the `Connect` CRD.
kind: Connect
metadata:
  name: replicator
  namespace: destination
spec:
  replicas: 2
  # Configure to have TLS encryption, and use auto-generated server certs
  tls:
    autoGeneratedCerts: true
  image:
    # Use the `cp-enterprise-replicator` Docker image, which contains the Replicator jars
    application: confluentinc/cp-enterprise-replicator:7.8.0
    init: confluentinc/confluent-init-container:2.10.0
  podTemplate:
    resources:
      requests:
        cpu: 2
        memory: 4Gi
    envVars:
      # The Confluent Replicator monitoring extension allows detailed metrics from
      # Replicator tasks to be collected through an exposed REST API.
      # Update the version string in replicator-rest-extension-<version>.jar to match
      # the Confluent Platform version you are using.
      - name: CLASSPATH
        value: /usr/share/java/kafka-connect-replicator/replicator-rest-extension-7.8.0.jar
  configOverrides:
    # When the Connect distributed cluster hosting Replicator has a REST endpoint with SSL encryption 
    # enabled, you must configure security properties for the SSL keystore and truststore used by the 
    # Replicator monitoring extension to communicate with other Connect nodes in the cluster.
    # `/mnt/sslcerts/truststore.jks` is the truststore location when auto-generated certs are used.
    jvm:
      - -Djavax.net.ssl.trustStore=/mnt/sslcerts/truststore.jks
      - -Djavax.net.ssl.trustStorePassword=mystorepassword
    server:
      # To activate the monitoring extension, configure this property
      - rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
      # This specifies that Replicator is the Connector configured
      - connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
  dependencies:
    kafka:
      bootstrapEndpoint: kafka.destination.svc.cluster.local:9071
      authentication:
        type: plain
        jaasConfig:
          secretRef: credential
      tls:
        enabled: true
    interceptor:
      enabled: true
---
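After the deployment, you can confirm that the Connect cluster hosting Replicator is up before configuring the connector:

kubectl get connect replicator -n destination
kubectl get pods -n destination | grep replicator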

Create topic in source cluster

# This uses the `KafkaTopic` CRD to define a topic
kubectl apply -f $TUTORIAL_HOME/source-topic.yaml
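For reference, the KafkaTopic custom resource in source-topic.yaml follows this general shape. Treat this as an illustrative sketch; the partition and replica values are assumptions, and the exact contents of the repository file may differ:

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic-in-source
  namespace: source
spec:
  replicas: 3        # assumed replication factor for this sketch
  partitionCount: 4  # assumed partition count for this sketch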

Configure Replicator in destination cluster

There are two ways to deploy the Replicator connector:

  1. Declaratively, by creating a Connector custom resource
  2. Through the Connect REST API endpoint

1) Declaratively creating the Replicator connector

Starting in Confluent for Kubernetes (CFK) 2.1.0, you can declaratively manage connectors in Kubernetes using the Connector custom resource definition (CRD).

Create the Replicator connector

kubectl apply -f $TUTORIAL_HOME/connector.yaml

NOTE: The secret replicator-generated-jks referenced in connector.yaml is created automatically because spec.tls.autoGeneratedCerts: true is set in the $TUTORIAL_HOME/components-destination.yaml file.
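For reference, a Connector custom resource for Replicator follows this general shape. This is a minimal sketch, assuming the CFK Connector CRD fields class, taskMax, connectClusterRef, and configs; the connector.yaml in the repository carries the full set of security properties shown in the REST example below:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: replicator
  namespace: destination
spec:
  class: "io.confluent.connect.replicator.ReplicatorSourceConnector"
  taskMax: 4
  connectClusterRef:
    name: replicator  # the Connect cluster deployed above
  configs:
    # Same keys as in the REST API JSON below, for example:
    topic.whitelist: "topic-in-source"
    src.kafka.bootstrap.servers: "kafka.source.svc.cluster.local:9071"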

Check the connector

kubectl get connector -n destination
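For detailed connector state, including task-level errors, describe the resource:

kubectl describe connector replicator -n destination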

2) Using the REST API endpoint to deploy the Replicator connector

With this approach, you provide the Replicator configuration as a file in the running Docker container, and then submit it through the Connect REST API.

# Open a shell in the `replicator-0` pod
kubectl -n destination exec -it replicator-0 -- bash

# Define the configuration as a file in the pod
cat <<EOF > replicator.json
 {
 "name": "replicator",
 "config": {
     "connector.class":
     "io.confluent.connect.replicator.ReplicatorSourceConnector",
     "tasks.max": "4",
     "topic.whitelist": "topic-in-source",
     "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "src.kafka.sasl.mechanism": "PLAIN",
     "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-secret\";",
     "src.kafka.ssl.truststore.location": "/mnt/sslcerts/truststore.jks",
     "src.kafka.truststore.password": "mystorepassword",
     "src.kafka.bootstrap.servers": "kafka.source.svc.cluster.local:9092",
     "src.kafka.security.protocol": "SASL_SSL",
     "dest.kafka.bootstrap.servers": "kafka.destination.svc.cluster.local:9092",
     "dest.kafka.security.protocol": "SASL_SSL",
     "dest.kafka.ssl.truststore.location": "/mnt/sslcerts/truststore.jks",
     "dest.kafka.truststore.password": "mystorepassword",
     "dest.kafka.sasl.mechanism": "PLAIN",
     "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-secret\";",
     "src.consumer.group.id": "replicator",
     "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
     "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
     "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka.source.svc.cluster.local:9092",
     "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
     "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/mnt/sslcerts/truststore.jks",
     "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "mystorepassword",
     "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-secret\";",
     "confluent.license": "",
     "confluent.topic.replication.factor": "3"
   }
 }
EOF

# Instantiate the Replicator Connector instance through the REST interface
curl -XPOST -H "Content-Type: application/json" --data @replicator.json https://localhost:8083/connectors -kv

# Check the status of the Replicator Connector instance
curl -XGET -H "Content-Type: application/json" https://localhost:8083/connectors -kv
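# To inspect the state of the replicator connector and its tasks specifically,
# you can query the standard Connect status endpoint:
curl -XGET -H "Content-Type: application/json" https://localhost:8083/connectors/replicator/status -kv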

Validate that it works

Produce data to topic in source cluster

Create the kafka.properties file in $TUTORIAL_HOME. Add the source cluster's bootstrap endpoint and client credentials as follows:

bootstrap.servers=kafka.source.svc.cluster.local:9071
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=kafka password=kafka-secret;
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.truststore.location=/mnt/sslcerts/truststore.jks
ssl.truststore.password=mystorepassword

# Create a configuration secret for client applications to use
kubectl create secret generic kafka-client-config-secure \
  --from-file=$TUTORIAL_HOME/kafka.properties -n source

Deploy a producer application that produces messages to the topic topic-in-source:

kubectl apply -f $TUTORIAL_HOME/secure-producer-app-data.yaml
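To confirm that replication is working end to end, you can list the topics on the destination cluster and look for topic-in-source. The sketch below copies the client properties file into a destination broker pod; the /tmp path is arbitrary, the truststore path in the file assumes the destination brokers also mount auto-generated certs at /mnt/sslcerts, and the --bootstrap-server flag overrides the bootstrap.servers value in the file:

kubectl cp $TUTORIAL_HOME/kafka.properties destination/kafka-0:/tmp/kafka.properties
kubectl -n destination exec -it kafka-0 -- kafka-topics \
  --bootstrap-server kafka.destination.svc.cluster.local:9071 \
  --command-config /tmp/kafka.properties --list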

View in Control Center

Open Confluent Control Center.

kubectl confluent dashboard controlcenter -n destination
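If you don't have the kubectl confluent plugin installed, you can reach Control Center with a port-forward instead, then browse to https://localhost:9021 (or http, depending on your Control Center TLS settings):

kubectl port-forward controlcenter-0 9021:9021 -n destination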

Tear down

kubectl delete -f $TUTORIAL_HOME/secure-producer-app-data.yaml
# If you created the connector declaratively:
kubectl delete -f $TUTORIAL_HOME/connector.yaml
kubectl delete -f $TUTORIAL_HOME/source-topic.yaml
kubectl delete -f $TUTORIAL_HOME/controlcenter.yaml
kubectl delete -f $TUTORIAL_HOME/components-destination.yaml
kubectl delete -f $TUTORIAL_HOME/components-source.yaml
kubectl delete secret kafka-client-config-secure credential ca-pair-sslcerts -n source
kubectl delete secret credential ca-pair-sslcerts -n destination
helm -n default delete confluent-operator
kubectl delete ns source destination