From 83f9287f2007153c37360afb199550e69dc2021b Mon Sep 17 00:00:00 2001 From: smallgod Date: Tue, 8 Oct 2024 09:47:17 +0300 Subject: [PATCH 1/4] testing issue with dependencies conflict --- api/pom.xml | 10 + .../mambacore/api/dao/FlattenDatabaseDao.java | 8 + .../api/dao/impl/JdbcFlattenDatabaseDao.java | 18 +- .../db/debezium/DebeziumConfigProvider.java | 70 +++++ .../db/debezium/DebeziumEventService.java | 54 ++++ .../db/debezium/DebeziumListener.java | 65 +++++ .../mambacore/util/MambaETLProperties.java | 264 ++++++++++-------- .../xf_system/sp_mamba_dim_table_insert.sql | 2 +- pom.xml | 85 ++++++ 9 files changed, 454 insertions(+), 122 deletions(-) create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java diff --git a/api/pom.xml b/api/pom.xml index ccb1bdb9..48a8a487 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -73,6 +73,16 @@ commons-dbcp2 + + io.debezium + debezium-embedded + + + + io.debezium + debezium-connector-mysql + + diff --git a/api/src/main/java/org/openmrs/module/mambacore/api/dao/FlattenDatabaseDao.java b/api/src/main/java/org/openmrs/module/mambacore/api/dao/FlattenDatabaseDao.java index 627e9c21..4abc4ea6 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/api/dao/FlattenDatabaseDao.java +++ b/api/src/main/java/org/openmrs/module/mambacore/api/dao/FlattenDatabaseDao.java @@ -11,5 +11,13 @@ public interface FlattenDatabaseDao { + /** + * Deploy MambaETL stored procedures + */ void deployMambaEtl(); + + /** + * Stream in database changes using Debezium + */ + void streamInDatabaseChanges(); } diff --git a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java index aa2b66df..a8a0fed2 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java +++ b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java @@ -2,6 +2,7 @@ import org.openmrs.module.mambacore.api.dao.FlattenDatabaseDao; import org.openmrs.module.mambacore.db.ConnectionPoolManager; +//import org.openmrs.module.mambacore.db.debezium.DebeziumListener; import org.openmrs.module.mambacore.util.MambaETLProperties; import org.openmrs.module.mambacore.util.StringReplacerUtil; import org.slf4j.Logger; @@ -29,13 +30,26 @@ public class JdbcFlattenDatabaseDao implements FlattenDatabaseDao { private static final String MYSQL_COMMENT_REGEX = "--.*(?=\\n)"; private static final String DELIMITER = "~-~-"; +// private DebeziumListener debeziumListener = new DebeziumListener(); + + /** + * Deploy MambaETL stored procedures + */ @Override public void deployMambaEtl() { + log.info("Deploying MambaETL..."); MambaETLProperties props = MambaETLProperties.getInstance(); - log.info("Deploying MambaETL, scheduled @interval: " + props.getInterval() + " seconds..."); executeSqlScript(props); - log.info("Done deploying MambaETL..."); + log.info("MambaETL deployed (with interval: " + props.getInterval() + "s )..."); + } + + /** + * Stream in database changes using Debezium + */ + @Override + public void streamInDatabaseChanges() { + //debeziumListener.startListening(); } private void executeSqlScript(MambaETLProperties props) { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java new file mode 100644 index 00000000..f0f83b8c --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java @@ -0,0 +1,70 @@ +///** +// * This Source Code Form is subject to the terms of the Mozilla Public License, +// * v. 2.0. If a copy of the MPL was not distributed with this file, You can +// * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under +// * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. +// *

+// * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS +// * graphic logo is a trademark of OpenMRS Inc. +// */ +//package org.openmrs.module.mambacore.db.debezium; +// +//import io.debezium.config.Configuration; +//import org.openmrs.module.mambacore.util.MambaETLProperties; +// +//import java.util.stream.Collectors; +// +///** +// * Provides configuration for Debezium connector +// */ +//public class DebeziumConfigProvider { +// +// private static final String MYSQL_CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector"; +// +// private static final String CONNECTOR_NAME = "mamba-core-connector"; +// +// private static final String SERVER_NAME = "openmrsDbServer"; +// +// private static volatile DebeziumConfigProvider instance = null; +// +// private static Configuration config; +// +// private final MambaETLProperties props = MambaETLProperties.getInstance(); +// +// private DebeziumConfigProvider() { +// initializeConfig(); +// } +// +// public static DebeziumConfigProvider getInstance() { +// if (instance == null) { +// synchronized (DebeziumConfigProvider.class) { +// if (instance == null) { +// instance = new DebeziumConfigProvider(); +// } +// } +// } +// return instance; +// } +// +// private void initializeConfig() { +// String source = props.getOpenmrsDatabase(); +// +// config = Configuration.create().with("name", CONNECTOR_NAME).with("connector.class", MYSQL_CONNECTOR_CLASS) +// .with("database.server.id", props.getSourceDatabaseServerId()).with("database.server.name", SERVER_NAME) +// .with("database.hostname", props.getSourceDatabaseHost()) +// .with("database.port", props.getSourceDatabasePort()).with("database.user", props.getMambaETLuser()) +// .with("database.password", props.getMambaETLuserPassword()).with("database.include.list", source) +// .with("table.include.list", getTableIncludeList(source)) +// .with("database.history.file.filename", props.getHistoryFilePath()).build(); +// } +// +// private String getTableIncludeList(String source) { +// return props.getTablesWithChangesToStream().stream() +// .map(table -> source + "." + table) +// .collect(Collectors.joining(",")); +// } +// +// public Configuration build() { +// return config; +// } +//} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java new file mode 100644 index 00000000..66f70bc4 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java @@ -0,0 +1,54 @@ +//package org.openmrs.module.mambacore.db.debezium; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.util.Map; +// +//public class DebeziumEventService { +// +// private static final Logger log = LoggerFactory.getLogger(DebeziumEventService.class); +// +// public DebeziumEventService() { +// } +// +// public void processDebeziumEvent(Map changeEvent) { +// // Similar logic to handle insert, update, and delete events +// // Extract and process data from the event map +// Map payload = (Map) changeEvent.get("payload"); +// if (payload == null) { +// return; +// } +// System.out.println("Payload: " + payload); +// +// String operation = (String) payload.get("op"); +// Map source = (Map) payload.get("source"); +// String databaseName = (String) source.get("db"); +// String tableName = (String) source.get("table"); +// +// Map before = (Map) payload.get("before"); +// Map after = (Map) payload.get("after"); +// +// switch (operation) { +// +// case "c": // INSERT +// System.out.println("INSERT event captured on " + databaseName + "." + tableName); +// System.out.println("New row data: " + after); +// break; +// +// case "u": // UPDATE +// System.out.println("UPDATE event captured on " + databaseName + "." + tableName); +// System.out.println("Before: " + before); +// System.out.println("After: " + after); +// break; +// +// case "d": // DELETE +// System.out.println("DELETE event captured on " + databaseName + "." + tableName); +// System.out.println("Deleted row data: " + before); +// break; +// +// default: +// System.out.println("Unknown event type: " + operation); +// } +// } +//} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java new file mode 100644 index 00000000..51c2134b --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java @@ -0,0 +1,65 @@ +//package org.openmrs.module.mambacore.db.debezium; +// +//import com.fasterxml.jackson.databind.ObjectMapper; +//import io.debezium.config.Configuration; +//import io.debezium.engine.ChangeEvent; +//import io.debezium.engine.DebeziumEngine; +//import io.debezium.engine.format.Json; +// +//import java.util.Map; +// +//public class DebeziumListener { +// +// private final DebeziumEventService debeziumEventService; +// private final ObjectMapper objectMapper; +// +// public DebeziumListener() { +// this.debeziumEventService = new DebeziumEventService(); +// this.objectMapper = new ObjectMapper(); // Initialize Jackson ObjectMapper for JSON parsing +// } +// +// public void startListening() { +// // Step 1: Get the configuration from DebeziumConfigProvider +// Configuration config = DebeziumConfigProvider +// .getInstance() +// .build(); +// +// // Step 2: Set up the Debezium Engine to capture JSON events +// DebeziumEngine> engine = DebeziumEngine.create(Json.class) +// .using(config.asProperties()) +// .notifying(this::handleDebeziumEvent) +// .build(); +// +// // Step 3: Run the Debezium Engine in a separate thread +//// ExecutorService executor = Executors.newSingleThreadExecutor(); +//// executor.submit(engine); +// +// // Step 4: Gracefully shutdown the engine on JVM exit +// Runtime.getRuntime().addShutdownHook(new Thread(() -> { +// try { +// engine.close(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// })); +// } +// +// private void handleDebeziumEvent(ChangeEvent event) { +// try { +// // Step 5: Parse the JSON string from the event +// String jsonString = event.value(); +// if (jsonString == null) { +// return; // Ignore empty events +// } +// +// // Convert the JSON string into a Map using Jackson ObjectMapper +// Map changeEvent = objectMapper.readValue(jsonString, Map.class); +// +// // Step 6: Delegate the event to the service for transactional handling +// debeziumEventService.processDebeziumEvent(changeEvent); +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +//} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java b/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java index e2bbc9d8..a42d9a2a 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java +++ b/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java @@ -2,126 +2,152 @@ import org.openmrs.api.context.Context; +import java.util.Arrays; +import java.util.List; import java.util.Properties; public class MambaETLProperties { - - private static MambaETLProperties instance; - - private final String locale; - - private final int columns; - - private final int incremental; - - private final int automated; - - private final int interval; - - private final String openmrsDbDriver; - - private final String openmrsDbConnectionUrl; - - private final String mambaETLuser; - - private final String mambaETLuserPassword; - - private final String openmrsDatabase; - - private final String etlDatababase; - - private final int connectionInitialSize = 4; - - private final int connectionMaxTotal = 20; - - private MambaETLProperties() { - - Properties properties = Context.getRuntimeProperties(); - - this.openmrsDbDriver = properties.getProperty("connection.driver_class"); - this.openmrsDbConnectionUrl = properties.getProperty("connection.url"); - - this.mambaETLuser = properties.getProperty("mambaetl.analysis.db.username", - properties.getProperty("connection.username")); - this.mambaETLuserPassword = properties.getProperty("mambaetl.analysis.db.password", - properties.getProperty("connection.password")); - - this.openmrsDatabase = getProperty(properties, "mambaetl.analysis.db.openmrs_database", "openmrs"); - this.etlDatababase = getProperty(properties, "mambaetl.analysis.db.etl_database", "analysis_db"); - - this.locale = getProperty(properties, "mambaetl.analysis.locale", "en"); - this.columns = getIntProperty(properties, "mambaetl.analysis.columns", 40); - this.incremental = getIntProperty(properties, "mambaetl.analysis.incremental_mode", 1); - this.automated = getIntProperty(properties, "mambaetl.analysis.automated_flattening", 0); - this.interval = getIntProperty(properties, "mambaetl.analysis.etl_interval", 300); - } - - public static synchronized MambaETLProperties getInstance() { - if (instance == null) { - instance = new MambaETLProperties(); - } - return instance; - } - - public String getLocale() { - return locale; - } - - public int getColumns() { - return columns; - } - - public int getIncremental() { - return incremental; - } - - public int getAutomated() { - return automated; - } - - public int getInterval() { - return interval; - } - - public String getOpenmrsDbDriver() { - return openmrsDbDriver; - } - - public String getOpenmrsDbConnectionUrl() { - return openmrsDbConnectionUrl; - } - - public String getMambaETLuser() { - return mambaETLuser; - } - - public String getMambaETLuserPassword() { - return mambaETLuserPassword; - } - - public String getOpenmrsDatabase() { - return openmrsDatabase; - } - - public String getEtlDatababase() { - return etlDatababase; - } - - public int getConnectionInitialSize() { - return connectionInitialSize; - } - - public int getConnectionMaxTotal() { - return connectionMaxTotal; - } - - private String getProperty(Properties properties, String key, String defaultValue) { - String value = properties.getProperty(key); - return (value == null || value.isEmpty()) ? defaultValue : value.trim(); - } - - private int getIntProperty(Properties properties, String key, int defaultValue) { - String value = properties.getProperty(key); - return (value == null || value.isEmpty()) ? defaultValue : Integer.parseInt(value.trim()); - } + + private static MambaETLProperties instance; + + private final String locale; + + private final int columns; + + private final int incremental; + + private final int automated; + + private final int interval; + + private final String openmrsDbDriver; + + private final String openmrsDbConnectionUrl; + + private final String mambaETLuser; + + private final String mambaETLuserPassword; + + private final String openmrsDatabase; + + private final String etlDatababase; + + private final List tablesWithChangesToStream; + + private final int connectionInitialSize = 4; + + private final int connectionMaxTotal = 20; + + private MambaETLProperties() { + + Properties properties = Context.getRuntimeProperties(); + + this.openmrsDbDriver = properties.getProperty("connection.driver_class"); + this.openmrsDbConnectionUrl = properties.getProperty("connection.url"); + + this.mambaETLuser = properties.getProperty("mambaetl.analysis.db.username", + properties.getProperty("connection.username")); + this.mambaETLuserPassword = properties.getProperty("mambaetl.analysis.db.password", + properties.getProperty("connection.password")); + + this.openmrsDatabase = getProperty(properties, "mambaetl.analysis.db.openmrs_database", "openmrs"); + this.etlDatababase = getProperty(properties, "mambaetl.analysis.db.etl_database", "analysis_db"); + + this.locale = getProperty(properties, "mambaetl.analysis.locale", "en"); + this.columns = getIntProperty(properties, "mambaetl.analysis.columns", 40); + this.incremental = getIntProperty(properties, "mambaetl.analysis.incremental_mode", 1); + this.automated = getIntProperty(properties, "mambaetl.analysis.automated_flattening", 0); + this.interval = getIntProperty(properties, "mambaetl.analysis.etl_interval", 300); + this.tablesWithChangesToStream = Arrays.asList("obs", "encounter");//TODO include more + } + + public static synchronized MambaETLProperties getInstance() { + if (instance == null) { + instance = new MambaETLProperties(); + } + return instance; + } + + public String getLocale() { + return locale; + } + + public int getColumns() { + return columns; + } + + public int getIncremental() { + return incremental; + } + + public int getAutomated() { + return automated; + } + + public int getInterval() { + return interval; + } + + public String getOpenmrsDbDriver() { + return openmrsDbDriver; + } + + public String getOpenmrsDbConnectionUrl() { + return openmrsDbConnectionUrl; + } + + public String getMambaETLuser() { + return mambaETLuser; + } + + public String getMambaETLuserPassword() { + return mambaETLuserPassword; + } + + public String getOpenmrsDatabase() { + return openmrsDatabase; + } + + public String getEtlDatababase() { + return etlDatababase; + } + + public int getConnectionInitialSize() { + return connectionInitialSize; + } + + public int getConnectionMaxTotal() { + return connectionMaxTotal; + } + + public List getTablesWithChangesToStream() { + return tablesWithChangesToStream; + } + + // Example MambaETLProperties methods + public String getSourceDatabaseHost() { + return "localhost"; // TODO: put in properties file + } + + public String getSourceDatabasePort() { + return "3306"; // TODO: put in properties file + } + + public String getSourceDatabaseServerId() { + return "85744"; // TODO: put in properties file + } + + public String getHistoryFilePath() { + return "dbhistory.dat"; // TODO: put in properties file + } + + private String getProperty(Properties properties, String key, String defaultValue) { + String value = properties.getProperty(key); + return (value == null || value.isEmpty()) ? defaultValue : value.trim(); + } + + private int getIntProperty(Properties properties, String key, int defaultValue) { + String value = properties.getProperty(key); + return (value == null || value.isEmpty()) ? defaultValue : Integer.parseInt(value.trim()); + } } diff --git a/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql b/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql index 6b78af1d..b1f0cd15 100644 --- a/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql +++ b/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql @@ -68,7 +68,7 @@ BEGIN -- Set the join clause if it is an incremental insert IF is_incremental THEN SET join_clause = CONCAT( - ' INNER JOIN mamba_etl_incremental_columns_index_new ic', + ' INNER JOIN ic', ' ON tb.', pkey_column, ' = ic.incremental_table_pkey'); END IF; diff --git a/pom.xml b/pom.xml index 4c79439f..67d9ba45 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,90 @@ + + io.debezium + debezium-embedded + ${debezium.version} + + + org.slf4j + slf4j-api + + + org.springframework.boot + spring-boot-starter + + + org.springframework + spring-context + + + org.springframework + spring-beans + + + org.springframework + spring-core + + + + + + io.debezium + debezium-connector-mysql + ${debezium.version} + + + org.slf4j + slf4j-api + + + org.springframework.boot + spring-boot-starter + + + org.springframework + spring-context + + + org.springframework + spring-beans + + + org.springframework + spring-core + + + + + + io.debezium + debezium-api + ${debezium.version} + + + org.slf4j + slf4j-api + + + org.springframework.boot + spring-boot-starter + + + org.springframework + spring-context + + + org.springframework + spring-beans + + + org.springframework + spring-core + + + + @@ -194,6 +278,7 @@ 2.21.0 2.11.0 2.15.3 + 1.9.7.Final From d118ae28e17523e527dac7cb00f6ce1a33ff8f61 Mon Sep 17 00:00:00 2001 From: smallgod Date: Fri, 11 Oct 2024 12:04:09 +0300 Subject: [PATCH 2/4] Resolve Debezium-mysql conflict issue --- api/pom.xml | 7 +- .../api/dao/impl/JdbcFlattenDatabaseDao.java | 6 +- .../api/impl/FlattenDatabaseServiceImpl.java | 5 +- .../db/debezium/DebeziumConfigProvider.java | 140 +++++++++--------- .../db/debezium/DebeziumEventService.java | 108 +++++++------- .../db/debezium/DebeziumListener.java | 132 +++++++++-------- pom.xml | 112 +++++++------- 7 files changed, 255 insertions(+), 255 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index 48a8a487..67a52e85 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -78,6 +78,11 @@ debezium-embedded + + io.debezium + debezium-api + + io.debezium debezium-connector-mysql @@ -135,4 +140,4 @@ - + \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java index a8a0fed2..293cc362 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java +++ b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java @@ -2,7 +2,7 @@ import org.openmrs.module.mambacore.api.dao.FlattenDatabaseDao; import org.openmrs.module.mambacore.db.ConnectionPoolManager; -//import org.openmrs.module.mambacore.db.debezium.DebeziumListener; +import org.openmrs.module.mambacore.db.debezium.DebeziumListener; import org.openmrs.module.mambacore.util.MambaETLProperties; import org.openmrs.module.mambacore.util.StringReplacerUtil; import org.slf4j.Logger; @@ -30,7 +30,7 @@ public class JdbcFlattenDatabaseDao implements FlattenDatabaseDao { private static final String MYSQL_COMMENT_REGEX = "--.*(?=\\n)"; private static final String DELIMITER = "~-~-"; -// private DebeziumListener debeziumListener = new DebeziumListener(); + private DebeziumListener debeziumListener = new DebeziumListener(); /** * Deploy MambaETL stored procedures @@ -49,7 +49,7 @@ public void deployMambaEtl() { */ @Override public void streamInDatabaseChanges() { - //debeziumListener.startListening(); + debeziumListener.startListening(); } private void executeSqlScript(MambaETLProperties props) { diff --git a/api/src/main/java/org/openmrs/module/mambacore/api/impl/FlattenDatabaseServiceImpl.java b/api/src/main/java/org/openmrs/module/mambacore/api/impl/FlattenDatabaseServiceImpl.java index edee6afe..ef6ffb4a 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/api/impl/FlattenDatabaseServiceImpl.java +++ b/api/src/main/java/org/openmrs/module/mambacore/api/impl/FlattenDatabaseServiceImpl.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import org.springframework.transaction.annotation.Transactional; -import javax.annotation.PreDestroy; +//import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -38,6 +38,7 @@ public void setupEtl() { executorService.submit(() -> { try { dao.deployMambaEtl(); + dao.streamInDatabaseChanges(); } catch (Exception e) { log.error("Error deploying Mamba ETL", e); } @@ -46,7 +47,7 @@ public void setupEtl() { @Override - @PreDestroy + //@PreDestroy public void shutdownEtlThread() { executorService.shutdown(); try { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java index f0f83b8c..07953c49 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java @@ -1,70 +1,70 @@ -///** -// * This Source Code Form is subject to the terms of the Mozilla Public License, -// * v. 2.0. If a copy of the MPL was not distributed with this file, You can -// * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under -// * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. -// *

-// * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS -// * graphic logo is a trademark of OpenMRS Inc. -// */ -//package org.openmrs.module.mambacore.db.debezium; -// -//import io.debezium.config.Configuration; -//import org.openmrs.module.mambacore.util.MambaETLProperties; -// -//import java.util.stream.Collectors; -// -///** -// * Provides configuration for Debezium connector -// */ -//public class DebeziumConfigProvider { -// -// private static final String MYSQL_CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector"; -// -// private static final String CONNECTOR_NAME = "mamba-core-connector"; -// -// private static final String SERVER_NAME = "openmrsDbServer"; -// -// private static volatile DebeziumConfigProvider instance = null; -// -// private static Configuration config; -// -// private final MambaETLProperties props = MambaETLProperties.getInstance(); -// -// private DebeziumConfigProvider() { -// initializeConfig(); -// } -// -// public static DebeziumConfigProvider getInstance() { -// if (instance == null) { -// synchronized (DebeziumConfigProvider.class) { -// if (instance == null) { -// instance = new DebeziumConfigProvider(); -// } -// } -// } -// return instance; -// } -// -// private void initializeConfig() { -// String source = props.getOpenmrsDatabase(); -// -// config = Configuration.create().with("name", CONNECTOR_NAME).with("connector.class", MYSQL_CONNECTOR_CLASS) -// .with("database.server.id", props.getSourceDatabaseServerId()).with("database.server.name", SERVER_NAME) -// .with("database.hostname", props.getSourceDatabaseHost()) -// .with("database.port", props.getSourceDatabasePort()).with("database.user", props.getMambaETLuser()) -// .with("database.password", props.getMambaETLuserPassword()).with("database.include.list", source) -// .with("table.include.list", getTableIncludeList(source)) -// .with("database.history.file.filename", props.getHistoryFilePath()).build(); -// } -// -// private String getTableIncludeList(String source) { -// return props.getTablesWithChangesToStream().stream() -// .map(table -> source + "." + table) -// .collect(Collectors.joining(",")); -// } -// -// public Configuration build() { -// return config; -// } -//} +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; + +import io.debezium.config.Configuration; +import org.openmrs.module.mambacore.util.MambaETLProperties; + +import java.util.stream.Collectors; + +/** + * Provides configuration for Debezium connector + */ +public class DebeziumConfigProvider { + + private static final String MYSQL_CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector"; + + private static final String CONNECTOR_NAME = "mamba-core-connector"; + + private static final String SERVER_NAME = "openmrsDbServer"; + + private static volatile DebeziumConfigProvider instance = null; + + private static Configuration config; + + private final MambaETLProperties props = MambaETLProperties.getInstance(); + + private DebeziumConfigProvider() { + initializeConfig(); + } + + public static DebeziumConfigProvider getInstance() { + if (instance == null) { + synchronized (DebeziumConfigProvider.class) { + if (instance == null) { + instance = new DebeziumConfigProvider(); + } + } + } + return instance; + } + + private void initializeConfig() { + String source = props.getOpenmrsDatabase(); + + config = Configuration.create().with("name", CONNECTOR_NAME).with("connector.class", MYSQL_CONNECTOR_CLASS) + .with("database.server.id", props.getSourceDatabaseServerId()).with("database.server.name", SERVER_NAME) + .with("database.hostname", props.getSourceDatabaseHost()) + .with("database.port", props.getSourceDatabasePort()).with("database.user", props.getMambaETLuser()) + .with("database.password", props.getMambaETLuserPassword()).with("database.include.list", source) + //.with("table.include.list", getTableIncludeList(source)) + .with("database.history.file.filename", props.getHistoryFilePath()).build(); + } + + private String getTableIncludeList(String source) { + return props.getTablesWithChangesToStream().stream() + .map(table -> source + "." + table) + .collect(Collectors.joining(",")); + } + + public Configuration build() { + return config; + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java index 66f70bc4..bf4450bd 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java @@ -1,54 +1,54 @@ -//package org.openmrs.module.mambacore.db.debezium; -// -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.util.Map; -// -//public class DebeziumEventService { -// -// private static final Logger log = LoggerFactory.getLogger(DebeziumEventService.class); -// -// public DebeziumEventService() { -// } -// -// public void processDebeziumEvent(Map changeEvent) { -// // Similar logic to handle insert, update, and delete events -// // Extract and process data from the event map -// Map payload = (Map) changeEvent.get("payload"); -// if (payload == null) { -// return; -// } -// System.out.println("Payload: " + payload); -// -// String operation = (String) payload.get("op"); -// Map source = (Map) payload.get("source"); -// String databaseName = (String) source.get("db"); -// String tableName = (String) source.get("table"); -// -// Map before = (Map) payload.get("before"); -// Map after = (Map) payload.get("after"); -// -// switch (operation) { -// -// case "c": // INSERT -// System.out.println("INSERT event captured on " + databaseName + "." + tableName); -// System.out.println("New row data: " + after); -// break; -// -// case "u": // UPDATE -// System.out.println("UPDATE event captured on " + databaseName + "." + tableName); -// System.out.println("Before: " + before); -// System.out.println("After: " + after); -// break; -// -// case "d": // DELETE -// System.out.println("DELETE event captured on " + databaseName + "." + tableName); -// System.out.println("Deleted row data: " + before); -// break; -// -// default: -// System.out.println("Unknown event type: " + operation); -// } -// } -//} \ No newline at end of file +package org.openmrs.module.mambacore.db.debezium; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class DebeziumEventService { + + private static final Logger log = LoggerFactory.getLogger(DebeziumEventService.class); + + public DebeziumEventService() { + } + + public void processDebeziumEvent(Map changeEvent) { + // Similar logic to handle insert, update, and delete events + // Extract and process data from the event map + Map payload = (Map) changeEvent.get("payload"); + if (payload == null) { + return; + } + System.out.println("Payload: " + payload); + + String operation = (String) payload.get("op"); + Map source = (Map) payload.get("source"); + String databaseName = (String) source.get("db"); + String tableName = (String) source.get("table"); + + Map before = (Map) payload.get("before"); + Map after = (Map) payload.get("after"); + + switch (operation) { + + case "c": // INSERT + System.out.println("INSERT event captured on " + databaseName + "." + tableName); + System.out.println("New row data: " + after); + break; + + case "u": // UPDATE + System.out.println("UPDATE event captured on " + databaseName + "." + tableName); + System.out.println("Before: " + before); + System.out.println("After: " + after); + break; + + case "d": // DELETE + System.out.println("DELETE event captured on " + databaseName + "." + tableName); + System.out.println("Deleted row data: " + before); + break; + + default: + System.out.println("Unknown event type: " + operation); + } + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java index 51c2134b..59cb1b54 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java @@ -1,65 +1,67 @@ -//package org.openmrs.module.mambacore.db.debezium; -// -//import com.fasterxml.jackson.databind.ObjectMapper; -//import io.debezium.config.Configuration; -//import io.debezium.engine.ChangeEvent; -//import io.debezium.engine.DebeziumEngine; -//import io.debezium.engine.format.Json; -// -//import java.util.Map; -// -//public class DebeziumListener { -// -// private final DebeziumEventService debeziumEventService; -// private final ObjectMapper objectMapper; -// -// public DebeziumListener() { -// this.debeziumEventService = new DebeziumEventService(); -// this.objectMapper = new ObjectMapper(); // Initialize Jackson ObjectMapper for JSON parsing -// } -// -// public void startListening() { -// // Step 1: Get the configuration from DebeziumConfigProvider -// Configuration config = DebeziumConfigProvider -// .getInstance() -// .build(); -// -// // Step 2: Set up the Debezium Engine to capture JSON events -// DebeziumEngine> engine = DebeziumEngine.create(Json.class) -// .using(config.asProperties()) -// .notifying(this::handleDebeziumEvent) -// .build(); -// -// // Step 3: Run the Debezium Engine in a separate thread -//// ExecutorService executor = Executors.newSingleThreadExecutor(); -//// executor.submit(engine); -// -// // Step 4: Gracefully shutdown the engine on JVM exit -// Runtime.getRuntime().addShutdownHook(new Thread(() -> { -// try { -// engine.close(); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// })); -// } -// -// private void handleDebeziumEvent(ChangeEvent event) { -// try { -// // Step 5: Parse the JSON string from the event -// String jsonString = event.value(); -// if (jsonString == null) { -// return; // Ignore empty events -// } -// -// // Convert the JSON string into a Map using Jackson ObjectMapper -// Map changeEvent = objectMapper.readValue(jsonString, Map.class); -// -// // Step 6: Delegate the event to the service for transactional handling -// debeziumEventService.processDebeziumEvent(changeEvent); -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -//} \ No newline at end of file +package org.openmrs.module.mambacore.db.debezium; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.config.Configuration; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; + +import java.util.Map; + +public class DebeziumListener { + + private final DebeziumEventService debeziumEventService; + private final ObjectMapper objectMapper; + + public DebeziumListener() { + this.debeziumEventService = new DebeziumEventService(); + this.objectMapper = new ObjectMapper(); // Initialize Jackson ObjectMapper for JSON parsing + } + + public void startListening() { + // Step 1: Get the configuration from DebeziumConfigProvider + Configuration config = DebeziumConfigProvider + .getInstance() + .build(); + + // Step 2: Set up the Debezium Engine to capture JSON events + DebeziumEngine> engine = DebeziumEngine.create(Json.class) + .using(config.asProperties()) + .notifying(this::handleDebeziumEvent) + .build(); + + // Step 3: Run the Debezium Engine in a separate thread +// ExecutorService executor = Executors.newSingleThreadExecutor(); +// executor.submit(engine); + + // Step 4: Gracefully shutdown the engine on JVM exit + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + engine.close(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + System.out.println("Debezium engine started..."); + } + + private void handleDebeziumEvent(ChangeEvent event) { + System.out.println("Received event: " + event); + try { + // Step 5: Parse the JSON string from the event + String jsonString = event.value(); + if (jsonString == null) { + return; // Ignore empty events + } + + // Convert the JSON string into a Map using Jackson ObjectMapper + Map changeEvent = objectMapper.readValue(jsonString, Map.class); + + // Step 6: Delegate the event to the service for transactional handling + debeziumEventService.processDebeziumEvent(changeEvent); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 67d9ba45..01334721 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,26 @@ api + + 8 + ${version.java} + ${version.java} + ${version.java} + 3.0.0-M6 + 3.10.1 + 3.3.0 + 3.0.0-M7 + UTF-8 + 5.7.2 + 4.13.1 + 2.0.0 + 2.21.0 + 2.11.0 + 2.15.3 + 1.9.7.Final + + + @@ -128,24 +148,44 @@ ${debezium.version} - org.slf4j - slf4j-api + org.glassfish.hk2.external + aopalliance-repackaged + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + - org.springframework.boot - spring-boot-starter + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 - org.springframework - spring-context + com.fasterxml.jackson.core + jackson-annotations - org.springframework - spring-beans + javax.servlet + javax.servlet-api - org.springframework - spring-core + org.apache.commons + commons-lang3 + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 @@ -160,20 +200,8 @@ slf4j-api - org.springframework.boot - spring-boot-starter - - - org.springframework - spring-context - - - org.springframework - spring-beans - - - org.springframework - spring-core + mysql + mysql-connector-java @@ -187,22 +215,6 @@ org.slf4j slf4j-api - - org.springframework.boot - spring-boot-starter - - - org.springframework - spring-context - - - org.springframework - spring-beans - - - org.springframework - spring-core - @@ -262,26 +274,6 @@ - - 8 - ${version.java} - ${version.java} - ${version.java} - 3.0.0-M6 - 3.10.1 - 3.3.0 - 3.0.0-M7 - UTF-8 - 5.7.2 - 4.13.1 - 2.0.0 - 2.21.0 - 2.11.0 - 2.15.3 - 1.9.7.Final - - - From 78e2015f428b4bbe0d65a8048b902d4e75b453d8 Mon Sep 17 00:00:00 2001 From: smallgod Date: Fri, 18 Oct 2024 09:01:49 +0300 Subject: [PATCH 3/4] Setup Debezium with offsets --- .../CustomFileOffsetBackingStore.java | 84 ++++ .../db/debezium/DbChangeToEvent.java | 107 +++++ .../module/mambacore/db/debezium/DbEvent.java | 91 ++++ .../mambacore/db/debezium/DbEventStatus.java | 106 +++++ .../mambacore/db/debezium/DbOperation.java | 38 ++ .../mambacore/db/debezium/DbSnapshot.java | 37 ++ .../db/debezium/DebeziumConfigProducer.java | 139 ++++++ .../db/debezium/DebeziumConstants.java | 75 ++++ .../db/debezium/DebeziumConsumer.java | 77 ++++ .../db/debezium/DebeziumEventService.java | 54 --- .../db/debezium/DebeziumListener.java | 67 --- .../db/debezium/DebeziumProperties.java | 105 +++++ .../db/debezium/DebeziumPropertiesImpl.java | 413 ++++++++++++++++++ .../mambacore/db/debezium/EventConsumer.java | 21 + .../db/debezium/MyEventConsumer.java | 0 .../mambacore/db/debezium/ObjectMap.java | 55 +++ .../mambacore/db/debezium/ObjectMapImpl.java | 33 ++ .../mambacore/db/debezium/OffsetUtils.java | 45 ++ .../DebeziumConfigProvider.java | 47 +- 19 files changed, 1456 insertions(+), 138 deletions(-) create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java delete mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java delete mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java create mode 100644 api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java rename api/src/main/java/org/openmrs/module/mambacore/{db/debezium => task}/DebeziumConfigProvider.java (72%) diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java new file mode 100644 index 00000000..49899532 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java @@ -0,0 +1,84 @@ +package com.ayinza.util.debezium.application.service; + +import jakarta.inject.Inject; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * Custom {@link FileOffsetBackingStore} that only saves the offset if no exception was encountered + * while processing a source record read by debezium from the MySQL binlog to ensure no binlog entry + * goes unprocessed. + */ +public class CustomFileOffsetBackingStore extends FileOffsetBackingStore { + + protected static final Logger log = LoggerFactory.getLogger(CustomFileOffsetBackingStore.class); + private static final JsonConverter KEY_CONVERTER = new JsonConverter(); + private static boolean disabled = false; + + @Inject + private OffsetUtils offsetUtils; + + public CustomFileOffsetBackingStore() { + super(KEY_CONVERTER); + KEY_CONVERTER.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + } + + /** + * Disables offset storage + */ + public static void disable() { + disabled = true; + log.debug("Disabled saving of offsets"); + } + + /** + * Re-enables offset storage + */ + public static void reset() { + disabled = false; + } + + /** + * @see FileOffsetBackingStore#save() + */ + @Override + protected void save() { + + synchronized (CustomFileOffsetBackingStore.class) { + + if (disabled) { + log.warn("Skipping saving of offset because an error was encountered while processing a change event"); + return; + } + log.debug("Saving binlog offset"); + super.save(); + } + } + + /** + * @see FileOffsetBackingStore#start() + */ + @Override + public synchronized void start() { + + doStart(); + + try { + //The offset file structure changed from that generated by previous versions therefore, we need to + //transform any existing offset file to match the new structure otherwise remote sites will lose any + //events that are recorded between pre-upgrade and post-upgrade application runs of the sender. + offsetUtils.transformOffsetIfNecessary(data); + } catch (Exception e) { + throw new RuntimeException("An error occurred while verifying the existing debezium offset file data", e); + } + } + + protected void doStart() { + super.start(); + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java new file mode 100644 index 00000000..5405f4e8 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java @@ -0,0 +1,107 @@ +package com.ayinza.util.debezium.application.model; + +import com.ayinza.utils.domain.model.debezium.DbEvent; +import com.ayinza.utils.domain.model.debezium.ObjectMap; +import com.ayinza.utils.domain.vo.DbOperation; +import com.ayinza.utils.domain.vo.DbSnapshot; +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.Optional; +import java.util.function.Function; + +/** + * Utility function that converts a Debezium {@link ChangeEvent} to a {@link DbEvent}. + */ +public class DbChangeToEvent + implements Function, DbEvent> { + + private static final String BEFORE_FIELD = "before"; + private static final String AFTER_FIELD = "after"; + private static final String SOURCE_FIELD = "source"; + private static final String OPERATION_FIELD = "op"; + private static final String TIMESTAMP_FIELD = "ts_ms"; + private static final String SNAPSHOT_FIELD = "snapshot"; + private static final String TABLE_FIELD = "table"; + private static final String NAME_FIELD = "name"; + + @Override + public DbEvent apply(ChangeEvent changeEvent) { + + SourceRecord record = Optional.ofNullable(changeEvent) + .map(ChangeEvent::value) + .orElseThrow(() -> new DebeziumException("ChangeEvent value is null")); + + Struct keyStruct = getStruct(record.key(), "key"); + Struct valueStruct = getStruct(record.value(), "value"); + Struct sourceStruct = getStruct(valueStruct, SOURCE_FIELD); + + validateKeyStruct(keyStruct); + + ObjectMap primaryKey = new ObjectMapImpl(keyStruct); + ObjectMap previousState = new ObjectMapImpl(valueStruct.getStruct(BEFORE_FIELD)); + ObjectMap newState = new ObjectMapImpl(valueStruct.getStruct(AFTER_FIELD)); + ObjectMap source = new ObjectMapImpl(sourceStruct); + + DbOperation operation = DbOperation.convertToEnum(getString(valueStruct, OPERATION_FIELD)); + ObjectMap values = (operation == DbOperation.DELETE) ? previousState : newState; + + Long timestamp = getLong(valueStruct, TIMESTAMP_FIELD); + String tableName = getString(sourceStruct, TABLE_FIELD); + String sourceName = getString(sourceStruct, NAME_FIELD); + DbSnapshot snapshot = DbSnapshot.convertToEnum(getString(sourceStruct, SNAPSHOT_FIELD)); + + return new DbEvent(primaryKey, + previousState, + newState, + source, + values, + tableName, + sourceName, + operation, + timestamp, + snapshot); + } + + /** + * Helper method to retrieve a {@link Struct} and throw an appropriate exception if it is null. + */ + private Struct getStruct(Object object, String structName) { + return Optional.ofNullable((Struct) object) + .orElseThrow(() -> new DebeziumException(structName + " struct is null")); + } + + /** + * Helper method to retrieve a String field value from a {@link Struct}. + */ + private String getString(Struct struct, String fieldName) { + return Optional.ofNullable(struct.getString(fieldName)) + .orElseThrow(() -> new DebeziumException(fieldName + " field is missing or null")); + } + + /** + * Helper method to retrieve a Long field value from a {@link Struct}. + */ + private Long getLong(Struct struct, String fieldName) { + return Optional.ofNullable(struct.getInt64(fieldName)) + .orElseThrow(() -> new DebeziumException(fieldName + " field is missing or null")); + } + + /** + * Validates the {@link Struct} for the key, ensuring it contains a single primary key. + */ + private void validateKeyStruct(Struct keyStruct) { + + int keyFieldsSize = keyStruct.schema().fields().size(); + + if (keyFieldsSize == 0) { + throw new DebeziumException("Tables with no primary key column are not supported"); + } + + if (keyFieldsSize > 1) { + throw new DebeziumException("Tables with composite primary keys are not supported"); + } + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java new file mode 100644 index 00000000..8ac4d454 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java @@ -0,0 +1,91 @@ +package com.ayinza.util.debezium.domain.model; + +import com.ayinza.utils.domain.vo.DbOperation; +import com.ayinza.utils.domain.vo.DbSnapshot; + +public class DbEvent { + + private final ObjectMap primaryKey; + + private final String tableName; + + private final DbOperation operation; + + private final ObjectMap previousState; + + private final ObjectMap newState; + + private final DbSnapshot snapshot; + + private final Long timestamp; + + private final ObjectMap values; + + private final ObjectMap source; + + private final String sourceName; + + public DbEvent( + ObjectMap primaryKey, + ObjectMap previousState, + ObjectMap newState, + ObjectMap source, + ObjectMap values, + String tableName, + String sourceName, + DbOperation operation, + Long timestamp, + DbSnapshot snapshot) { + + this.primaryKey = primaryKey; + this.previousState = previousState; + this.newState = newState; + this.source = source; + this.values = values; + this.tableName = tableName; + this.operation = operation; + this.timestamp = timestamp; + this.snapshot = snapshot; + this.sourceName = sourceName; + } + + public ObjectMap getPrimaryKeyId() { + return primaryKey; + } + + public String getTableName() { + return tableName; + } + + public DbOperation getOperation() { + return operation; + } + + public ObjectMap getPreviousState() { + return previousState; + } + + public ObjectMap getNewState() { + return newState; + } + + public DbSnapshot getSnapshot() { + return snapshot; + } + + public Long getTimestamp() { + return timestamp; + } + + public ObjectMap getPrimaryKey() { + return primaryKey; + } + + public ObjectMap getValues() { + return values; + } + + public ObjectMap getSource() { + return source; + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java new file mode 100644 index 00000000..accc091e --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java @@ -0,0 +1,106 @@ +package com.ayinza.util.debezium.domain.model; + +import java.time.Instant; +import java.util.Optional; + +/** + * This class represents the status of a particular event. + * It allows tracking events as they are streamed and provides information on any errors associated. + */ +public class DbEventStatus { + + private final DbEvent event; + private volatile Throwable error; + private volatile Instant timestamp; + private volatile boolean processed; + + /** + * Constructs a DbEventStatus object for a given event and sets the initial timestamp. + * + * @param event the event being tracked + */ + public DbEventStatus(DbEvent event) { + this.event = event; + this.timestamp = Instant.now(); + this.processed = false; + } + + /** + * Gets the associated event. + * + * @return the event being tracked + */ + public DbEvent getEvent() { + return event; + } + + /** + * Checks if the event has been processed. + * + * @return true if the event is processed, false otherwise + */ + public boolean isProcessed() { + return processed; + } + + /** + * Marks the event as processed. + * + * @param processed whether the event is processed or not + * @return the updated DbEventStatus object for method chaining + */ + public DbEventStatus setProcessed(boolean processed) { + this.processed = processed; + return this; + } + + /** + * Gets the error associated with the event, if any. + * + * @return an Optional containing the error, or an empty Optional if no error exists + */ + public Optional getError() { + return Optional.ofNullable(error); + } + + /** + * Sets an error for the event. + * + * @param error the error encountered during event processing + * @return the updated DbEventStatus object for method chaining + */ + public DbEventStatus setError(Throwable error) { + this.error = error; + return this; + } + + /** + * Gets the timestamp of when this status was created or last updated. + * + * @return the timestamp as an Instant + */ + public Instant getTimestamp() { + return timestamp; + } + + /** + * Updates the timestamp to the specified value. + * + * @param timestamp the new timestamp + * @return the updated DbEventStatus object for method chaining + */ + public DbEventStatus setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + return this; + } + + /** + * Updates the timestamp to the current time. + * + * @return the updated DbEventStatus object for method chaining + */ + public DbEventStatus updateTimestamp() { + this.timestamp = Instant.now(); + return this; + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java new file mode 100644 index 00000000..0d8ce4a8 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java @@ -0,0 +1,38 @@ +package com.ayinza.util.debezium.domain.model; + +/** + * @author smallGod + * @date: 16/10/2024 + */ +public enum DbOperation implements ValueObject { + + CREATE("c"), + READ("r"), + UPDATE("u"), + DELETE("d"); + + private final String value; + + DbOperation(String value) { + this.value = value; + } + + public static DbOperation convertToEnum(String value) + throws EnumConstantNotPresentException { + + for (DbOperation enumConstant : DbOperation.values()) { + if (value.equalsIgnoreCase(enumConstant.getValue())) + return enumConstant; + } + throw new EnumConstantNotPresentException(DbOperation.class, value); + } + + public String getValue() { + return this.value; + } + + @Override + public boolean isSameAs(DbOperation other) { + return this.value.equalsIgnoreCase(other.value); + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java new file mode 100644 index 00000000..a8bf0e23 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java @@ -0,0 +1,37 @@ +package com.ayinza.util.debezium.domain.model; + +/** + * @author smallGod + * @date: 16/10/2024 + */ +public enum DbSnapshot implements ValueObject { + + TRUE("true"), + FALSE("false"), + LAST("last"); + + private final String value; + + DbSnapshot(String value) { + this.value = value; + } + + public static DbSnapshot convertToEnum(String value) + throws EnumConstantNotPresentException { + + for (DbSnapshot enumConstant : DbSnapshot.values()) { + if (value.equalsIgnoreCase(enumConstant.getValue())) + return enumConstant; + } + throw new EnumConstantNotPresentException(DbSnapshot.class, value); + } + + public String getValue() { + return this.value; + } + + @Override + public boolean isSameAs(DbSnapshot other) { + return this.value.equalsIgnoreCase(other.value); + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java new file mode 100644 index 00000000..27a10028 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java @@ -0,0 +1,139 @@ +package com.ayinza.util.debezium.application.model; + +import com.ayinza.utils.domain.model.Properties; +import com.ayinza.utils.domain.model.debezium.DebeziumConstants; +import com.ayinza.utils.domain.model.debezium.DebeziumProperties; +import io.debezium.config.Configuration; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * @author smallGod + * @date: 17/10/2024 + */ +@ApplicationScoped +public class DebeziumConfigProducer { + + private static final String DEBEZIUM_DIR = "debezium"; + private static final String OFFSETS_FILE_SUFFIX = "_offsets.dat"; + private static final String SCHEMA_HISTORY_FILE_SUFFIX = "_schema_history.dat"; + + private final DebeziumProperties properties; + private final String appDataDir; + + @Inject + public DebeziumConfigProducer(DebeziumProperties properties, Properties mainProperties) { + this.properties = properties; + this.appDataDir = mainProperties.getAppDataDir(); + } + + @Produces + public Configuration createDebeziumConfig() { + + File debeziumConfigDir = new File(appDataDir, DEBEZIUM_DIR); + ensureDirectoryExists(debeziumConfigDir); + + String serverId = properties.getDbServerId(); + File offsetsDataFile = new File(debeziumConfigDir, serverId + OFFSETS_FILE_SUFFIX); + File schemaHistoryDataFile = new File(debeziumConfigDir, serverId + SCHEMA_HISTORY_FILE_SUFFIX); + + Configuration.Builder configBuilder = Configuration.create(); + + // database-related properties + configBuilder + .with(DebeziumConstants.NAME, properties.getConnectorName()) + .with(DebeziumConstants.DB_HOST, properties.getDbHostname()) + .with(DebeziumConstants.DB_PORT, properties.getDbPort()) + .with(DebeziumConstants.DB_NAME, properties.getDbName()) + .with(DebeziumConstants.DB_USERNAME, properties.getDbUser()) + .with(DebeziumConstants.DB_PASSWORD, properties.getDbPassword()) + .with(DebeziumConstants.DB_SERVER_ID, properties.getDbServerId()) + .with(DebeziumConstants.DB_SERVER_NAME, properties.getDbServerName()) + .with(DebeziumConstants.DB_INCLUDE_LIST, properties.getDbIncludeList()) + .with(DebeziumConstants.DB_EXCLUDE_LIST, properties.getDbExcludeList()) + .with(DebeziumConstants.DATABASE_TIMEZONE, properties.getDatabaseTimeZone()) + .with(DebeziumConstants.TABLE_INCLUDE_LIST, prepareTableList(properties.getTableIncludeList())) + .with(DebeziumConstants.TABLE_EXCLUDE_LIST, prepareTableList(properties.getTableExcludeList())); + + // offset storage-related properties + configBuilder + .with(DebeziumConstants.OFFSET_STORAGE_FILE_NAME, offsetsDataFile.getAbsolutePath()) + .with(DebeziumConstants.OFFSET_STORAGE_DIR, properties.getOffsetStorageDir()) + .with(DebeziumConstants.OFFSET_STORAGE_IMPL, properties.getOffsetStorageImpl()) + .with(DebeziumConstants.OFFSET_FLUSH_INTERVAL_MS, properties.getOffsetFlushIntervalMs()) + .with(DebeziumConstants.OFFSET_FLUSH_TIMEOUT_MS, properties.getOffsetFlushTimeoutMs()) + .with(DebeziumConstants.OFFSET_FLUSH_SIZE, properties.getOffsetFlushSize()); + + // snapshot-related properties + configBuilder + .with(DebeziumConstants.SNAPSHOT_MODE, properties.getSnapshotMode()) + .with(DebeziumConstants.SNAPSHOT_LOCKING_MODE, properties.getSnapshotLockingMode()) + .with(DebeziumConstants.SNAPSHOT_FETCH_SIZE, properties.getSnapshotFetchSize()) + .with(DebeziumConstants.SNAPSHOT_INCLUDE_COLLECTION_LIST, properties.getSnapshotIncludeCollectionList()) + .with(DebeziumConstants.SNAPSHOT_EXCLUDE_COLLECTION_LIST, properties.getSnapshotExcludeCollectionList()) + .with(DebeziumConstants.SNAPSHOT_DELAY_MS, properties.getSnapshotDelayMs()); + + // connector-related properties + configBuilder + .with(DebeziumConstants.CONNECTOR_CLASS, properties.getConnectorClass()) + .with(DebeziumConstants.DATABASE_HISTORY_IMPL, properties.getDatabaseHistoryImpl()) + .with(DebeziumConstants.DATABASE_HISTORY_FILE_NAME, schemaHistoryDataFile.getAbsolutePath()); + + // heartbeat properties + configBuilder + .with(DebeziumConstants.HEARTBEAT_INTERVAL_MS, properties.getHeartbeatIntervalMs()) + .with(DebeziumConstants.HEARTBEAT_TOPICS_PREFIX, properties.getHeartbeatTopicsPrefix()); + + // event-related properties + configBuilder + .with(DebeziumConstants.MAX_BATCH_SIZE, properties.getMaxBatchSize()) + .with(DebeziumConstants.MAX_QUEUE_SIZE, properties.getMaxQueueSize()) + .with(DebeziumConstants.POLL_INTERVAL_MS, properties.getPollIntervalMs()) + .with(DebeziumConstants.SCHEMA_REFRESH_MODE, properties.getSchemaRefreshMode()) + .with(DebeziumConstants.TOMBSTONES_ON_DELETE, properties.isTombstonesOnDelete()) + .with(DebeziumConstants.PROVIDE_TRANSACTION_METADATA, properties.isProvideTransactionMetadata()); + + // error handling-related properties + configBuilder + .with(DebeziumConstants.MAX_RETRIES, properties.getMaxRetries()) + .with(DebeziumConstants.RETRY_DELAY_MS, properties.getRetryDelayMs()) + .with(DebeziumConstants.MAX_RETRY_DURATION_MS, properties.getMaxRetryDurationMs()); + + // additional general configurations + configBuilder + .with(DebeziumConstants.INCLUDE_SCHEMA_CHANGES, properties.isIncludeSchemaChanges()) + .with(DebeziumConstants.INCLUDE_QUERY, properties.isIncludeQuery()) + .with(DebeziumConstants.DECIMAL_HANDLING_MODE, properties.getDecimalHandlingMode()) + .with(DebeziumConstants.BINARY_HANDLING_MODE, properties.getBinaryHandlingMode()); + + return configBuilder.build(); + } + + /** + * Prefix tables to include with dbname if not prefixed + */ + private String prepareTableList(List tables) { + return Optional.ofNullable(tables) + .filter(t -> !t.isEmpty()) + .map(t -> { + String tablePrefix = StringUtils.isNotBlank(properties.getDbName()) ? properties.getDbName() + "." : ""; + return t.stream() + .map(table -> table.startsWith(tablePrefix) ? table : tablePrefix + table) + .collect(Collectors.joining(",")); + }) + .orElse(""); + } + + private void ensureDirectoryExists(File directory) { + if (!directory.exists() && !directory.mkdirs()) { + throw new IllegalStateException("Unable to create directory: " + directory.getAbsolutePath()); + } + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java new file mode 100644 index 00000000..3b9da314 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java @@ -0,0 +1,75 @@ +package com.ayinza.util.debezium.domain.model; + +/** + * @author smallGod + * @date: 17/10/2024 + */ +public class DebeziumConstants { + + // General properties + public static final String NAME = "name"; + + // Database-related properties + public static final String DB_HOST = "database.hostname"; + public static final String DB_PORT = "database.port"; + public static final String DB_NAME = "database.name"; + public static final String DB_USERNAME = "database.user"; + public static final String DB_PASSWORD = "database.password"; + public static final String DB_SERVER_ID = "database.server.id"; + public static final String DB_SERVER_NAME = "database.server.name"; + public static final String DB_INCLUDE_LIST = "database.include.list"; + public static final String DB_EXCLUDE_LIST = "database.exclude.list"; + public static final String DATABASE_TIMEZONE = "database.timezone"; + public static final String DATABASE_HISTORY = "database.history"; + + public static final String TABLE_INCLUDE_LIST = "table.include.list"; + public static final String TABLE_EXCLUDE_LIST = "table.exclude.list"; + public static final String COLUMN_BLACKLIST = "column.blacklist"; + public static final String COLUMN_WHITELIST = "column.whitelist"; + + // Offset storage-related properties + public static final String OFFSET_STORAGE_FILE_NAME = "offset.storage.file.name"; + public static final String OFFSET_STORAGE_DIR = "offset.storage.dir"; + public static final String OFFSET_STORAGE_IMPL = "offset.storage.impl"; + public static final String OFFSET_FLUSH_INTERVAL_MS = "offset.flush.interval.ms"; + public static final String OFFSET_FLUSH_TIMEOUT_MS = "offset.flush.timeout.ms"; + public static final String OFFSET_FLUSH_SIZE = "offset.flush.size"; + + // Snapshot-related properties + public static final String SNAPSHOT_MODE = "snapshot.mode"; + public static final String SNAPSHOT_LOCKING_MODE = "snapshot.locking.mode"; + public static final String SNAPSHOT_FETCH_SIZE = "snapshot.fetch.size"; + public static final String SNAPSHOT_INCLUDE_COLLECTION_LIST = "snapshot.include.collection.list"; + public static final String SNAPSHOT_EXCLUDE_COLLECTION_LIST = "snapshot.exclude.collection.list"; + public static final String SNAPSHOT_DELAY_MS = "snapshot.delay.ms"; + + // Connector-related properties + public static final String CONNECTOR_CLASS = "connector.class"; + public static final String DATABASE_HISTORY_IMPL = "database.history.impl"; + public static final String DATABASE_HISTORY_FILE_NAME = "database.history.file.name"; + + // Heartbeat properties + public static final String HEARTBEAT_INTERVAL_MS = "heartbeat.interval.ms"; + public static final String HEARTBEAT_TOPICS_PREFIX = "heartbeat.topics.prefix"; + + // Event-related properties + public static final String MAX_BATCH_SIZE = "max.batch.size"; + public static final String MAX_QUEUE_SIZE = "max.queue.size"; + public static final String POLL_INTERVAL_MS = "poll.interval.ms"; + public static final String SCHEMA_REFRESH_MODE = "schema.refresh.mode"; + public static final String TOMBSTONES_ON_DELETE = "tombstones.on.delete"; + public static final String PROVIDE_TRANSACTION_METADATA = "provide.transaction.metadata"; + + // Error handling-related properties + public static final String MAX_RETRIES = "max.retries"; + public static final String RETRY_DELAY_MS = "retry.delay.ms"; + public static final String MAX_RETRY_DURATION_MS = "max.retry.duration.ms"; + + // Additional general configurations + public static final String INCLUDE_SCHEMA_CHANGES = "include.schema.changes"; + public static final String INCLUDE_QUERY = "include.query"; + public static final String DECIMAL_HANDLING_MODE = "decimal.handling.mode"; + public static final String BINARY_HANDLING_MODE = "binary.handling.mode"; + + // Other constants can be added here as needed +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java new file mode 100644 index 00000000..29822b93 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java @@ -0,0 +1,77 @@ +package com.ayinza.util.debezium.application.service; + +import com.ayinza.utils.application.model.debezium.DbChangeToEvent; +import com.ayinza.utils.domain.model.debezium.DbEvent; +import com.ayinza.utils.domain.model.debezium.DbEventStatus; +import com.ayinza.utils.domain.model.debezium.EventConsumer; +import io.debezium.engine.ChangeEvent; +import jakarta.inject.Inject; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.logging.log4j.Logger; + +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Implementation of a Debezium ChangeEvent consumer, which abstracts the Debezium API behind a DbEvent + * and ensures that the registered DbEvent EventConsumer is successfully processed before moving onto the next + * record, with a configurable retryInterval upon failure. + */ +public class DebeziumConsumer implements Consumer> { + + private final DbEventSourceConfig eventSourceConfig; + @Inject + private final EventConsumer eventConsumer; + @Inject + private Logger logger; + private boolean stopped = false; + private boolean disabled = false; + + private Function, DbEvent> function = new DbChangeToEvent(); + + public DebeziumConsumer(EventConsumer eventConsumer, DbEventSourceConfig eventSourceConfig) { + this.eventConsumer = eventConsumer; + this.eventSourceConfig = eventSourceConfig; + } + + /** + * This the primary handler for all Debezium-generated change events. Per the + * Debezium Documentation + * this function should not throw any exceptions, as these will simply get logged and Debezium will continue onto + * the next source record. So if any exception is caught, this logs the Exception, and retries again after + * a configurable retryInterval, until it passes. This effectively blocks any subsequent processing. + * + * @param changeEvent the Debeziumn generated event to process + */ + @Override + public final void accept(ChangeEvent changeEvent) { + + DbEventStatus status = null; + try { + if (disabled) { + logger.error("The Debezium consumer has been stopped prior to processing: " + changeEvent); + return; + } + + DbEvent dbEvent = function.apply(changeEvent); + logger.debug("Notifying listener of the database event: " + dbEvent); + + eventConsumer.accept(dbEvent); + + status = DbEventLog.logger(event); + status.setProcessed(true); + } catch (Throwable t) { + logger.error("An error occurred processing change event: " + changeEvent, t); + if (status != null) { + status.setError(t); + } + //TODO Do not disable in case of a snapshot event + disabled = true; + CustomFileOffsetBackingStore.disable(); + } + } + + public void cancel() { + this.disabled = true; + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java deleted file mode 100644 index bf4450bd..00000000 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumEventService.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.openmrs.module.mambacore.db.debezium; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class DebeziumEventService { - - private static final Logger log = LoggerFactory.getLogger(DebeziumEventService.class); - - public DebeziumEventService() { - } - - public void processDebeziumEvent(Map changeEvent) { - // Similar logic to handle insert, update, and delete events - // Extract and process data from the event map - Map payload = (Map) changeEvent.get("payload"); - if (payload == null) { - return; - } - System.out.println("Payload: " + payload); - - String operation = (String) payload.get("op"); - Map source = (Map) payload.get("source"); - String databaseName = (String) source.get("db"); - String tableName = (String) source.get("table"); - - Map before = (Map) payload.get("before"); - Map after = (Map) payload.get("after"); - - switch (operation) { - - case "c": // INSERT - System.out.println("INSERT event captured on " + databaseName + "." + tableName); - System.out.println("New row data: " + after); - break; - - case "u": // UPDATE - System.out.println("UPDATE event captured on " + databaseName + "." + tableName); - System.out.println("Before: " + before); - System.out.println("After: " + after); - break; - - case "d": // DELETE - System.out.println("DELETE event captured on " + databaseName + "." + tableName); - System.out.println("Deleted row data: " + before); - break; - - default: - System.out.println("Unknown event type: " + operation); - } - } -} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java deleted file mode 100644 index 59cb1b54..00000000 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumListener.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.openmrs.module.mambacore.db.debezium; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.debezium.config.Configuration; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; - -import java.util.Map; - -public class DebeziumListener { - - private final DebeziumEventService debeziumEventService; - private final ObjectMapper objectMapper; - - public DebeziumListener() { - this.debeziumEventService = new DebeziumEventService(); - this.objectMapper = new ObjectMapper(); // Initialize Jackson ObjectMapper for JSON parsing - } - - public void startListening() { - // Step 1: Get the configuration from DebeziumConfigProvider - Configuration config = DebeziumConfigProvider - .getInstance() - .build(); - - // Step 2: Set up the Debezium Engine to capture JSON events - DebeziumEngine> engine = DebeziumEngine.create(Json.class) - .using(config.asProperties()) - .notifying(this::handleDebeziumEvent) - .build(); - - // Step 3: Run the Debezium Engine in a separate thread -// ExecutorService executor = Executors.newSingleThreadExecutor(); -// executor.submit(engine); - - // Step 4: Gracefully shutdown the engine on JVM exit - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - engine.close(); - } catch (Exception e) { - e.printStackTrace(); - } - })); - System.out.println("Debezium engine started..."); - } - - private void handleDebeziumEvent(ChangeEvent event) { - System.out.println("Received event: " + event); - try { - // Step 5: Parse the JSON string from the event - String jsonString = event.value(); - if (jsonString == null) { - return; // Ignore empty events - } - - // Convert the JSON string into a Map using Jackson ObjectMapper - Map changeEvent = objectMapper.readValue(jsonString, Map.class); - - // Step 6: Delegate the event to the service for transactional handling - debeziumEventService.processDebeziumEvent(changeEvent); - - } catch (Exception e) { - e.printStackTrace(); - } - } -} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java new file mode 100644 index 00000000..3a107db6 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java @@ -0,0 +1,105 @@ +package com.ayinza.util.debezium.domain.model; + +import java.util.List; + +/** + * @author smallGod + * @date: 17/10/2024 + */ +public interface DebeziumProperties { + + String getConnectorName(); + + // Offset storage-related properties + String getOffsetStorageFileName(); + + String getOffsetStorageDir(); + + String getOffsetStorageImpl(); + + int getOffsetFlushIntervalMs(); + + int getOffsetFlushTimeoutMs(); + + int getOffsetFlushSize(); + + // Snapshot-related properties + String getSnapshotMode(); + + String getSnapshotLockingMode(); + + int getSnapshotFetchSize(); + + String getSnapshotIncludeCollectionList(); + + String getSnapshotExcludeCollectionList(); + + long getSnapshotDelayMs(); + + // Connector-related properties + String getConnectorClass(); + + String getDatabaseHistoryImpl(); + + String getDatabaseHistoryFileName(); + + // Heartbeat properties + long getHeartbeatIntervalMs(); + + String getHeartbeatTopicsPrefix(); + + // Event-related properties + int getMaxBatchSize(); + + int getMaxQueueSize(); + + long getPollIntervalMs(); + + String getSchemaRefreshMode(); + + boolean isTombstonesOnDelete(); + + boolean isProvideTransactionMetadata(); + + // Database-related properties + String getDbHostname(); + + int getDbPort(); + + String getDbName(); + + String getDbUser(); + + String getDbPassword(); + + String getDbServerId(); + + String getDbServerName(); + + String getDbIncludeList(); + + String getDbExcludeList(); + + List getTableIncludeList(); + + List getTableExcludeList(); + + // Timezone-related properties + String getDatabaseTimeZone(); + + // Error handling-related properties + int getMaxRetries(); + + long getRetryDelayMs(); + + long getMaxRetryDurationMs(); + + // Additional general configurations + boolean isIncludeSchemaChanges(); + + boolean isIncludeQuery(); + + String getDecimalHandlingMode(); + + String getBinaryHandlingMode(); +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java new file mode 100644 index 00000000..6c9a621a --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java @@ -0,0 +1,413 @@ +package com.ayinza.util.debezium.application.model; + +import com.ayinza.utils.domain.model.debezium.DebeziumProperties; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.List; + +/** + * Implementation of DebeziumProperties to fetch configuration from MicroProfile Config. + * + * @author smallGod + * @date: 15/09/2024 + */ +@ApplicationScoped +public class DebeziumPropertiesImpl implements DebeziumProperties { + + @Inject + @ConfigProperty(name = "name") + private String connectorName; + + // Database connection properties + @Inject + @ConfigProperty(name = "database.hostname") + private String dbHostname; + + @Inject + @ConfigProperty(name = "database.port") + private int dbPort; + + @Inject + @ConfigProperty(name = "database.name") + private String dbName; + + @Inject + @ConfigProperty(name = "database.user") + private String dbUser; + + @Inject + @ConfigProperty(name = "database.password") + private String dbPassword; + + @Inject + @ConfigProperty(name = "database.server.id", defaultValue = "85744") + private String dbServerId;// Server ID for MySQL replication + + @Inject + @ConfigProperty(name = "database.server.name", defaultValue = "debezium-app-connector") + private String dbServerName; + + @Inject + @ConfigProperty(name = "database.include.list", defaultValue = "") + private String dbIncludeList; + + @Inject + @ConfigProperty(name = "database.exclude.list", defaultValue = "") + private String dbExcludeList; + + @Inject + @ConfigProperty(name = "database.timezone", defaultValue = "UTC") + private String databaseTimeZone; + + @Inject + @ConfigProperty(name = "table.include.list", defaultValue = "") + private List tableIncludeList; + + @Inject + @ConfigProperty(name = "table.exclude.list", defaultValue = "") + private List tableExcludeList; + + // Offset storage-related properties + @Inject + @ConfigProperty(name = "offset.storage.file.name", defaultValue = "offsets.dat") + private String offsetStorageFileName; + + @Inject + @ConfigProperty(name = "offset.storage.dir", defaultValue = "/tmp") + private String offsetStorageDir; + + @Inject + @ConfigProperty(name = "offset.storage.impl", defaultValue = "org.apache.kafka.connect.storage.FileOffsetBackingStore") + private String offsetStorageImpl; + + @Inject + @ConfigProperty(name = "offset.flush.interval.ms", defaultValue = "60000") + private int offsetFlushIntervalMs;// 60 seconds: interval at which Debezium will persist the current offset position + + @Inject + @ConfigProperty(name = "offset.flush.timeout.ms", defaultValue = "15000") + private int offsetFlushTimeoutMs; + + @Inject + @ConfigProperty(name = "offset.flush.size", defaultValue = "1000") + private int offsetFlushSize; + + // Snapshot-related properties + @Inject + @ConfigProperty(name = "snapshot.mode", defaultValue = "when_needed") + private String snapshotMode; + + @Inject + @ConfigProperty(name = "snapshot.locking.mode", defaultValue = "none") + private String snapshotLockingMode; + + @Inject + @ConfigProperty(name = "snapshot.fetch.size", defaultValue = "10000") + private int snapshotFetchSize; + + @Inject + @ConfigProperty(name = "snapshot.include.collection.list", defaultValue = "") + private String snapshotIncludeCollectionList;// Specify tables to include during snapshot + + @Inject + @ConfigProperty(name = "snapshot.exclude.collection.list", defaultValue = "") + private String snapshotExcludeCollectionList; + + @Inject + @ConfigProperty(name = "snapshot.delay.ms", defaultValue = "0") + private long snapshotDelayMs; + + // Connector-related properties + @Inject + @ConfigProperty(name = "connector.class", defaultValue = "io.debezium.connector.mysql.MySqlConnector") + private String connectorClass; + + @Inject + @ConfigProperty(name = "database.history.impl", defaultValue = "io.debezium.relational.history.FileDatabaseHistory") + private String databaseHistoryImpl; + + @Inject + @ConfigProperty(name = "database.history.file.name", defaultValue = "dbhistory.dat") + private String databaseHistoryFileName; + + // Heartbeat properties + @Inject + @ConfigProperty(name = "heartbeat.interval.ms", defaultValue = "0") + private long heartbeatIntervalMs;// No heartbeat by default + + @Inject + @ConfigProperty(name = "heartbeat.topics.prefix", defaultValue = "__debezium-heartbeat") + private String heartbeatTopicsPrefix; + + // Event-related properties + @Inject + @ConfigProperty(name = "max.batch.size", defaultValue = "2048") + private int maxBatchSize; + + @Inject + @ConfigProperty(name = "max.queue.size", defaultValue = "8192") + private int maxQueueSize; + + @Inject + @ConfigProperty(name = "poll.interval.ms", defaultValue = "500") + private long pollIntervalMs; + + @Inject + @ConfigProperty(name = "schema.refresh.mode", defaultValue = "columns_diff") + private String schemaRefreshMode;// How schema changes are handled, can also be 'columns_diff_exclude_unchanged_toast' + + @Inject + @ConfigProperty(name = "tombstones.on.delete", defaultValue = "false") + private boolean tombstonesOnDelete; + + @Inject + @ConfigProperty(name = "provide.transaction.metadata", defaultValue = "false") + private boolean provideTransactionMetadata; + + // Error handling-related properties + @Inject + @ConfigProperty(name = "max.retries", defaultValue = "10") + private int maxRetries;// Max retry attempts for failed tasks + + @Inject + @ConfigProperty(name = "retry.delay.ms", defaultValue = "1000") + private long retryDelayMs; + + @Inject + @ConfigProperty(name = "max.retry.duration.ms", defaultValue = "60000") + private long maxRetryDurationMs; + + // Additional general configurations + @Inject + @ConfigProperty(name = "include.schema.changes", defaultValue = "true") + private boolean includeSchemaChanges; + + @Inject + @ConfigProperty(name = "include.query", defaultValue = "false") + private boolean includeQuery; + + @Inject + @ConfigProperty(name = "decimal.handling.mode", defaultValue = "precise") + private String decimalHandlingMode;// How to handle decimal types: 'precise', 'string', 'double' + + @Inject + @ConfigProperty(name = "binary.handling.mode", defaultValue = "bytes") + private String binaryHandlingMode;// How to handle binary fields: 'bytes', 'base64' + + @Override + public String getConnectorName() { + return connectorName; // Return your desired connector name + } + + @Override + public String getDbHostname() { + return dbHostname; + } + + @Override + public int getDbPort() { + return dbPort; + } + + @Override + public String getDbName() { + return dbName; + } + + @Override + public String getDbUser() { + return dbUser; + } + + @Override + public String getDbPassword() { + return dbPassword; + } + + @Override + public String getDbServerId() { + return dbServerId; + } + + @Override + public String getDbServerName() { + return dbServerName; + } + + @Override + public String getDbIncludeList() { + return dbIncludeList; + } + + @Override + public String getDbExcludeList() { + return dbExcludeList; + } + + @Override + public String getDatabaseTimeZone() { + return databaseTimeZone; + } + + @Override + public List getTableIncludeList() { + return tableIncludeList; + } + + @Override + public List getTableExcludeList() { + return tableExcludeList; + } + + @Override + public String getOffsetStorageFileName() { + return offsetStorageFileName; + } + + @Override + public String getOffsetStorageDir() { + return offsetStorageDir; + } + + @Override + public String getOffsetStorageImpl() { + return offsetStorageImpl; + } + + @Override + public int getOffsetFlushIntervalMs() { + return offsetFlushIntervalMs; + } + + @Override + public int getOffsetFlushTimeoutMs() { + return offsetFlushTimeoutMs; + } + + @Override + public int getOffsetFlushSize() { + return offsetFlushSize; + } + + @Override + public String getSnapshotMode() { + return snapshotMode; + } + + @Override + public String getSnapshotLockingMode() { + return snapshotLockingMode; + } + + @Override + public int getSnapshotFetchSize() { + return snapshotFetchSize; + } + + @Override + public String getSnapshotIncludeCollectionList() { + return snapshotIncludeCollectionList; + } + + @Override + public String getSnapshotExcludeCollectionList() { + return snapshotExcludeCollectionList; + } + + @Override + public long getSnapshotDelayMs() { + return snapshotDelayMs; + } + + @Override + public String getConnectorClass() { + return connectorClass; + } + + @Override + public String getDatabaseHistoryImpl() { + return databaseHistoryImpl; + } + + @Override + public String getDatabaseHistoryFileName() { + return databaseHistoryFileName; + } + + @Override + public long getHeartbeatIntervalMs() { + return heartbeatIntervalMs; + } + + @Override + public String getHeartbeatTopicsPrefix() { + return heartbeatTopicsPrefix; + } + + @Override + public int getMaxBatchSize() { + return maxBatchSize; + } + + @Override + public int getMaxQueueSize() { + return maxQueueSize; + } + + @Override + public long getPollIntervalMs() { + return pollIntervalMs; + } + + @Override + public String getSchemaRefreshMode() { + return schemaRefreshMode; + } + + @Override + public String getDecimalHandlingMode() { + return decimalHandlingMode; + } + + @Override + public String getBinaryHandlingMode() { + return binaryHandlingMode; + } + + @Override + public boolean isTombstonesOnDelete() { + return tombstonesOnDelete; + } + + @Override + public boolean isProvideTransactionMetadata() { + return provideTransactionMetadata; + } + + @Override + public int getMaxRetries() { + return maxRetries; + } + + @Override + public long getRetryDelayMs() { + return retryDelayMs; + } + + @Override + public long getMaxRetryDurationMs() { + return maxRetryDurationMs; + } + + @Override + public boolean isIncludeSchemaChanges() { + return includeSchemaChanges; + } + + @Override + public boolean isIncludeQuery() { + return includeQuery; + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java new file mode 100644 index 00000000..4600fd90 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java @@ -0,0 +1,21 @@ +package com.ayinza.util.debezium.domain.model; + +import java.util.function.Consumer; + +/** + * Implement this interface to provide logic for handing Events from a DbEventSource + */ +public interface EventConsumer extends Consumer { + + /** + * Executed at startup (prior to any Event processing) + */ + default void preStartup() { + } + + /** + * Executed at shutdown + */ + default void preShutdown() { + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java new file mode 100644 index 00000000..e69de29b diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java new file mode 100644 index 00000000..c1474fb4 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java @@ -0,0 +1,55 @@ +package com.ayinza.util.debezium.domain.model; + +import com.ayinza.utils.domain.model.Contract; + + +import java.util.Map; + +/** + * Simple HashMap extension that contains utility methods for retrieving / converting values to certain types + */ +public interface ObjectMap extends Contract, Map { + + /** + * @return the value with the given key cast as an Integer + */ + default Integer getInteger(String key) { + return (Integer) get(key); + } + + /** + * @return the value with the given key cast as a Long + */ + default Long getLong(String key) { + return (Long) get(key); + } + + /** + * @return the toString representation of the value with the given key, or null if not found + */ + default String getString(String key) { + Object value = get(key); + return value == null ? null : value.toString(); + } + + /** + * @return the value with the given key as a cast or parsed boolean value + */ + default Boolean getBoolean(String key) { + Object value = get(key); + return switch (value) { + case null -> null; + case Boolean bool -> bool; + case Number number -> number.intValue() == 1; + default -> Boolean.parseBoolean(value.toString()); + }; + } + + /** + * @return the value with the given key as a cast or parsed boolean value, or the defaultValue if null + */ + default boolean getBoolean(String key, boolean defaultValue) { + Boolean value = getBoolean(key); + return value == null ? defaultValue : value; + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java new file mode 100644 index 00000000..909d9210 --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java @@ -0,0 +1,33 @@ +package com.ayinza.util.debezium.application.model; + +import com.ayinza.utils.domain.model.debezium.ObjectMap; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; + +import java.util.HashMap; + +public class ObjectMapImpl extends HashMap implements ObjectMap { + + public ObjectMapImpl() { + super(); + } + + /** + * Constructs a new ObjectMap from the given Struct, using the fields of the schema as keys + * + * @param struct the struct to convert to an ObjectMap + */ + public ObjectMapImpl(Struct struct) { + this(); + if (struct != null && struct.schema() != null) { + for (Field field : struct.schema().fields()) { + put(field.name(), struct.get(field)); + } + } + } + + @Override + public ObjectMapImpl fromContract() { + return this; + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java new file mode 100644 index 00000000..86e9249d --- /dev/null +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java @@ -0,0 +1,45 @@ +package com.ayinza.util.debezium.application.service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.slf4j.Logger; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +@ApplicationScoped +public class OffsetUtils { + + @Inject + private Logger logger; + + /** + * Transforms the specified offset file to match the new expected structure after the debezium and + * kafka upgrades in previous versions. + * + * @param offset the offset data + * @throws IOException + */ + public void transformOffsetIfNecessary(Map offset) + throws IOException { + + if (offset.isEmpty()) { + logger.debug("No existing offset file found, skipping offset transformation check"); + return; + } + + ObjectMapper mapper = new ObjectMapper(); + ByteBuffer keyByteBuf = offset.keySet().iterator().next(); + ByteBuffer valueByteBuf = offset.get(keyByteBuf); + JsonNode keyNode = mapper.readValue(keyByteBuf.array(), JsonNode.class); + if (keyNode.isObject()) { + logger.info("Transforming offset to structure that conforms to the new kafka API"); + offset.remove(keyByteBuf); + byte[] newKeyBytes = mapper.writeValueAsBytes(keyNode.get("payload")); + offset.put(ByteBuffer.wrap(newKeyBytes), valueByteBuf); + } + } +} diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java b/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java similarity index 72% rename from api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java rename to api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java index 07953c49..32ff7350 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProvider.java +++ b/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java @@ -12,29 +12,38 @@ import io.debezium.config.Configuration; import org.openmrs.module.mambacore.util.MambaETLProperties; +import java.io.File; +import java.io.IOException; import java.util.stream.Collectors; /** * Provides configuration for Debezium connector */ public class DebeziumConfigProvider { - + private static final String MYSQL_CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector"; - + + private static final String FILE_DB_HISTORY = "io.debezium.relational.history.FileDatabaseHistory"; + private static final String CONNECTOR_NAME = "mamba-core-connector"; - + private static final String SERVER_NAME = "openmrsDbServer"; - + private static volatile DebeziumConfigProvider instance = null; - + private static Configuration config; - + private final MambaETLProperties props = MambaETLProperties.getInstance(); - + private DebeziumConfigProvider() { - initializeConfig(); + try { + initializeConfig(); + } + catch (IOException e) { + System.err.println("Error initializing Debezium configuration: " + e.getMessage()); + } } - + public static DebeziumConfigProvider getInstance() { if (instance == null) { synchronized (DebeziumConfigProvider.class) { @@ -45,25 +54,29 @@ public static DebeziumConfigProvider getInstance() { } return instance; } - - private void initializeConfig() { + + private void initializeConfig() throws IOException { String source = props.getOpenmrsDatabase(); - + File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat"); + config = Configuration.create().with("name", CONNECTOR_NAME).with("connector.class", MYSQL_CONNECTOR_CLASS) .with("database.server.id", props.getSourceDatabaseServerId()).with("database.server.name", SERVER_NAME) .with("database.hostname", props.getSourceDatabaseHost()) - .with("database.port", props.getSourceDatabasePort()).with("database.user", props.getMambaETLuser()) - .with("database.password", props.getMambaETLuserPassword()).with("database.include.list", source) + .with("database.port", props.getSourceDatabasePort()).with("database.user", "root") + .with("database.password", "4#edRmgaF+k?") + .with("database.include.list", source) //.with("table.include.list", getTableIncludeList(source)) - .with("database.history.file.filename", props.getHistoryFilePath()).build(); + .with("table.include.list", "openmrs.person, openmrs.patient, openmrs.obs") + .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath()) + .with("database.history", FILE_DB_HISTORY).with("include.schema.changes", "true").build(); } - + private String getTableIncludeList(String source) { return props.getTablesWithChangesToStream().stream() .map(table -> source + "." + table) .collect(Collectors.joining(",")); } - + public Configuration build() { return config; } From bb981dc909b3a5ceb44ee7871d33abd9898513c7 Mon Sep 17 00:00:00 2001 From: smallgod Date: Fri, 18 Oct 2024 09:38:56 +0300 Subject: [PATCH 4/4] Setup Debezium with offsets --- api/pom.xml | 25 +-- .../api/dao/impl/JdbcFlattenDatabaseDao.java | 17 +- .../CustomFileOffsetBackingStore.java | 13 +- .../db/debezium/DbChangeToEvent.java | 15 +- .../module/mambacore/db/debezium/DbEvent.java | 14 +- .../mambacore/db/debezium/DbEventStatus.java | 11 +- .../mambacore/db/debezium/DbOperation.java | 22 +-- .../mambacore/db/debezium/DbSnapshot.java | 20 +-- .../db/debezium/DebeziumConfigProducer.java | 24 ++- .../db/debezium/DebeziumConstants.java | 13 +- .../db/debezium/DebeziumConsumer.java | 16 +- .../db/debezium/DebeziumProperties.java | 15 +- .../db/debezium/DebeziumPropertiesImpl.java | 170 +++++------------- .../mambacore/db/debezium/EventConsumer.java | 2 +- .../db/debezium/MyEventConsumer.java | 14 ++ .../mambacore/db/debezium/ObjectMap.java | 7 +- .../mambacore/db/debezium/ObjectMapImpl.java | 8 +- .../mambacore/db/debezium/OffsetUtils.java | 9 +- .../task/DebeziumConfigProvider.java | 2 +- .../mambacore/util/MambaETLProperties.java | 2 +- .../xf_system/sp_mamba_dim_table_insert.sql | 2 +- pom.xml | 158 ++++++++-------- 22 files changed, 277 insertions(+), 302 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index 67a52e85..017d27c0 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -73,19 +73,24 @@ commons-dbcp2 - - io.debezium - debezium-embedded - + + + + - - io.debezium - debezium-api - + + + + + + + + + - io.debezium - debezium-connector-mysql + org.openmrs.module + dbevent-api diff --git a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java index 293cc362..a2fef3ce 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java +++ b/api/src/main/java/org/openmrs/module/mambacore/api/dao/impl/JdbcFlattenDatabaseDao.java @@ -1,8 +1,11 @@ package org.openmrs.module.mambacore.api.dao.impl; +import org.openmrs.module.dbevent.DbEventSource; +import org.openmrs.module.dbevent.DbEventSourceConfig; +import org.openmrs.module.dbevent.EventContext; import org.openmrs.module.mambacore.api.dao.FlattenDatabaseDao; import org.openmrs.module.mambacore.db.ConnectionPoolManager; -import org.openmrs.module.mambacore.db.debezium.DebeziumListener; +import org.openmrs.module.mambacore.db.debezium.MyEventConsumer; import org.openmrs.module.mambacore.util.MambaETLProperties; import org.openmrs.module.mambacore.util.StringReplacerUtil; import org.slf4j.Logger; @@ -18,6 +21,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -30,7 +34,7 @@ public class JdbcFlattenDatabaseDao implements FlattenDatabaseDao { private static final String MYSQL_COMMENT_REGEX = "--.*(?=\\n)"; private static final String DELIMITER = "~-~-"; - private DebeziumListener debeziumListener = new DebeziumListener(); + //private DebeziumListener debeziumListener = new DebeziumListener(); /** * Deploy MambaETL stored procedures @@ -49,7 +53,14 @@ public void deployMambaEtl() { */ @Override public void streamInDatabaseChanges() { - debeziumListener.startListening(); + //debeziumListener.startListening(); + EventContext ctx = new EventContext(); + DbEventSourceConfig config = new DbEventSourceConfig(100002, "mamba-debezium", ctx); + config.configureTablesToInclude(Arrays.asList("obs", "patient", "encounter", "encounter_type", "location")); + DbEventSource eventSource = new DbEventSource(config); + MyEventConsumer consumer = new MyEventConsumer(); + eventSource.setEventConsumer(consumer); + eventSource.start(); } private void executeSqlScript(MambaETLProperties props) { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java index 49899532..9d5f5178 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java @@ -1,6 +1,14 @@ -package com.ayinza.util.debezium.application.service; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; -import jakarta.inject.Inject; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.storage.FileOffsetBackingStore; @@ -20,7 +28,6 @@ public class CustomFileOffsetBackingStore extends FileOffsetBackingStore { private static final JsonConverter KEY_CONVERTER = new JsonConverter(); private static boolean disabled = false; - @Inject private OffsetUtils offsetUtils; public CustomFileOffsetBackingStore() { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java index 5405f4e8..c2b4e3a1 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java @@ -1,9 +1,14 @@ -package com.ayinza.util.debezium.application.model; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; -import com.ayinza.utils.domain.model.debezium.DbEvent; -import com.ayinza.utils.domain.model.debezium.ObjectMap; -import com.ayinza.utils.domain.vo.DbOperation; -import com.ayinza.utils.domain.vo.DbSnapshot; import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import org.apache.kafka.connect.data.Struct; diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java index 8ac4d454..42871506 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEvent.java @@ -1,7 +1,13 @@ -package com.ayinza.util.debezium.domain.model; - -import com.ayinza.utils.domain.vo.DbOperation; -import com.ayinza.utils.domain.vo.DbSnapshot; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; public class DbEvent { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java index accc091e..036f8ef1 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbEventStatus.java @@ -1,4 +1,13 @@ -package com.ayinza.util.debezium.domain.model; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; import java.time.Instant; import java.util.Optional; diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java index 0d8ce4a8..9991840d 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbOperation.java @@ -1,10 +1,15 @@ -package com.ayinza.util.debezium.domain.model; - /** - * @author smallGod - * @date: 16/10/2024 + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. */ -public enum DbOperation implements ValueObject { +package org.openmrs.module.mambacore.db.debezium; + +public enum DbOperation { CREATE("c"), READ("r"), @@ -30,9 +35,4 @@ public static DbOperation convertToEnum(String value) public String getValue() { return this.value; } - - @Override - public boolean isSameAs(DbOperation other) { - return this.value.equalsIgnoreCase(other.value); - } -} +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java index a8bf0e23..17a6526f 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbSnapshot.java @@ -1,10 +1,15 @@ -package com.ayinza.util.debezium.domain.model; - /** - * @author smallGod - * @date: 16/10/2024 + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. */ -public enum DbSnapshot implements ValueObject { +package org.openmrs.module.mambacore.db.debezium; + +public enum DbSnapshot { TRUE("true"), FALSE("false"), @@ -29,9 +34,4 @@ public static DbSnapshot convertToEnum(String value) public String getValue() { return this.value; } - - @Override - public boolean isSameAs(DbSnapshot other) { - return this.value.equalsIgnoreCase(other.value); - } } diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java index 27a10028..4386fc77 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConfigProducer.java @@ -1,12 +1,15 @@ -package com.ayinza.util.debezium.application.model; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; -import com.ayinza.utils.domain.model.Properties; -import com.ayinza.utils.domain.model.debezium.DebeziumConstants; -import com.ayinza.utils.domain.model.debezium.DebeziumProperties; import io.debezium.config.Configuration; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Produces; -import jakarta.inject.Inject; import org.apache.commons.lang3.StringUtils; import java.io.File; @@ -14,11 +17,6 @@ import java.util.Optional; import java.util.stream.Collectors; -/** - * @author smallGod - * @date: 17/10/2024 - */ -@ApplicationScoped public class DebeziumConfigProducer { private static final String DEBEZIUM_DIR = "debezium"; @@ -28,13 +26,11 @@ public class DebeziumConfigProducer { private final DebeziumProperties properties; private final String appDataDir; - @Inject public DebeziumConfigProducer(DebeziumProperties properties, Properties mainProperties) { this.properties = properties; this.appDataDir = mainProperties.getAppDataDir(); } - @Produces public Configuration createDebeziumConfig() { File debeziumConfigDir = new File(appDataDir, DEBEZIUM_DIR); diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java index 3b9da314..20185ca3 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConstants.java @@ -1,9 +1,14 @@ -package com.ayinza.util.debezium.domain.model; - /** - * @author smallGod - * @date: 17/10/2024 + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. */ +package org.openmrs.module.mambacore.db.debezium; + public class DebeziumConstants { // General properties diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java index 29822b93..eaa02a5f 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumConsumer.java @@ -1,13 +1,9 @@ -package com.ayinza.util.debezium.application.service; +package org.openmrs.module.mambacore.db.debezium; -import com.ayinza.utils.application.model.debezium.DbChangeToEvent; -import com.ayinza.utils.domain.model.debezium.DbEvent; -import com.ayinza.utils.domain.model.debezium.DbEventStatus; -import com.ayinza.utils.domain.model.debezium.EventConsumer; import io.debezium.engine.ChangeEvent; -import jakarta.inject.Inject; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.logging.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.function.Consumer; import java.util.function.Function; @@ -19,11 +15,11 @@ */ public class DebeziumConsumer implements Consumer> { + protected static final Logger logger = LoggerFactory.getLogger(DebeziumConsumer.class); + private final DbEventSourceConfig eventSourceConfig; - @Inject + private final EventConsumer eventConsumer; - @Inject - private Logger logger; private boolean stopped = false; private boolean disabled = false; diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java index 3a107db6..1d6bd8c4 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumProperties.java @@ -1,11 +1,16 @@ -package com.ayinza.util.debezium.domain.model; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; import java.util.List; -/** - * @author smallGod - * @date: 17/10/2024 - */ public interface DebeziumProperties { String getConnectorName(); diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java index 6c9a621a..5827dab3 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/DebeziumPropertiesImpl.java @@ -1,200 +1,112 @@ -package com.ayinza.util.debezium.application.model; - -import com.ayinza.utils.domain.model.debezium.DebeziumProperties; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.eclipse.microprofile.config.inject.ConfigProperty; +/** + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under + * the terms of the Healthcare Disclaimer located at http://openmrs.org/license. + *

+ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS + * graphic logo is a trademark of OpenMRS Inc. + */ +package org.openmrs.module.mambacore.db.debezium; import java.util.List; -/** - * Implementation of DebeziumProperties to fetch configuration from MicroProfile Config. - * - * @author smallGod - * @date: 15/09/2024 - */ -@ApplicationScoped public class DebeziumPropertiesImpl implements DebeziumProperties { - @Inject - @ConfigProperty(name = "name") private String connectorName; // Database connection properties - @Inject - @ConfigProperty(name = "database.hostname") private String dbHostname; - @Inject - @ConfigProperty(name = "database.port") private int dbPort; - @Inject - @ConfigProperty(name = "database.name") private String dbName; - @Inject - @ConfigProperty(name = "database.user") private String dbUser; - @Inject - @ConfigProperty(name = "database.password") private String dbPassword; - @Inject - @ConfigProperty(name = "database.server.id", defaultValue = "85744") - private String dbServerId;// Server ID for MySQL replication + private String dbServerId = "85744";// Server ID for MySQL replication - @Inject - @ConfigProperty(name = "database.server.name", defaultValue = "debezium-app-connector") private String dbServerName; - @Inject - @ConfigProperty(name = "database.include.list", defaultValue = "") private String dbIncludeList; - @Inject - @ConfigProperty(name = "database.exclude.list", defaultValue = "") private String dbExcludeList; - @Inject - @ConfigProperty(name = "database.timezone", defaultValue = "UTC") - private String databaseTimeZone; + private String databaseTimeZone = "UTC"; - @Inject - @ConfigProperty(name = "table.include.list", defaultValue = "") private List tableIncludeList; - @Inject - @ConfigProperty(name = "table.exclude.list", defaultValue = "") private List tableExcludeList; // Offset storage-related properties - @Inject - @ConfigProperty(name = "offset.storage.file.name", defaultValue = "offsets.dat") - private String offsetStorageFileName; + private String offsetStorageFileName = "offsets.dat"; - @Inject - @ConfigProperty(name = "offset.storage.dir", defaultValue = "/tmp") - private String offsetStorageDir; + private String offsetStorageDir = "/tmp"; - @Inject - @ConfigProperty(name = "offset.storage.impl", defaultValue = "org.apache.kafka.connect.storage.FileOffsetBackingStore") - private String offsetStorageImpl; + private String offsetStorageImpl = "org.apache.kafka.connect.storage.FileOffsetBackingStore"; - @Inject - @ConfigProperty(name = "offset.flush.interval.ms", defaultValue = "60000") - private int offsetFlushIntervalMs;// 60 seconds: interval at which Debezium will persist the current offset position + private int offsetFlushIntervalMs = 60000;// 60 seconds: interval at which Debezium will persist the current offset position - @Inject - @ConfigProperty(name = "offset.flush.timeout.ms", defaultValue = "15000") - private int offsetFlushTimeoutMs; + private int offsetFlushTimeoutMs = 15000; - @Inject - @ConfigProperty(name = "offset.flush.size", defaultValue = "1000") - private int offsetFlushSize; + private int offsetFlushSize = 1000; // Snapshot-related properties - @Inject - @ConfigProperty(name = "snapshot.mode", defaultValue = "when_needed") - private String snapshotMode; + private String snapshotMode = "when_needed"; - @Inject - @ConfigProperty(name = "snapshot.locking.mode", defaultValue = "none") - private String snapshotLockingMode; + private String snapshotLockingMode = "none"; - @Inject - @ConfigProperty(name = "snapshot.fetch.size", defaultValue = "10000") - private int snapshotFetchSize; + private int snapshotFetchSize = 10000; - @Inject - @ConfigProperty(name = "snapshot.include.collection.list", defaultValue = "") private String snapshotIncludeCollectionList;// Specify tables to include during snapshot - @Inject - @ConfigProperty(name = "snapshot.exclude.collection.list", defaultValue = "") private String snapshotExcludeCollectionList; - @Inject - @ConfigProperty(name = "snapshot.delay.ms", defaultValue = "0") - private long snapshotDelayMs; + private long snapshotDelayMs = 0; // Connector-related properties - @Inject - @ConfigProperty(name = "connector.class", defaultValue = "io.debezium.connector.mysql.MySqlConnector") - private String connectorClass; + private String connectorClass = "io.debezium.connector.mysql.MySqlConnector"; - @Inject - @ConfigProperty(name = "database.history.impl", defaultValue = "io.debezium.relational.history.FileDatabaseHistory") - private String databaseHistoryImpl; + private String databaseHistoryImpl = "io.debezium.relational.history.FileDatabaseHistory"; - @Inject - @ConfigProperty(name = "database.history.file.name", defaultValue = "dbhistory.dat") - private String databaseHistoryFileName; + private String databaseHistoryFileName = "dbhistory.dat"; // Heartbeat properties - @Inject - @ConfigProperty(name = "heartbeat.interval.ms", defaultValue = "0") - private long heartbeatIntervalMs;// No heartbeat by default + private long heartbeatIntervalMs = 0;// No heartbeat by default - @Inject - @ConfigProperty(name = "heartbeat.topics.prefix", defaultValue = "__debezium-heartbeat") - private String heartbeatTopicsPrefix; + private String heartbeatTopicsPrefix = "__debezium-heartbeat"; // Event-related properties - @Inject - @ConfigProperty(name = "max.batch.size", defaultValue = "2048") - private int maxBatchSize; + private int maxBatchSize = 2048; - @Inject - @ConfigProperty(name = "max.queue.size", defaultValue = "8192") - private int maxQueueSize; + private int maxQueueSize = 8192; - @Inject - @ConfigProperty(name = "poll.interval.ms", defaultValue = "500") - private long pollIntervalMs; + private long pollIntervalMs = 500; - @Inject - @ConfigProperty(name = "schema.refresh.mode", defaultValue = "columns_diff") - private String schemaRefreshMode;// How schema changes are handled, can also be 'columns_diff_exclude_unchanged_toast' + private String schemaRefreshMode = "columns_diff";// How schema changes are handled, can also be 'columns_diff_exclude_unchanged_toast' - @Inject - @ConfigProperty(name = "tombstones.on.delete", defaultValue = "false") - private boolean tombstonesOnDelete; + private boolean tombstonesOnDelete = false; - @Inject - @ConfigProperty(name = "provide.transaction.metadata", defaultValue = "false") - private boolean provideTransactionMetadata; + private boolean provideTransactionMetadata = false; // Error handling-related properties - @Inject - @ConfigProperty(name = "max.retries", defaultValue = "10") - private int maxRetries;// Max retry attempts for failed tasks - @Inject - @ConfigProperty(name = "retry.delay.ms", defaultValue = "1000") - private long retryDelayMs; + private int maxRetries = 10;// Max retry attempts for failed tasks + + private long retryDelayMs = 1000; - @Inject - @ConfigProperty(name = "max.retry.duration.ms", defaultValue = "60000") - private long maxRetryDurationMs; + private long maxRetryDurationMs = 60000; // Additional general configurations - @Inject - @ConfigProperty(name = "include.schema.changes", defaultValue = "true") - private boolean includeSchemaChanges; + private boolean includeSchemaChanges = true; - @Inject - @ConfigProperty(name = "include.query", defaultValue = "false") - private boolean includeQuery; + private boolean includeQuery = false; - @Inject - @ConfigProperty(name = "decimal.handling.mode", defaultValue = "precise") - private String decimalHandlingMode;// How to handle decimal types: 'precise', 'string', 'double' + private String decimalHandlingMode = "precise";// How to handle decimal types: 'precise', 'string', 'double' - @Inject - @ConfigProperty(name = "binary.handling.mode", defaultValue = "bytes") - private String binaryHandlingMode;// How to handle binary fields: 'bytes', 'base64' + private String binaryHandlingMode = "bytes";// How to handle binary fields: 'bytes', 'base64' @Override public String getConnectorName() { diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java index 4600fd90..aa544d69 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/EventConsumer.java @@ -1,4 +1,4 @@ -package com.ayinza.util.debezium.domain.model; +package org.openmrs.module.mambacore.db.debezium; import java.util.function.Consumer; diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java index e69de29b..5cd1bcda 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/MyEventConsumer.java @@ -0,0 +1,14 @@ +package org.openmrs.module.mambacore.db.debezium; + +import io.debezium.engine.ChangeEvent; +import org.openmrs.module.dbevent.DbEvent; +import org.openmrs.module.dbevent.EventConsumer; + +public class MyEventConsumer implements EventConsumer { + + @Override + public void accept(DbEvent dbEvent) { + // Process the DbEvent (e.g., log it, update another system) + System.out.println("DbEvent detected: " + dbEvent); + } +} \ No newline at end of file diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java index c1474fb4..f34e8679 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMap.java @@ -1,14 +1,11 @@ -package com.ayinza.util.debezium.domain.model; - -import com.ayinza.utils.domain.model.Contract; - +package org.openmrs.module.mambacore.db.debezium; import java.util.Map; /** * Simple HashMap extension that contains utility methods for retrieving / converting values to certain types */ -public interface ObjectMap extends Contract, Map { +public interface ObjectMap extends Map { /** * @return the value with the given key cast as an Integer diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java index 909d9210..3948d25e 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/ObjectMapImpl.java @@ -1,6 +1,5 @@ -package com.ayinza.util.debezium.application.model; +package org.openmrs.module.mambacore.db.debezium; -import com.ayinza.utils.domain.model.debezium.ObjectMap; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; @@ -25,9 +24,4 @@ public ObjectMapImpl(Struct struct) { } } } - - @Override - public ObjectMapImpl fromContract() { - return this; - } } diff --git a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java index 86e9249d..037a5db1 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java +++ b/api/src/main/java/org/openmrs/module/mambacore/db/debezium/OffsetUtils.java @@ -1,20 +1,17 @@ -package com.ayinza.util.debezium.application.service; +package org.openmrs.module.mambacore.db.debezium; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -@ApplicationScoped public class OffsetUtils { - @Inject - private Logger logger; + private final static Logger logger = LoggerFactory.getLogger(OffsetUtils.class); /** * Transforms the specified offset file to match the new expected structure after the debezium and diff --git a/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java b/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java index 32ff7350..3ec83aad 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java +++ b/api/src/main/java/org/openmrs/module/mambacore/task/DebeziumConfigProvider.java @@ -7,7 +7,7 @@ * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS * graphic logo is a trademark of OpenMRS Inc. */ -package org.openmrs.module.mambacore.db.debezium; +package org.openmrs.module.mambacore.task; import io.debezium.config.Configuration; import org.openmrs.module.mambacore.util.MambaETLProperties; diff --git a/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java b/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java index a42d9a2a..1c95f09c 100644 --- a/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java +++ b/api/src/main/java/org/openmrs/module/mambacore/util/MambaETLProperties.java @@ -138,7 +138,7 @@ public String getSourceDatabaseServerId() { } public String getHistoryFilePath() { - return "dbhistory.dat"; // TODO: put in properties file + return "/Users/smallgod/srv/downloads/dbhistory.dat"; // TODO: put in properties file } private String getProperty(Properties properties, String key, String defaultValue) { diff --git a/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql b/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql index b1f0cd15..6b78af1d 100644 --- a/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql +++ b/api/src/main/resources/_core/database/mysql/xf_system/sp_mamba_dim_table_insert.sql @@ -68,7 +68,7 @@ BEGIN -- Set the join clause if it is an incremental insert IF is_incremental THEN SET join_clause = CONCAT( - ' INNER JOIN ic', + ' INNER JOIN mamba_etl_incremental_columns_index_new ic', ' ON tb.', pkey_column, ' = ic.incremental_table_pkey'); END IF; diff --git a/pom.xml b/pom.xml index 01334721..b91a915c 100644 --- a/pom.xml +++ b/pom.xml @@ -45,11 +45,12 @@ UTF-8 5.7.2 4.13.1 - 2.0.0 + 2.0.0 2.21.0 2.11.0 2.15.3 - 1.9.7.Final + 1.8.0.Final + 9.6.1 @@ -142,80 +143,95 @@ - - io.debezium - debezium-embedded - ${debezium.version} - - - org.glassfish.hk2.external - aopalliance-repackaged - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-core - - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - - - com.fasterxml.jackson.core - jackson-annotations - - - javax.servlet - javax.servlet-api - - - org.apache.commons - commons-lang3 - - - org.slf4j - slf4j-api - - - org.slf4j - slf4j-log4j12 - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - io.debezium - debezium-connector-mysql - ${debezium.version} - - - org.slf4j - slf4j-api - - - mysql - mysql-connector-java - - + org.openmrs.module + dbevent-omod + 1.0.0-SNAPSHOT + pom - io.debezium - debezium-api - ${debezium.version} - - - org.slf4j - slf4j-api - - + org.openmrs.module + dbevent-api + 1.0.0-SNAPSHOT