From a6e81086cb8f0585ef83ddd969aff6995552b146 Mon Sep 17 00:00:00 2001 From: shatiwar Date: Thu, 17 Oct 2024 13:12:26 +0530 Subject: [PATCH] Add Implementation db2 dialect getUpsertStatement Add E2E for upsert db2 scenario --- .../jdbc/internal/dialect/db2/DB2Dialect.java | 54 ++++++- .../seatunnel/jdbc/AbstractJdbcIT.java | 22 +++ .../connectors/seatunnel/jdbc/JdbcCase.java | 2 + .../connectors/seatunnel/jdbc/JdbcDb2IT.java | 6 +- .../seatunnel/jdbc/JdbcDb2UpsertIT.java | 133 ++++++++++++++++++ .../jdbc_db2_source_and_sink_upsert.conf | 57 ++++++++ 6 files changed, 270 insertions(+), 4 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java index 6150dd4330d..5af57bf1045 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java @@ -22,7 +22,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import java.util.Arrays; import java.util.Optional; +import java.util.stream.Collectors; public class DB2Dialect implements JdbcDialect { @@ -44,6 +46,56 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { @Override public 
Optional getUpsertStatement( String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { - return Optional.empty(); + // Generate field list for USING and INSERT clauses + String fieldList = String.join(", ", fieldNames); + + // Generate placeholder list for VALUES clause + String placeholderList = + Arrays.stream(fieldNames).map(field -> "?").collect(Collectors.joining(", ")); + + // Generate ON clause + String onClause = + Arrays.stream(uniqueKeyFields) + .map(field -> "target." + field + " = source." + field) + .collect(Collectors.joining(" AND ")); + + // Generate WHEN MATCHED clause + String whenMatchedClause = + Arrays.stream(fieldNames) + .map(field -> "target." + field + " <> source." + field) + .collect(Collectors.joining(" OR ")); + + // Generate UPDATE SET clause + String updateSetClause = + Arrays.stream(fieldNames) + .map(field -> "target." + field + " = source." + field) + .collect(Collectors.joining(", ")); + + // Generate WHEN NOT MATCHED clause + String insertClause = + "INSERT (" + + fieldList + + ") VALUES (" + + Arrays.stream(fieldNames) + .map(field -> "source." 
+ field) + .collect(Collectors.joining(", ")) + + ")"; + + // Combine all parts to form the final SQL statement + String mergeStatement = + String.format( + "MERGE INTO %s.%s AS target USING (VALUES (%s)) AS source (%s) ON %s " + + "WHEN MATCHED AND (%s) THEN UPDATE SET %s " + + "WHEN NOT MATCHED THEN %s;", + database, + tableName, + placeholderList, + fieldList, + onClause, + whenMatchedClause, + updateSetClause, + insertClause); + + return Optional.of(mergeStatement); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 24b916d4049..9747396b61f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -209,6 +209,17 @@ protected void createNeededTables() { jdbcCase.getSourceTable())); statement.execute(createSource); + if (jdbcCase.getAdditionalSqlOnSource() != null) { + String additionalSql = + String.format( + jdbcCase.getAdditionalSqlOnSource(), + buildTableInfoWithSchema( + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSourceTable())); + statement.execute(additionalSql); + } + if (!jdbcCase.isUseSaveModeCreateTable()) { if (jdbcCase.getSinkCreateSql() != null) { createTemplate = jdbcCase.getSinkCreateSql(); @@ -223,6 +234,17 @@ protected void createNeededTables() { statement.execute(createSink); } + if (jdbcCase.getAdditionalSqlOnSink() != null) { + String additionalSql = + String.format( + jdbcCase.getAdditionalSqlOnSink(), + 
buildTableInfoWithSchema( + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSinkTable())); + statement.execute(additionalSql); + } + connection.commit(); } catch (Exception exception) { log.error(ExceptionUtils.getMessage(exception)); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java index 3dd7b64b95d..e6bbbd19a70 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java @@ -48,6 +48,8 @@ public class JdbcCase { private String jdbcUrl; private String createSql; private String sinkCreateSql; + private String additionalSqlOnSource; + private String additionalSqlOnSink; private String insertSql; private List configFile; private Pair> testData; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java index 22a29b3b679..a876d9bf7a0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java @@ -44,9 +44,9 @@ public class JdbcDb2IT extends 
AbstractJdbcIT { private static final String DB2_CONTAINER_HOST = "db2-e2e"; - private static final String DB2_DATABASE = "E2E"; - private static final String DB2_SOURCE = "SOURCE"; - private static final String DB2_SINK = "SINK"; + protected static final String DB2_DATABASE = "E2E"; + protected static final String DB2_SOURCE = "SOURCE"; + protected static final String DB2_SINK = "SINK"; private static final String DB2_URL = "jdbc:db2://" + HOST + ":%s/%s"; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java new file mode 100644 index 00000000000..d6e0147368e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2UpsertIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class JdbcDb2UpsertIT extends JdbcDb2IT { + + private static final String CREATE_SQL_SINK = + "create table %s\n" + + "(\n" + + " C_BOOLEAN BOOLEAN,\n" + + " C_SMALLINT SMALLINT,\n" + + " C_INT INTEGER NOT NULL PRIMARY KEY,\n" + + " C_INTEGER INTEGER,\n" + + " C_BIGINT BIGINT,\n" + + " C_DECIMAL DECIMAL(5),\n" + + " C_DEC DECIMAL(5),\n" + + " C_NUMERIC DECIMAL(5),\n" + + " C_NUM DECIMAL(5),\n" + + " C_REAL REAL,\n" + + " C_FLOAT DOUBLE,\n" + + " C_DOUBLE DOUBLE,\n" + + " C_DOUBLE_PRECISION DOUBLE,\n" + + " C_CHAR CHARACTER(1),\n" + + " C_VARCHAR VARCHAR(255),\n" + + " C_BINARY BINARY(1),\n" + + " C_VARBINARY VARBINARY(2048),\n" + + " C_DATE DATE,\n" + + " C_UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n" + + ");\n"; + + // create a trigger to update the timestamp when the row is updated. + // if no changes are made to the row, the timestamp should not be updated. 
+ private static final String CREATE_TRIGGER_SQL = + "CREATE TRIGGER c_updated_at_trigger\n" + + " BEFORE UPDATE ON %s\n" + + " REFERENCING NEW AS new_row\n" + + " FOR EACH ROW\n" + + "BEGIN ATOMIC\n" + + "SET new_row.c_updated_at = CURRENT_TIMESTAMP;\n" + + "END;"; + + private static final List CONFIG_FILE = + Lists.newArrayList("/jdbc_db2_source_and_sink_upsert.conf"); + + @Override + JdbcCase getJdbcCase() { + jdbcCase = super.getJdbcCase(); + jdbcCase.setSinkCreateSql(CREATE_SQL_SINK); + jdbcCase.setConfigFile(CONFIG_FILE); + jdbcCase.setAdditionalSqlOnSink(CREATE_TRIGGER_SQL); + return jdbcCase; + } + + @TestTemplate + public void testDb2UpsertE2e(TestContainer container) + throws IOException, InterruptedException, SQLException { + try { + // step 1: run the job to migrate data from source to sink. + Container.ExecResult execResult = + container.executeJob("/jdbc_db2_source_and_sink_upsert.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + List> updatedAtTimestampsBeforeUpdate = + query( + String.format( + "SELECT C_UPDATED_AT FROM %s", + buildTableInfoWithSchema(DB2_DATABASE, DB2_SINK))); + // step 2: run the job to update the data in the sink. + // expected: timestamps should not be updated as the data is not changed. 
+ execResult = container.executeJob("/jdbc_db2_source_and_sink_upsert.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + List> updatedAtTimestampsAfterUpdate = + query( + String.format( + "SELECT C_UPDATED_AT FROM %s", + buildTableInfoWithSchema(DB2_DATABASE, DB2_SINK))); + Assertions.assertIterableEquals( + updatedAtTimestampsBeforeUpdate, updatedAtTimestampsAfterUpdate); + } finally { + clearTable(DB2_DATABASE, DB2_SINK); + } + } + + private List> query(String sql) { + try (Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql)) { + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getString(i)); + } + result.add(objects); + log.debug(String.format("Print query, sql: %s, data: %s", sql, objects)); + } + connection.commit(); + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf new file mode 100644 index 00000000000..518a027d34d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_sink_upsert.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + Jdbc { + driver = com.ibm.db2.jcc.DB2Driver + url = "jdbc:db2://db2-e2e:50000/E2E" + user = "db2inst1" + password = "123456" + query = """ + select * from "E2E".SOURCE; + """ + } + + # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +sink { + Jdbc { + driver = com.ibm.db2.jcc.DB2Driver + url = "jdbc:db2://db2-e2e:50000/E2E" + user = "db2inst1" + password = "123456" + database = "E2E" + table = "SINK" + enable_upsert = true + # The primary keys of the table, which will be used to generate the upsert SQL + generate_sink_sql = true + primary_keys = [ + C_INT + ] + } + + # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}