Add sample for Debezium CDC connector (#90)
phillipleblanc authored Jul 1, 2024
1 parent cc24b15 commit 158f8e9
Showing 9 changed files with 334 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -7,6 +7,7 @@ Learn about Spice.ai with in-depth samples.
- [Sales BI Dashboard](sales-bi/README.md)
- [Local Materialization and Acceleration CQRS](acceleration/README.md)
- [Accelerated table data quality with constraint enforcement](constraints/README.md)
- [Streaming changes in real-time with Debezium CDC](cdc-debezium/README.md)

## Deploy Spice.ai

2 changes: 2 additions & 0 deletions cdc-debezium/.gitignore
@@ -0,0 +1,2 @@
*.db
*.db.wal
13 changes: 13 additions & 0 deletions cdc-debezium/Makefile
@@ -0,0 +1,13 @@
.PHONY: all
all:
	@docker compose up -d

.PHONY: clean
clean:
	@docker compose down
	@docker volume prune -f
	@docker image prune -f

.PHONY: register-connector
register-connector:
	@curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-connector.json
118 changes: 118 additions & 0 deletions cdc-debezium/README.md
@@ -0,0 +1,118 @@
# Streaming changes in real-time with Debezium CDC

Change Data Capture (CDC) is a technique that captures changed rows from a database's transaction log and delivers them to consumers with low latency. Leveraging this technique allows Spice to keep locally accelerated datasets up-to-date in real-time with the source data. It is also highly efficient: only the changed rows are transferred, instead of re-fetching the entire dataset on every refresh.

In this sample, a local Postgres database has a `customer_addresses` table, and a Spice runtime accelerates the data from that table. A Debezium connector captures changes to `customer_addresses` and publishes them to a Kafka topic called `cdc.public.customer_addresses`. The Spice runtime consumes the changes from that topic and keeps an accelerated dataset up-to-date, including the initial state of the table.

## Prerequisites

This sample requires [Docker](https://www.docker.com/) and [Docker Compose](https://docs.docker.com/compose/) to be installed.

Also ensure that you have the `spice` CLI installed. You can find instructions on how to install it [here](https://docs.spiceai.org/getting-started).

You will also need `psql` or another database client (e.g. DBeaver) to connect to the Postgres database.

`curl` is required to register the Debezium Postgres connector.

## How to run

Clone this samples repo locally and navigate to the `cdc-debezium` directory:

```bash
git clone https://github.com/spiceai/samples.git
cd samples/cdc-debezium
```

Start the Docker Compose stack, which includes a Postgres database, a Kafka broker (via Redpanda), and a Debezium connector:

`docker compose up -d`
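
Before continuing, you can confirm that all of the services are up and healthy (the Postgres and Redpanda services define healthchecks in `compose.yaml`):

```bash
docker compose ps
```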

Navigate to http://localhost:8080 to see the Redpanda console. Notice that Debezium has not created any topics yet; we need to tell it to connect to the Postgres database and create them:

`curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-connector.json`

Now the Debezium connector is registered and will start capturing changes from the `customer_addresses` table in the Postgres database. Open http://localhost:8080/topics and see that the `cdc.public.customer_addresses` topic has been created.
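
The contents of `register-connector.json` are not shown in this diff. As a rough sketch, a Debezium Postgres connector registration for this setup might look like the following; the connector name and field values are assumptions inferred from the topic name and `compose.yaml`, and the actual file in the repo may differ:

```json
{
  "name": "customer-addresses-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "plugin.name": "pgoutput",
    "topic.prefix": "cdc",
    "table.include.list": "public.customer_addresses"
  }
}
```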

This `spicepod.yaml` shows the configuration needed for Spice to connect to the Kafka topic and consume the Debezium changes:

```yaml
version: v1beta1
kind: Spicepod
name: cdc-debezium

datasets:
  - from: debezium:cdc.public.customer_addresses
    name: cdc
    params:
      debezium_transport: kafka
      debezium_message_format: json
      kafka_bootstrap_servers: localhost:19092
    acceleration:
      enabled: true
      engine: duckdb
      mode: file
      refresh_mode: changes
```
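
With `refresh_mode: changes`, Spice applies each change event to the accelerated table as it arrives, instead of periodically re-fetching the whole dataset. For reference, a Debezium JSON change event on the topic looks roughly like the following (abbreviated and illustrative; the exact envelope depends on the connector configuration):

```json
{
  "payload": {
    "before": null,
    "after": { "id": 100, "first_name": "John", "last_name": "Doe" },
    "op": "c",
    "ts_ms": 1719837562000
  }
}
```

Here `op` is `c` for an insert; updates, deletes, and initial snapshot reads arrive as `u`, `d`, and `r` events respectively.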
Start the Spice runtime, ensuring you are in the `cdc-debezium` directory:

```bash
spice run
```

Observe that it consumes all of the existing changes. The log output should look like:

```bash
2024-07-01T12:39:22.207145Z INFO runtime: Dataset cdc registered (debezium:cdc.public.customer_addresses), acceleration (duckdb:file, changes), results cache enabled.
2024-07-01T12:39:22.677117Z INFO runtime::accelerated_table::refresh_task::changes: Upserting data row for cdc with id=3
2024-07-01T12:39:22.692018Z INFO runtime::accelerated_table::refresh_task::changes: Upserting data row for cdc with id=4
...
```

Run `spice sql` in a separate terminal to query the data:

```sql
SELECT * FROM cdc;
```

Now let's make some changes to the Postgres database and observe that Spice consumes the changes.

Stop the Spice SQL REPL or open a third terminal and connect to the Postgres database with `psql`:

```bash
PGPASSWORD="postgres" psql -h localhost -U postgres -d postgres -p 15432
```

```sql
INSERT INTO public.customer_addresses (id, first_name, last_name, email)
VALUES
(100, 'John', 'Doe', '[email protected]');
```

Notice that the Spice log shows the change. Querying the data again from the `spice sql` REPL will show the new record.

```sql
SELECT * FROM cdc;
```
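
Optionally, try an update as well; Spice will upsert the modified row (deletes are likewise captured and applied):

```sql
UPDATE public.customer_addresses
SET last_name = 'Smith'
WHERE id = 100;
```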

Now let's see what happens when we stop Spice and restart it. The data should still be there and it should not replay all of the changes from the beginning.

Stop Spice with `Ctrl+C`, then restart it with `spice run`.

Observe that it doesn't replay the changes and the data is still there. Only new changes will be consumed.
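
To verify, query the row inserted earlier from the `spice sql` REPL; it is read from the local DuckDB file without replaying the change stream:

```sql
SELECT * FROM cdc WHERE id = 100;
```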

## Clean up

To stop and remove the Docker containers/volumes that were created, run:

`make clean`

If you don't have the `make` command available, you can run the following commands:

```bash
docker compose down
docker volume prune -f
docker image prune -f
```
96 changes: 96 additions & 0 deletions cdc-debezium/compose.yaml
@@ -0,0 +1,96 @@
services:
  postgres:
    image: "postgres:alpine"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
    expose:
      - "5432"
    ports:
      - 15432:5432
    volumes:
      - ./init:/docker-entrypoint-initdb.d
      - ./pg_config.conf:/etc/postgresql/postgresql.conf
    command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '2'
          memory: 2G
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d postgres -h localhost"]
      interval: 10s
      timeout: 5s
      retries: 5
  redpanda-kafka-0:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda-kafka-0:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda-kafka-0:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda-kafka-0:33145
      - --advertise-rpc-addr redpanda-kafka-0:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:v24.1.8
    container_name: redpanda-kafka-0
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 9644:9644
    healthcheck:
      test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"]
      interval: 15s
      timeout: 3s
      retries: 5
      start_period: 5s
  redpanda-kafka-console:
    container_name: redpanda-kafka-console
    image: docker.redpanda.com/redpandadata/console:v2.6.0
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda-kafka-0:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda-kafka-0:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda-kafka-0:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda-kafka-0
  debezium:
    image: debezium/connect:2.7
    container_name: debezium
    environment:
      BOOTSTRAP_SERVERS: redpanda-kafka-0:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
    depends_on: [postgres, redpanda-kafka-0]
    ports:
      - 8083:8083