diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 000000000000..4ff5484b3b08
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
+ * used to add more methods without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 000000000000..4ff5484b3b08
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
+ * used to add more methods without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 000000000000..4ff5484b3b08
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
+ * used to add more methods without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 000000000000..4ff5484b3b08
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
+ * used to add more methods without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
index 72a177adceaf..41e7141cf48a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
@@ -21,6 +21,7 @@
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -65,7 +66,16 @@ public void setWriteCallback(WriteCallback writeCallback) {
this.writeCallback = writeCallback;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration configuration) throws Exception {
super.open(configuration);
Callback baseCallback = requireNonNull(callback);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 0961ff160048..886e33e2046a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -74,7 +75,16 @@ public CdcDynamicTableParsingProcessFunction(
this.parserFactory = parserFactory;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
catalog = catalogLoader.load();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index b18a05c280cb..4c5e0600bb47 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -20,6 +20,7 @@
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
@@ -51,7 +52,16 @@ public CdcMultiTableParsingProcessFunction(EventParser.Factory parserFactory)
this.parserFactory = parserFactory;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
updatedDataFieldsOutputTags = new HashMap<>();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 3456634942c8..eec228f3c09b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -20,6 +20,7 @@
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -50,7 +51,16 @@ public CdcParsingProcessFunction(EventParser.Factory parserFactory) {
this.parserFactory = parserFactory;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index c2e928bd4a0a..4f02b784c2ba 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -31,6 +31,7 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.slf4j.Logger;
@@ -73,7 +74,16 @@ protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
this.allowUpperCase = this.catalog.allowUpperCase();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index df3cf7abf2a5..524f2e5f01c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -23,6 +23,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -44,7 +45,16 @@ public QueryAddressRegister(Table table) {
this.serviceManager = ((FileStoreTable) table).store().newServiceManager();
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
this.executors = new TreeMap<>();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 43cf654e91fe..02f8a654112e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -31,6 +31,7 @@
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -70,7 +71,16 @@ public QueryFileMonitor(Table table) {
.toMillis();
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
ReadBuilder readBuilder = monitorTable.newReadBuilder();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 54104130438b..8760f1dc5f80 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -27,6 +27,7 @@
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -182,9 +183,19 @@ public KeyAndSizeExtractor(RowType rowType, boolean isSortBySize) {
this.isSortBySize = isSortBySize;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink
+ * 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink
+ * 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
- super.open(parameters);
InternalRowToSizeVisitor internalRowToSizeVisitor = new InternalRowToSizeVisitor();
fieldSizeCalculator =
rowType.getFieldTypes().stream()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 07fe275543a1..2b25f074667c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -42,6 +44,8 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
@@ -97,17 +101,29 @@ public void open() throws Exception {
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
- // to stay compatible with Flink 1.18-
- if (logSinkFunction instanceof RichFunction) {
- RichFunction richFunction = (RichFunction) logSinkFunction;
- richFunction.open(new Configuration());
- }
-
+ openFunction(logSinkFunction);
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
}
}
+ private static void openFunction(Function function) throws Exception {
+ if (function instanceof RichFunction) {
+ RichFunction richFunction = (RichFunction) function;
+
+ try {
+ Method method = RichFunction.class.getDeclaredMethod("open", OpenContext.class);
+ method.invoke(richFunction, new OpenContext() {});
+ return;
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ // to stay compatible with Flink 1.18-
+ }
+
+ Method method = RichFunction.class.getDeclaredMethod("open", Configuration.class);
+ method.invoke(richFunction, new Configuration());
+ }
+ }
+
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index f590c2fb7fff..b30e14551296 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -31,6 +31,7 @@
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -119,9 +120,19 @@ public static DataStream sortStreamByKey(
.map(
new RichMapFunction>() {
- @Override
+ /**
+ * Do not annotate with @override
here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain
+ * compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
- super.open(parameters);
shuffleKeyAbstract.open();
}
@@ -172,7 +183,18 @@ public Tuple2 map(RowData value) {
private transient KeyProjectedRow keyProjectedRow;
- @Override
+ /**
+ * Do not annotate with @override
here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain
+ * compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) {
keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index d306c7d8e1e5..e768c717ddaa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -26,6 +26,7 @@
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.utils.Preconditions;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -72,7 +73,16 @@ public BucketUnawareCompactSource(
this.filter = filter;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
compactionCoordinator =
new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index cee6081aa29f..2157be51aee4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -25,6 +25,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -62,7 +63,16 @@ public CombinedAwareBatchSourceFunction(
super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index bff690ea30c2..01e0127e9fda 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -25,6 +25,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -59,7 +60,16 @@ public CombinedAwareStreamingSourceFunction(
this.monitorInterval = monitorInterval;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
index 1964927b5cdd..02bb8786505d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -70,7 +71,16 @@ public CombinedCompactorSourceFunction(
this.isStreaming = isStreaming;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
isRunning = new AtomicBoolean(true);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 8ec8d5f2c1a2..6a40f10ada61 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -29,6 +29,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -69,7 +70,16 @@ public CombinedUnawareBatchSourceFunction(
super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
index e398e09a8451..b64518a7ef60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
@@ -24,6 +24,7 @@
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -55,7 +56,16 @@ public CombinedUnawareStreamingSourceFunction(
this.monitorInterval = monitorInterval;
}
- @Override
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with @override
here to maintain compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =