Remove hadoop dependencies and hadoop catalog (#491)
* Remove hadoop dependencies and hadoop catalog
ismailsimsek authored Feb 15, 2025
1 parent 94ac5c7 commit 2fdb645
Showing 20 changed files with 185 additions and 152 deletions.
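Upgrade note: with the Hadoop catalog removed, the sink has to point at one of Iceberg's remaining catalog implementations (the tests below use the JDBC and REST catalogs). A minimal application.properties sketch — assuming this version still forwards debezium.sink.iceberg.* keys verbatim to the Iceberg catalog; the endpoint values are illustrative only:

    debezium.sink.type=iceberg
    debezium.sink.iceberg.catalog-name=iceberg
    # Any non-Hadoop catalog type; "jdbc" matches the new CatalogJdbc test resource.
    debezium.sink.iceberg.type=jdbc
    debezium.sink.iceberg.uri=jdbc:postgresql://localhost:5432/iceberg_catalog
    debezium.sink.iceberg.warehouse=s3://my-bucket/iceberg_warehouse
    # Object storage now goes through Iceberg's native FileIO rather than hadoop-aws or gcs-connector.
    debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO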
36 changes: 25 additions & 11 deletions debezium-server-iceberg-sink/pom.xml
@@ -110,21 +110,11 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
</dependency>
<!-- Google -->
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>hadoop3-${version.googlebigdataoss}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
<version>${version.googlebigdataoss}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${version.hadoop}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
@@ -167,6 +157,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${version.hadoop}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -225,6 +216,29 @@
<artifactId>spark-sql_${version.spark.scala}</artifactId>
<version>${version.spark}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<!-- Excluded to use the Iceberg-bundled Arrow version -->
<groupId>org.apache.arrow</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
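Net effect on the POM: the com.google.cloud.bigdataoss artifacts (gcs-connector, gcsio) are dropped outright, the Hadoop artifacts that remain are demoted to test scope, and spark-sql sheds its logging bridges plus its Arrow artifacts so the Iceberg-bundled Arrow version wins on the test classpath. Reconstructed from the hunks above (the elided context may differ slightly), the hadoop-client block now reads:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${version.hadoop}</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>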
debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java
@@ -9,6 +9,7 @@
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMangoDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -31,6 +32,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {

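The CatalogJdbc test resource registered above is new in this commit, but its source is not part of this excerpt. A hypothetical sketch of what such a resource typically looks like — assuming the usual Quarkus + Testcontainers pattern, and that the debezium.sink.iceberg.* keys below are the ones the sink actually reads:

    package io.debezium.server.iceberg.testresources;

    import java.util.Map;

    import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
    import org.testcontainers.containers.PostgreSQLContainer;

    public class CatalogJdbc implements QuarkusTestResourceLifecycleManager {

      // Throwaway Postgres instance backing the Iceberg JDBC catalog during tests.
      static final PostgreSQLContainer<?> POSTGRES = new PostgreSQLContainer<>("postgres:15");

      @Override
      public Map<String, String> start() {
        POSTGRES.start();
        // Hand the catalog coordinates to the application under test.
        return Map.of(
            "debezium.sink.iceberg.type", "jdbc",
            "debezium.sink.iceberg.uri", POSTGRES.getJdbcUrl(),
            "debezium.sink.iceberg.jdbc.user", POSTGRES.getUsername(),
            "debezium.sink.iceberg.jdbc.password", POSTGRES.getPassword());
      }

      @Override
      public void stop() {
        POSTGRES.stop();
      }
    }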
debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
@@ -10,6 +10,8 @@

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -31,6 +33,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseTest {

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java
@@ -10,6 +10,7 @@

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -35,6 +36,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTestUnwrapped.TestProfile.class)
public class IcebergChangeConsumerMysqlTestUnwrapped extends BaseTest {

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
@@ -10,6 +10,8 @@

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.CatalogRest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -45,6 +47,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTest.TestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

@@ -61,52 +64,52 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
" c_int INTEGER,\n" +
" c_date DATE,\n" +
" c_timestamp TIMESTAMP,\n" +
" c_timestamptz TIMESTAMPTZ,\n" +
" c_float FLOAT,\n" +
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_bytea BYTEA,\n" +
" c_json JSON,\n" +
" c_jsonb JSONB,\n" +
" c_hstore_keyval hstore,\n" +
" c_last_field VARCHAR\n" +
" );";
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
" c_int INTEGER,\n" +
" c_date DATE,\n" +
" c_timestamp TIMESTAMP,\n" +
" c_timestamptz TIMESTAMPTZ,\n" +
" c_float FLOAT,\n" +
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_bytea BYTEA,\n" +
" c_json JSON,\n" +
" c_jsonb JSONB,\n" +
" c_hstore_keyval hstore,\n" +
" c_last_field VARCHAR\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
"c_json, c_jsonb, c_hstore_keyval, c_last_field) " +
"VALUES (1, null, null, null,null,null,null," +
"null,null,null,null,null,null,null," +
"null,null, null, null)," +
"(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," +
"'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," +
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb, " +
"'mapkey1=>1, mapkey2=>2'::hstore, " +
"'stringvalue' " +
")";
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
"c_json, c_jsonb, c_hstore_keyval, c_last_field) " +
"VALUES (1, null, null, null,null,null,null," +
"null,null,null,null,null,null,null," +
"null,null, null, null)," +
"(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," +
"'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," +
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb, " +
"'mapkey1=>1, mapkey2=>2'::hstore, " +
"'stringvalue' " +
")";
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(false);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
"AND c_boolean is null AND c_uuid is null AND c_bytea is null").count() == 1;
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
"AND c_boolean is null AND c_uuid is null AND c_bytea is null").count() == 1;
} catch (Exception e) {
return false;
}
@@ -125,27 +128,27 @@ public void testConsumingVariousDataTypes() throws Exception {
@Test
public void testConsumingArrayDataType() throws Exception {
String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" +
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" c_array_of_map hstore[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" c_array_of_map hstore[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);

Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
@@ -170,7 +173,7 @@ public void testSchemaChanges() throws Exception {
SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE id = 1002 ;");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ALTER COLUMN email DROP NOT NULL;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'SallyUSer2','Thomas',null,'value1',false, '2020-01-01');");
"(default,'SallyUSer2','Thomas',null,'value1',false, '2020-01-01');");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ALTER COLUMN last_name DROP NOT NULL;");
SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET last_name = NULL WHERE id = 1002 ;");
SourcePostgresqlDB.runSQL("DELETE FROM inventory.customers WHERE id = 1004 ;");
@@ -181,13 +184,13 @@
ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
} catch (Exception e) {
return false;
}
@@ -197,20 +200,20 @@ public void testSchemaChanges() throws Exception {
getTableData("testc.inventory.customers").show();
// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");

// remove column from source
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);");
"(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);");

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
return ds.where("first_name == 'User3'").count() == 1
&& ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1
&& ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1;
&& ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1
&& ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1;
} catch (Exception e) {
return false;
}
@@ -289,6 +292,7 @@ public void testSimpleUpload() {
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
e.printStackTrace();
return false;
}
});
debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java
@@ -10,6 +10,7 @@

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -39,6 +40,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTestUnwraapped.TestProfile.class)
public class IcebergChangeConsumerTestUnwraapped extends BaseSparkTest {

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
@@ -9,6 +9,7 @@
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
@@ -32,6 +33,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
@@ -9,6 +9,7 @@
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.debezium.server.iceberg.testresources.TestUtil;
@@ -37,6 +38,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -9,6 +9,7 @@
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
@@ -35,6 +36,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergEventsChangeConsumerTest.TestProfile.class)
public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")