Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debezium streaming integration #170

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,26 @@
<artifactId>commons-dbcp2</artifactId>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.apache.commons</groupId>-->
<!-- <artifactId>commons-lang3</artifactId>-->
<!-- </dependency>-->

</dependencies>

<build>
Expand Down Expand Up @@ -125,4 +145,4 @@

</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,13 @@

public interface FlattenDatabaseDao {

/**
* Deploy MambaETL stored procedures
*/
void deployMambaEtl();

/**
* Stream in database changes using Debezium
*/
void streamInDatabaseChanges();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package org.openmrs.module.mambacore.api.dao.impl;


import io.debezium.config.Configuration;
import org.openmrs.module.mambacore.api.dao.FlattenDatabaseDao;
import org.openmrs.module.mambacore.db.ConnectionPoolManager;
import org.openmrs.module.mambacore.debezium.DbChangeConsumer;
import org.openmrs.module.mambacore.debezium.DbChangeServiceImpl;
import org.openmrs.module.mambacore.debezium.DbChangeToEventMapper;
import org.openmrs.module.mambacore.debezium.DbEventConsumerImpl;
import org.openmrs.module.mambacore.debezium.DebeziumConfigBuilder;
import org.openmrs.module.mambacore.debezium.EventConsumer;
import org.openmrs.module.mambacore.util.MambaETLProperties;
import org.openmrs.module.mambacore.util.StringReplacerUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -29,13 +37,36 @@ public class JdbcFlattenDatabaseDao implements FlattenDatabaseDao {
private static final String MYSQL_COMMENT_REGEX = "--.*(?=\\n)";
private static final String DELIMITER = "~-~-";

public JdbcFlattenDatabaseDao() {
}

/**
* Deploy MambaETL stored procedures
*/
@Override
public void deployMambaEtl() {

log.info("Deploying MambaETL...");
MambaETLProperties props = MambaETLProperties.getInstance();
log.info("Deploying MambaETL, scheduled @interval: " + props.getInterval() + " seconds...");
executeSqlScript(props);
log.info("Done deploying MambaETL...");
log.info("MambaETL deployed (with interval: " + props.getInterval() + "s )...");
}

/**
* Stream in database changes using Debezium
*/
@Override
public void streamInDatabaseChanges() {

EventConsumer eventConsumer = new DbEventConsumerImpl();
DbChangeToEventMapper eventMapper = new DbChangeToEventMapper();
Configuration debeziumConfig = DebeziumConfigBuilder.getInstance().build();

DbChangeConsumer dbChangeConsumer = new DbChangeConsumer(eventConsumer, eventMapper);
DbChangeServiceImpl dbChangeService = new DbChangeServiceImpl(debeziumConfig);

dbChangeService.addDbChangeListener(dbChangeConsumer);
dbChangeService.start();
}

private void executeSqlScript(MambaETLProperties props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Transactional
Expand All @@ -35,28 +35,31 @@ public void setDao(FlattenDatabaseDao dao) {

@Override
public void setupEtl() {
executorService.submit(() -> {
try {

try {
Future<?> deployFuture = executorService.submit(() -> {
dao.deployMambaEtl();
} catch (Exception e) {
log.error("Error deploying Mamba ETL", e);
}
});
});
deployFuture.get();
dao.streamInDatabaseChanges();
} catch (Exception e) {
log.error("Error deploying Mamba ETL", e);
}
}


@Override
@PreDestroy
//@PreDestroy
public void shutdownEtlThread() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
log.error("ExecutorService did not terminate");
}
}
} catch (InterruptedException e) {
log.error("shutdownEtlThread InterruptedException: {}", e.getLocalizedMessage());
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
* <p>
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
* graphic logo is a trademark of OpenMRS Inc.
*/
package org.openmrs.module.mambacore.debezium;

import io.debezium.engine.ChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbChangeConsumer implements DbChangeListener {

private static final Logger logger = LoggerFactory.getLogger(DbChangeConsumer.class);

private final EventConsumer eventConsumer;
private final DbChangeToEventMapper eventMapper;

public DbChangeConsumer(EventConsumer eventConsumer, DbChangeToEventMapper eventMapper) {
this.eventConsumer = eventConsumer;
this.eventMapper = eventMapper;
}

@Override
public void onDbChange(ChangeEvent<SourceRecord, SourceRecord> changeEvent) {
try {
DbEvent dbEvent = eventMapper.apply(changeEvent);
logger.debug("Notifying listener of the database event: {}", dbEvent);
eventConsumer.accept(dbEvent);
} catch (Throwable e) {
logger.error("Error processing change event: {}", changeEvent, e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
* <p>
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
* graphic logo is a trademark of OpenMRS Inc.
*/
package org.openmrs.module.mambacore.debezium;

import io.debezium.engine.ChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;

public interface DbChangeListener {
void onDbChange(ChangeEvent<SourceRecord, SourceRecord> changeEvent);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
* <p>
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
* graphic logo is a trademark of OpenMRS Inc.
*/
package org.openmrs.module.mambacore.debezium;

public interface DbChangeService {

void start();

void stop();

void reset();

void disable();

void enable();

boolean isDisabled();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/**
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
* <p>
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
* graphic logo is a trademark of OpenMRS Inc.
*/
package org.openmrs.module.mambacore.debezium;

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DbChangeServiceImpl implements DbChangeService {

private static final Logger logger = LoggerFactory.getLogger(DbChangeServiceImpl.class);

private final List<DbChangeListener> listeners = new ArrayList<DbChangeListener>();
private final Configuration debeziumConfig;
private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine = null;
private ExecutorService executor;
private boolean disabled = false;

public DbChangeServiceImpl(Configuration debeziumConfig) {
this.debeziumConfig = debeziumConfig;
}

public void addDbChangeListener(DbChangeListener listener) {
listeners.add(listener);
}

@Override
public void start() {

if (engine != null) {
logger.warn("Debezium engine is already running.");
return;
}
if (disabled) {
logger.warn("Debezium engine is disabled and cannot be started. Start it manually");
return;
}

engine = DebeziumEngine.create(Connect.class)
.using(debeziumConfig.asProperties())
.notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
@Override
public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
notifyListeners(event);
committer.markProcessed(event);
}
committer.markBatchFinished();
}
})
.using(OffsetCommitPolicy.always())
.build();

executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
stop();
}
}));

logger.info("Debezium engine started.");
}

private void notifyListeners(ChangeEvent<SourceRecord, SourceRecord> changeEvent) {
for (DbChangeListener listener : listeners) {
try {
listener.onDbChange(changeEvent);
} catch (Exception e) {
this.disable();
break;
}
}
}

@Override
public void stop() {
if (engine == null) {
logger.warn("Debezium engine is not running.");
return;
}

try {
RocksDBOffsetBackingStore.getInstance().stop();
executor.shutdown();
engine.close();

if (executor != null) {
if (!executor.awaitTermination(5L, TimeUnit.SECONDS)) {
logger.info("Waiting 5 seconds for the Debezium engine to shut down...");
executor.shutdownNow();
}
}
} catch (InterruptedException ie) {
logger.error("Interrupted while waiting for termination", ie);
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Error stopping Debezium engine", e);
executor.shutdownNow();
} finally {
engine = null;
executor = null;
}
}

@Override
public void reset() {
enable(); // Reset enables the service again
stop();
start();
logger.info("Debezium engine restarted.");
}

@Override
public void disable() {
disabled = true;
stop();
logger.warn("Debezium engine disabled due to error.");
}

@Override
public void enable() {
disabled = false;
logger.info("Debezium engine enabled.");
}

@Override
public boolean isDisabled() {
return disabled;
}
}
Loading