Skip to content

Commit 8b004e9

Browse files
tzolovartembilan
authored andcommitted
GH-3779: Add Debezium Channel Adapter
Fixes #3779 initial debezium doc address some reviews resolve some classpath conflicts hacking failed test fixing tests and dependecies address review comments improve test coverage fix test checkstyle remove kafak references. hit support for batch improve java doc Initial batch support Convert the list of Change events into list of Messages. Use the same rules for buidling messages as the non-batch mode. Refine batch implementation and tests harden the testcontainers start/stop lifecycle simplify batch mode adjust test log config clean gradle config Add `HeaderMapper` filter configuration. Fix JavaDocs Use `CustomizableThreadFactory` for Exec Service. IT header tests more debeizum documentation Remove external Executor support in favor of configurable ThreadFactory minor `Threadfactory` naming fix fix support package structure * Clean up code style and language typos
1 parent 36930f5 commit 8b004e9

18 files changed

+1138
-0
lines changed

build.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ ext {
6060
commonsIoVersion = '2.11.0'
6161
commonsNetVersion = '3.9.0'
6262
curatorVersion = '5.5.0'
63+
debeziumVersion = '2.2.0.Final'
6364
derbyVersion = '10.16.1.1'
6465
findbugsVersion = '3.0.1'
6566
ftpServerVersion = '1.2.0'
@@ -576,6 +577,21 @@ project('spring-integration-core') {
576577
}
577578
}
578579

580+
project('spring-integration-debezium') {
581+
description = 'Spring Integration Debezium Support'
582+
dependencies {
583+
api project(':spring-integration-core')
584+
api("io.debezium:debezium-embedded:$debeziumVersion") {
585+
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
586+
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
587+
exclude group: 'jakarta.xml.bind', module: 'jakarta.xml.bind-api'
588+
exclude group: 'jakarta.activation', module: 'jakarta.activation-api'
589+
}
590+
591+
testImplementation "io.debezium:debezium-connector-mysql:$debeziumVersion"
592+
}
593+
}
594+
579595
project('spring-integration-event') {
580596
description = 'Spring Integration ApplicationEvent Support'
581597
dependencies {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.inbound;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
import java.util.Optional;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Consumer;
29+
30+
import io.debezium.engine.ChangeEvent;
31+
import io.debezium.engine.DebeziumEngine;
32+
import io.debezium.engine.DebeziumEngine.Builder;
33+
import io.debezium.engine.DebeziumEngine.ChangeConsumer;
34+
import io.debezium.engine.DebeziumEngine.RecordCommitter;
35+
import io.debezium.engine.Header;
36+
import io.debezium.engine.format.SerializationFormat;
37+
38+
import org.springframework.integration.debezium.support.DebeziumHeaders;
39+
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
40+
import org.springframework.integration.endpoint.MessageProducerSupport;
41+
import org.springframework.lang.Nullable;
42+
import org.springframework.messaging.Message;
43+
import org.springframework.messaging.MessageHeaders;
44+
import org.springframework.messaging.support.HeaderMapper;
45+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
46+
import org.springframework.util.Assert;
47+
48+
/**
49+
* Debezium Change Event Channel Adapter.
50+
*
51+
* @author Christian Tzolov
52+
* @author Artem Bilan
53+
*
54+
* @since 6.2
55+
*/
56+
public class DebeziumMessageProducer extends MessageProducerSupport {
57+
58+
private final DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder;
59+
60+
private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;
61+
62+
/**
63+
* Debezium Engine is designed to be submitted to an {@link Executor}
64+
* or {@link ExecutorService} for execution by a single thread.
65+
* By default, a single-threaded ExecutorService instance is provided configured with a
66+
* {@link CustomizableThreadFactory} and a `debezium-` thread prefix.
67+
*/
68+
private ExecutorService executorService;
69+
70+
private String contentType = "application/json";
71+
72+
private HeaderMapper<List<Header<Object>>> headerMapper = new DefaultDebeziumHeaderMapper();
73+
74+
private boolean enableEmptyPayload = false;
75+
76+
private boolean enableBatch = false;
77+
78+
private ThreadFactory threadFactory;
79+
80+
private volatile CountDownLatch latch = new CountDownLatch(0);
81+
82+
/**
83+
* Create new Debezium message producer inbound channel adapter.
84+
* @param debeziumBuilder - pre-configured Debezium Engine Builder instance.
85+
*/
86+
public DebeziumMessageProducer(Builder<ChangeEvent<byte[], byte[]>> debeziumBuilder) {
87+
Assert.notNull(debeziumBuilder, "'debeziumBuilder' must not be null");
88+
this.debeziumEngineBuilder = debeziumBuilder;
89+
}
90+
91+
/**
92+
* Enable the {@link ChangeEvent} batch mode handling.
93+
* When enabled the channel adapter will send a {@link List}
94+
* of {@link ChangeEvent}s as a payload in a single downstream {@link Message}.
95+
* Such a batch payload is not serializable.
96+
* By default, the batch mode is disabled, e.g. every input {@link ChangeEvent} is converted into a
97+
* single downstream {@link Message}.
98+
* @param enable set to true to enable the batch mode. Disabled by default.
99+
*/
100+
public void setEnableBatch(boolean enable) {
101+
this.enableBatch = enable;
102+
}
103+
104+
/**
105+
* Enable support for tombstone (aka delete) messages.
106+
* On a database row delete, Debezium can send a tombstone change event
107+
* that has the same key as the deleted row and a value of {@link Optional#empty()}.
108+
* This record is a marker for downstream processors.
109+
* It indicates that log compaction can remove all records that have this key.
110+
* When the tombstone functionality is enabled in the Debezium connector
111+
* configuration you should enable the empty payload as well.
112+
* @param enabled set true to enable the empty payload. Disabled by default.
113+
*/
114+
public void setEnableEmptyPayload(boolean enabled) {
115+
this.enableEmptyPayload = enabled;
116+
}
117+
118+
/**
119+
* Set a {@link ThreadFactory} for the Debezium executor.
120+
* Defaults to the {@link CustomizableThreadFactory} with a
121+
* {@code debezium:inbound-channel-adapter-thread-} prefix.
122+
* @param threadFactory the {@link ThreadFactory} instance to use.
123+
*/
124+
public void setThreadFactory(ThreadFactory threadFactory) {
125+
Assert.notNull(threadFactory, "'threadFactory' must not be null");
126+
this.threadFactory = threadFactory;
127+
}
128+
129+
/**
130+
* Set the outbound message content type.
131+
* Must be aligned with the {@link SerializationFormat} configuration used by
132+
* the provided {@link DebeziumEngine}.
133+
*/
134+
public void setContentType(String contentType) {
135+
Assert.hasText(contentType, "'contentType' must not be empty");
136+
this.contentType = contentType;
137+
}
138+
139+
/**
140+
* Set a {@link HeaderMapper} to convert the {@link ChangeEvent}
141+
* headers into {@link Message} headers.
142+
* @param headerMapper {@link HeaderMapper} implementation to use.
143+
* Defaults to {@link DefaultDebeziumHeaderMapper}.
144+
*/
145+
public void setHeaderMapper(HeaderMapper<List<Header<Object>>> headerMapper) {
146+
Assert.notNull(headerMapper, "'headerMapper' must not be null.");
147+
this.headerMapper = headerMapper;
148+
}
149+
150+
@Override
151+
public String getComponentType() {
152+
return "debezium:inbound-channel-adapter";
153+
}
154+
155+
@Override
156+
protected void onInit() {
157+
super.onInit();
158+
159+
if (this.threadFactory == null) {
160+
this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-thread-");
161+
}
162+
163+
this.executorService = Executors.newSingleThreadExecutor(this.threadFactory);
164+
165+
if (!this.enableBatch) {
166+
this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer<>());
167+
}
168+
else {
169+
this.debeziumEngineBuilder.notifying(new BatchChangeEventConsumer<>());
170+
}
171+
172+
this.debeziumEngine = this.debeziumEngineBuilder.build();
173+
}
174+
175+
@Override
176+
protected void doStart() {
177+
if (this.latch.getCount() > 0) {
178+
return;
179+
}
180+
this.latch = new CountDownLatch(1);
181+
this.executorService.execute(() -> {
182+
try {
183+
// Runs the debezium connector and deliver database changes to the registered consumer. This method
184+
// blocks until the connector is stopped.
185+
// If this instance is already running, then the run immediately returns.
186+
// When run the connector and starts polling the configured connector for change events.
187+
// All messages are delivered in batches to the consumer registered with this debezium engine.
188+
// The batch size, polling frequency, and other parameters are controlled via connector's configuration
189+
// settings. This continues until this connector is stopped.
190+
// This method can be called repeatedly as needed.
191+
this.debeziumEngine.run();
192+
}
193+
finally {
194+
this.latch.countDown();
195+
}
196+
});
197+
}
198+
199+
@Override
200+
protected void doStop() {
201+
try {
202+
this.debeziumEngine.close();
203+
}
204+
catch (IOException e) {
205+
logger.warn(e, "Debezium failed to close!");
206+
}
207+
try {
208+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
209+
throw new IllegalStateException("Failed to stop " + this);
210+
}
211+
}
212+
catch (InterruptedException ignored) {
213+
}
214+
}
215+
216+
@Override
217+
public void destroy() {
218+
super.destroy();
219+
220+
this.executorService.shutdown();
221+
try {
222+
this.executorService.awaitTermination(5, TimeUnit.SECONDS);
223+
}
224+
catch (InterruptedException e) {
225+
throw new IllegalStateException("Debezium failed to close!", e);
226+
}
227+
}
228+
229+
@Nullable
230+
private <T> Message<?> toMessage(ChangeEvent<T, T> changeEvent) {
231+
Object key = changeEvent.key();
232+
Object payload = changeEvent.value();
233+
String destination = changeEvent.destination();
234+
235+
// When the tombstone event is enabled, Debezium serializes the payload to null (e.g. empty payload)
236+
// while the metadata information is carried through the headers (debezium_key).
237+
// Note: Event for none flattened responses, when the debezium.properties.tombstones.on.delete=true
238+
// (default), tombstones are generate by Debezium and handled by the code below.
239+
if (payload == null && DebeziumMessageProducer.this.enableEmptyPayload) {
240+
payload = Optional.empty();
241+
}
242+
243+
// If payload is still null ignore the message.
244+
if (payload == null) {
245+
logger.info(() -> "Dropped null payload message for Change Event key: " + key);
246+
return null;
247+
}
248+
249+
return getMessageBuilderFactory()
250+
.withPayload(payload)
251+
.setHeader(DebeziumHeaders.KEY, key)
252+
.setHeader(DebeziumHeaders.DESTINATION, destination)
253+
.setHeader(MessageHeaders.CONTENT_TYPE, this.contentType)
254+
// Use the provided header mapper to convert Debezium headers into message headers.
255+
.copyHeaders(this.headerMapper.toHeaders(changeEvent.headers()))
256+
.build();
257+
}
258+
259+
final class StreamChangeEventConsumer<T> implements Consumer<ChangeEvent<T, T>> {
260+
261+
@Override
262+
public void accept(ChangeEvent<T, T> changeEvent) {
263+
Message<?> message = toMessage(changeEvent);
264+
if (message != null) {
265+
sendMessage(message);
266+
}
267+
}
268+
269+
}
270+
271+
final class BatchChangeEventConsumer<T> implements ChangeConsumer<ChangeEvent<T, T>> {
272+
273+
@Override
274+
public void handleBatch(List<ChangeEvent<T, T>> changeEvents, RecordCommitter<ChangeEvent<T, T>> committer)
275+
throws InterruptedException {
276+
277+
Message<List<ChangeEvent<T, T>>> message =
278+
getMessageBuilderFactory()
279+
.withPayload(changeEvents)
280+
.build();
281+
282+
sendMessage(message);
283+
284+
for (ChangeEvent<T, T> event : changeEvents) {
285+
committer.markProcessed(event);
286+
}
287+
committer.markBatchFinished();
288+
}
289+
290+
}
291+
292+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides classes for the Debezium inbound channel adapters.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
package org.springframework.integration.debezium.inbound;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.support;
18+
19+
/**
20+
* Pre-defined header names to be used when retrieving Debezium Change Event headers.
21+
*
22+
* @author Christian Tzolov
23+
* @since 6.2
24+
*/
25+
public abstract class DebeziumHeaders {
26+
27+
public static final String PREFIX = "debezium_";
28+
29+
/**
30+
* Debezium's header key.
31+
*/
32+
public static final String KEY = PREFIX + "key";
33+
34+
/**
35+
* Debezium's event destination.
36+
*/
37+
public static final String DESTINATION = PREFIX + "destination";
38+
39+
}

0 commit comments

Comments
 (0)