diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java b/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java index b460407acc2b0d..e2b1fe1c081712 100644 --- a/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java +++ b/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java @@ -20,6 +20,7 @@ import com.starrocks.connector.metadata.MetadataCollectJob; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import com.starrocks.connector.share.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.rpc.ConfigurableSerDesFactory; diff --git a/java-extensions/hadoop-ext/pom.xml b/java-extensions/hadoop-ext/pom.xml index b70a73d9016717..aabd9c1ed74784 100644 --- a/java-extensions/hadoop-ext/pom.xml +++ b/java-extensions/hadoop-ext/pom.xml @@ -44,6 +44,12 @@ ${iceberg.version} provided + + + com.esotericsoftware + kryo-shaded + 4.0.2 + diff --git a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java new file mode 100644 index 00000000000000..3cfe6e81323e66 --- /dev/null +++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java @@ -0,0 +1,66 @@ +package com.starrocks.connector.share.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import java.io.Serializable; +import java.util.Map; + +@SuppressWarnings("rawtypes") +public class JavaImmutableMapSerializer extends Serializer> { + + @Override + public void write(Kryo kryo, Output output, Map map) { + ImmSerMapEntry[] entries = map + .entrySet() + .stream() + .map(ImmSerMapEntry::new) + .toArray(ImmSerMapEntry[]::new); + + kryo.writeObject(output, new ImmSerMap(entries)); + } + + @Override + public Map read(Kryo kryo, Input input, Class> type) { + return Map.ofEntries(kryo.readObject(input, ImmSerMap.class).getEntries()); + } + + + private static class ImmSerMap implements Serializable { + + private final ImmSerMapEntry[] array; + + private ImmSerMap(ImmSerMapEntry[] array) { + this.array = array; + } + + private Map.Entry[] getEntries() { + return array; + } + } + + private static class ImmSerMapEntry implements Map.Entry, Serializable { + + private final Object[] entry; + + private ImmSerMapEntry(Map.Entry entry) { + this.entry = new Object[] { entry.getKey(), entry.getValue() }; + } + + @Override + public K getKey() { + return (K) entry[0]; + } + + @Override + public V getValue() { + return (V) entry[1]; + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java rename to java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java index 27a93ef7fffc90..3872ee0e95bf00 100644 --- a/fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java +++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package org.apache.iceberg; +package com.starrocks.connector.share.iceberg; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; diff --git a/java-extensions/iceberg-metadata-reader/pom.xml b/java-extensions/iceberg-metadata-reader/pom.xml index 2b0855ad1e2f5d..8fab2547f53c40 100644 --- a/java-extensions/iceberg-metadata-reader/pom.xml +++ b/java-extensions/iceberg-metadata-reader/pom.xml @@ -63,24 +63,6 @@ - - com.esotericsoftware - kryo-shaded - 4.0.2 - - - - de.javakaffee - kryo-serializers - 0.45 - - - com.esotericsoftware - kryo - - - - commons-lang commons-lang diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index 925960c394dbbd..77f23116e790bf 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -19,8 +19,9 @@ import com.google.common.collect.ImmutableList; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import com.starrocks.connector.share.iceberg.JavaImmutableMapSerializer; +import com.starrocks.connector.share.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.jni.connector.ColumnValue; -import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; @@ -165,6 +166,8 @@ private void initSerializer() { this.kryo = new Kryo(); this.kryo.register(CommonMetadataBean.class); this.kryo.register(IcebergMetricsBean.class); + this.kryo.register(Map.of().getClass(), new JavaImmutableMapSerializer()); + this.kryo.register(Map.of(1, 1).getClass(), new JavaImmutableMapSerializer()); UnmodifiableCollectionsSerializer.registerSerializers(kryo); this.stream = new ByteArrayOutputStream(); this.output = new Output(stream);