[docs] CDC: Add example for message structure when transformers are used. #25469


In this tutorial, you will set up a single instance of each service using Docker and the Debezium container images.


#### Start Zookeeper

Zookeeper is the first service you must start.

At this point, you have started Zookeeper and Kafka, but you still need a database server from which Debezium can capture changes. In this procedure, you start a YugabyteDB instance with an example database. The example uses sample data in SQL scripts that are included with your YugabyteDB installation in the `share` directory.

1. Start YugabyteDB by following the instructions in [Quick Start](/preview/tutorials/quick-start).

You need to start the database on an IP address that is resolvable by the Docker containers. If you use the localhost address (that is, `127.0.0.1`) and deploy the connectors in Docker containers, they won't be able to reach the database; they will keep trying to connect to `127.0.0.1` inside their own container. Use the [--advertise_address option for yugabyted](../../../../reference/configuration/yugabyted/#flags-8) to specify the IP address on which to start your database instance.

For example, Linux users can use the following:

```sh
./bin/yugabyted start --advertise_address $(hostname -i)
```

1. Using ysqlsh, connect to the database process running on the IP you specified when you started the database instance.

```sh
./bin/ysqlsh -h <ip-of-your-machine>
```

You should see output similar to the following:

```output
ysqlsh (11.2-YB-{{<yb-version version="preview">}}-b0)
Type "help" for help.

yugabyte=#
```

1. Load the schema of the sample tables.

```sql
yugabyte=# \i share/schema.sql
```

```output
CREATE TABLE
CREATE TABLE
CREATE TABLE
yugabyte=# \i share/products.sql
```

```sql
yugabyte=# select count(*) from products;
```

```output
count
-------
200

### Deploy the YugabyteDB connector

After starting the Debezium and YugabyteDB services, you are ready to deploy the YugabyteDB connector. To deploy the connector, you register it with Kafka Connect to monitor your database. After registration, the connector starts monitoring the monitored table (`products` in this example). When a row in the table changes, Debezium generates a change event.

{{< note title="Note" >}}

In a production environment, you would typically either use the Kafka tools to manually create the necessary topics, including specifying the number of replicas, or you would use the Kafka Connect mechanism for customizing the settings of [auto-created](https://debezium.io/documentation/reference/2.5/configuration/topic-auto-create-config.html) topics. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.

{{< /note >}}

To register the connector, do the following:

1. Review the configuration of the YugabyteDB connector that you will register. Before registering the connector, you should be familiar with its configuration. In the next step, you will register the following connector:

```json
Expand Down Expand Up @@ -282,8 +259,6 @@ Windows users may need to escape the double-quotes.
["ybconnector"]
```
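Registration itself is a plain Kafka Connect REST call: you POST a JSON document containing the connector `name` and its `config` to the Connect service. The following is an illustrative sketch of the request only; replace the abbreviated `config` object with the full connector configuration reviewed in the earlier step:

```sh
# Sketch: register the connector via the Kafka Connect REST API (default port 8083).
# The "config" object here is a placeholder, not a working configuration.
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  localhost:8083/connectors/ \
  -d '{"name": "ybconnector", "config": { "...": "..." }}'
```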


When you register a connector, it generates a large amount of log output in the Kafka Connect container. By reviewing this output, you can better understand the process that the connector goes through from the time it is created until it begins reading the change events.

After registering the `ybconnector` connector, you can review the log output in the Kafka Connect container (`connect`) to track the connector's status.

### View change events

After it is deployed, the YugabyteDB connector starts monitoring the `yugabyte` database for data change events.

For this tutorial, you will explore the `dbserver1.public.products` topic.

#### View a change event

Open a new terminal, and use it to start the watch-topic utility to watch the `dbserver1.public.products` topic from the beginning of the topic.

The following command runs the watch-topic utility in a new container using the 2.5.2.Final version of the `debezium/kafka` image:

```sh
docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:2.5.2.Final watch-topic -a dbserver1.public.products
```

The watch-topic utility returns the event records from the `products` table. There will be 200 events, one for each row in the table which was snapshotted. Each event is formatted in JSON, because that is how you configured the Kafka Connect service. There are two JSON documents for each event: one for the key, and one for the value.

You should see output similar to the following:

```output
Contents of topic dbserver1.public.products:
...
```

The watch-topic utility keeps watching the topic, so any new events appear automatically as long as the utility is running.

#### Update the database and view the update event

```output
(1 row)
```

1. Switch to the terminal running watch-topic to see a new event.

By changing a record in the `products` table, the YugabyteDB connector generated a new event.

```json
}
```

Note that the fields that were not updated are `null` in the event. This is because the [REPLICA IDENTITY](../key-concepts/#replica-identity) of the table is CHANGE by default, which sends only the values of the updated columns in the change event.
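If you want the full row image in update events instead, you can change the table's replica identity; this typically must be done before the CDC stream is created. For example, using standard YSQL/PostgreSQL syntax (see [REPLICA IDENTITY](../key-concepts/#replica-identity) for the identities the connector supports):

```sql
ALTER TABLE products REPLICA IDENTITY FULL;
```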

#### Delete a row and view the delete event

```sql
delete from products where id = 22;
```

1. Switch to the terminal running watch-topic to see two new events. By deleting a row in the `products` table, the YugabyteDB connector generated two new events.

The details for the payload of the first event will look similar to the following (formatted for readability):

The second event will have a *key* but the *value* will be `null`; that is a *tombstone* event.

### Clean up

After you are finished, you can use Docker to stop all of the running containers.

Run the following command:

---
title: YugabyteDB connector transformers
headerTitle: YugabyteDB connector transformers
linkTitle: Connector transformers
description: YugabyteDB connector transformers for Change Data Capture.
menu:
  preview:
    parent: yugabytedb-connector
    identifier: yugabytedb-connector-transformers
    weight: 70
type: docs
---

The YugabyteDB Connector comes bundled with Single Message Transformers (SMTs). SMTs are applied to messages as they flow through Kafka Connect so that sinks understand the format in which data is sent. SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka. SMTs transform outbound messages before they are sent to a sink connector.

The following SMTs are bundled with the connector jar file available on [GitHub releases](https://github.com/yugabyte/debezium/releases):

* YBExtractNewRecordState
* PGCompatible

{{< note title="Important" >}}

These SMTs are only compatible with the [yboutput plugin](../key-concepts#output-plugin).

{{< /note >}}
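Like any Kafka Connect SMT, these transformers are enabled through the `transforms` properties of the connector configuration. For example, the following fragment applies PGCompatible using its transformer class (described in the PGCompatible section of this page); the alias `pgcompatible` is arbitrary:

```json
{
  "transforms": "pgcompatible",
  "transforms.pgcompatible.type": "io.debezium.connector.postgresql.transforms.PGCompatible"
}
```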

## Example

For simplicity, the following examples show only the `before` and `after` fields of the `payload` of the messages published by the connector. Schema information is omitted wherever it is the same as for the standard Debezium connector for PostgreSQL.

Consider a table created using the following statement:

```sql
CREATE TABLE test (id INT PRIMARY KEY, name TEXT, aura INT);
```

The following DML statements are used to demonstrate the payload for each replica identity:

```sql
-- statement 1
INSERT INTO test VALUES (1, 'Vaibhav', 9876);

-- statement 2
UPDATE test SET aura = 9999 WHERE id = 1;

-- statement 3
UPDATE test SET name = 'Vaibhav Kushwaha', aura = 10 WHERE id = 1;

-- statement 4
UPDATE test SET aura = NULL WHERE id = 1;

-- statement 5
DELETE FROM test WHERE id = 1;
```

## PGCompatible

**Transformer class:** `io.debezium.connector.postgresql.transforms.PGCompatible`

By default, the YugabyteDB CDC service publishes events with a schema that only includes columns that have been modified. The source connector then sends the value as `null` for columns that are missing in the payload. Each column payload includes a `set` field that is used to signal if a column has been set to `null` because it wasn't present in the payload from YugabyteDB.

However, some sink connectors may not understand this format. PGCompatible transforms the payload to a format that is compatible with the format of standard change data events. Specifically, it transforms column schema and value to remove the `set` field and collapse the payload such that it only contains the data type schema and value.

PGCompatible differs from YBExtractNewRecordState by recursively modifying all the fields in a payload.
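To illustrate the collapsing, consider statement 2 under the CHANGE replica identity. The following is a simplified sketch of the `after` field before the transform, where each column value is wrapped in a structure carrying the `set` flag described above (the exact field layout and schema portion are omitted):

```json
"after": {"id": {"value": 1, "set": true}, "name": {"value": null, "set": false}, "aura": {"value": 9999, "set": true}}
```

After applying PGCompatible, the wrapper is collapsed to plain values:

```json
"after": {"id": 1, "name": null, "aura": 9999}
```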

The following examples show what the payload would look like for each [replica identity](../key-concepts/#replica-identity).

### CHANGE

```output
-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":null,"after":{"id":1,"name":null,"aura":9999}

-- statement 3
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":null,"after":{"id":1,"name":null,"aura":null}

-- statement 5
"before":{"id":1,"name":null,"aura":null},"after":null
```

Note that for statements 2 and 4, the columns that were not updated as part of the UPDATE statement are `null` in the `after` field.

### DEFAULT

```output
-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9999}

-- statement 3
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":null}

-- statement 5
"before":{"id":1,"name":null,"aura":null},"after":null
```

### FULL

```output
-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":{"id":1,"name":"Vaibhav","aura":9876},"after":{"id":1,"name":"Vaibhav","aura":9999}

-- statement 3
"before":{"id":1,"name":"Vaibhav","aura":9999},"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":{"id":1,"name":"Vaibhav Kushwaha","aura":10},"after":{"id":1,"name":"Vaibhav Kushwaha","aura":null}

-- statement 5
"before":{"id":1,"name":"Vaibhav Kushwaha","aura":null},"after":null
```