From a67d2568104eaed488478f98925113d5a3fdf01c Mon Sep 17 00:00:00 2001 From: uniding <116274818+uniding@users.noreply.github.com> Date: Sun, 2 Feb 2025 12:51:16 +0800 Subject: [PATCH 1/6] resolve bug 8460 https://github.com/apache/seatunnel/issues/8460 --- .../org/apache/seatunnel/engine/server/master/JobMaster.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index a8fa0de57a0..9049801f697 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -245,7 +245,6 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr try { Thread.currentThread().setContextClassLoader(classLoader); if (!restart - && !logicalDag.isStartWithSavePoint() && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions()) .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) .equals(SaveModeExecuteLocation.CLUSTER)) { From 9670a456bc568d5c5ffc5e55c080063ca1a4f2e1 Mon Sep 17 00:00:00 2001 From: zhouyq Date: Sat, 8 Feb 2025 11:59:37 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E5=9B=9E=E6=BB=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/seatunnel/engine/server/master/JobMaster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index f2ac52ef9d5..e97b10b4503 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -249,6 +249,7 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr try { Thread.currentThread().setContextClassLoader(classLoader); if (!restart + && !logicalDag.isStartWithSavePoint() && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions()) .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) .equals(SaveModeExecuteLocation.CLUSTER)) { From cd6afd12aabf589435a8b1a1bf223de5ec266080 Mon Sep 17 00:00:00 2001 From: uniding <116274818+uniding@users.noreply.github.com> Date: Tue, 11 Feb 2025 14:29:06 +0800 Subject: [PATCH 3/6] Update PaimonCatalog.java update close error msg --- .../connectors/seatunnel/paimon/catalog/PaimonCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index 6883a47ce5c..a14a250cf2e 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -85,7 +85,7 @@ public void close() throws CatalogException { try { ((Closeable) catalog).close(); } catch (IOException e) { - log.error("Error while closing IcebergCatalog.", e); + log.error("Error while closing PaimonCatalog.", e); throw new CatalogException(e); } } From fc3b8143125bbab50eade4c98ce6d69f2bfe6198 Mon Sep 17 00:00:00 2001 From: zhouyq Date: Fri, 14 Feb 2025 17:01:49 +0800 Subject: [PATCH 4/6] resove bug: https://github.com/apache/seatunnel/issues/8460 https://github.com/apache/seatunnel/issues/8070 --- .../connectors/seatunnel/paimon/sink/PaimonSink.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index 0129438c835..fb0a5f3458e 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.schema.SchemaChangeType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; @@ -80,6 +81,17 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig); this.catalogTable = catalogTable; this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig); + try (PaimonCatalog paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig)) { + paimonCatalog.open(); + boolean databaseExists = paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace()); + if (databaseExists) { + TablePath tablePath = catalogTable.getTablePath(); + boolean tableExists = paimonCatalog.tableExists(tablePath); + if (tableExists) { + this.paimonTable = paimonCatalog.getPaimonTable(tablePath); + } + } + } } @Override From e881a5537805040736afbb7cbcd1d77b7cc4c875 Mon Sep 17 00:00:00 2001 From: zhouyq Date: Mon, 17 Feb 2025 14:37:26 +0800 Subject: [PATCH 5/6] spotless --- .../seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index fb0a5f3458e..3b502e4a757 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -83,7 +83,8 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig); try (PaimonCatalog paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig)) { paimonCatalog.open(); - boolean databaseExists = paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace()); + boolean databaseExists = + paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace()); if (databaseExists) { TablePath tablePath = catalogTable.getTablePath(); boolean tableExists = paimonCatalog.tableExists(tablePath); From 428c5c0ecf90c7d31d5f4ae3638b121bf38613a4 Mon Sep 17 00:00:00 2001 From: zhouyq Date: Tue, 18 Feb 2025 18:05:10 +0800 Subject: [PATCH 6/6] fix bugs: #8460 #8070 --- .../seatunnel/paimon/handler/PaimonSaveModeHandler.java | 5 ++++- .../connectors/seatunnel/paimon/sink/PaimonSink.java | 5 +++++ .../connectors/seatunnel/paimon/sink/SupportLoadTable.java | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java index b479ebf14b0..7e93ee3512f 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java @@ -53,6 +53,9 @@ public void handleSchemaSaveMode() { TablePath tablePath = catalogTable.getTablePath(); Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); // load paimon table and set it into paimon sink - this.supportLoadTable.setLoadTable(paimonTable); + Table loadTable = this.supportLoadTable.getLoadTable(); + if (loadTable == null || this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) { + this.supportLoadTable.setLoadTable(paimonTable); + } } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index 3b502e4a757..3368a8b7c60 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -165,6 +165,11 @@ public void setLoadTable(Table table) { this.paimonTable = table; } + @Override + public Table getLoadTable() { + return this.paimonTable; + } + @Override public Optional getWriteCatalogTable() { return Optional.ofNullable(catalogTable); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java index 734762e23ca..538fb3b6c4a 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java @@ -19,4 +19,6 @@ public interface SupportLoadTable { void setLoadTable(T table); + + T getLoadTable(); }