From a0b27cade489518a1992664d2b848a07b60f872f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Objet=20Trouv=C3=A9?= Date: Wed, 15 Sep 2021 16:41:34 +0200 Subject: [PATCH] #386: Check serializability of crypto key reader and encryption keys. --- .../flink/streaming/connectors/pulsar/FlinkPulsarSink.java | 4 ++++ .../flink/streaming/connectors/pulsar/FlinkPulsarSource.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java index dede0aed..2a2c0523 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java @@ -35,7 +35,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.util.InstantiationUtil.isSerializable; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Write data to Flink. @@ -136,6 +138,8 @@ public FlinkPulsarSink build(){ if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction."); } + checkState(isSerializable(cryptoKeyReader)); + checkState(isSerializable(encryptionKeys)); return new FlinkPulsarSink<>(this); } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index a1709d32..e02305eb 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -88,8 +88,11 @@ import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER; import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER; import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP; +import static org.apache.flink.util.InstantiationUtil.isSerializable; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * Pulsar data source. @@ -168,6 +171,7 @@ public FlinkPulsarSource build(){ if (clientConf == null){ throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); } + checkState(isSerializable(cryptoKeyReader)); return new FlinkPulsarSource<>(this); }