From 46366c0cfdd676df0ae90e28823471b3993cc55d Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Mon, 13 Jan 2025 17:38:05 +0800 Subject: [PATCH 01/11] fix compilable issue Signed-off-by: zhaohehuhu --- .../iceberg-metadata-reader/pom.xml | 12 -- .../iceberg/IcebergMetadataScanner.java | 1 - .../UnmodifiableCollectionsSerializer.java | 173 ++++++++++++++++++ 3 files changed, 173 insertions(+), 13 deletions(-) create mode 100644 java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java diff --git a/java-extensions/iceberg-metadata-reader/pom.xml b/java-extensions/iceberg-metadata-reader/pom.xml index 2b0855ad1e2f5d..a4372aa31cc7ed 100644 --- a/java-extensions/iceberg-metadata-reader/pom.xml +++ b/java-extensions/iceberg-metadata-reader/pom.xml @@ -69,18 +69,6 @@ 4.0.2 - - de.javakaffee - kryo-serializers - 0.45 - - - com.esotericsoftware - kryo - - - - commons-lang commons-lang diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index 925960c394dbbd..612e04f3ead6a7 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -20,7 +20,6 @@ import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; import com.starrocks.jni.connector.ColumnValue; -import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java new file mode 100644 index 00000000000000..27a93ef7fffc90 --- /dev/null +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java @@ -0,0 +1,173 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.apache.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * copy from ... + * A kryo {@link Serializer} for unmodifiable {@link Collection}s and {@link Map}s created via + * {@link Collections}. + * + * @author Martin Grotzke + */ +public class UnmodifiableCollectionsSerializer extends Serializer { + + /** + * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer for the + * several unmodifiable Collections that can be created via {@link Collections}, including {@link + * Map}s. + * + * @param kryo the {@link Kryo} instance to set the serializer on. + * @see Collections#unmodifiableCollection(Collection) + * @see Collections#unmodifiableList(List) + * @see Collections#unmodifiableSet(Set) + * @see Collections#unmodifiableSortedSet(SortedSet) + * @see Collections#unmodifiableMap(Map) + * @see Collections#unmodifiableSortedMap(SortedMap) + */ + public static void registerSerializers(Kryo kryo) { + UnmodifiableCollectionsSerializer serializer = new UnmodifiableCollectionsSerializer(); + UnmodifiableCollection.values(); + for (UnmodifiableCollection item : UnmodifiableCollection.values()) { + kryo.register(item.type, serializer); + } + } + + @Override + public void write(Kryo kryo, Output output, Object object) { + try { + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( + object.getClass() + ); + // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") + output.writeInt(unmodifiableCollection.ordinal(), true); + kryo.writeClassAndObject(output, getValues(object)); + } catch (RuntimeException e) { + // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... + // handles SerializationException specifically (resizing the buffer)... + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Object read(Kryo kryo, Input input, Class clazz) { + int ordinal = input.readInt(true); + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; + Object sourceCollection = kryo.readClassAndObject(input); + return unmodifiableCollection.create(sourceCollection); + } + + @Override + public Object copy(Kryo kryo, Object original) { + try { + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( + original.getClass() + ); + Object sourceCollectionCopy = kryo.copy(getValues(original)); + return unmodifiableCollection.create(sourceCollectionCopy); + } catch (RuntimeException e) { + // Don't eat and wrap RuntimeExceptions + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Collection getValues(Object coll) { + return new ArrayList<>((Collection) coll); + } + + private enum UnmodifiableCollection { + COLLECTION(Collections.unmodifiableCollection(Arrays.asList("")).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableCollection((Collection) sourceCollection); + } + }, + RANDOM_ACCESS_LIST(Collections.unmodifiableList(new ArrayList()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableList((List) sourceCollection); + } + }, + LIST(Collections.unmodifiableList(new LinkedList()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableList((List) sourceCollection); + } + }, + SET(Collections.unmodifiableSet(new HashSet()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSet((Set) sourceCollection); + } + }, + SORTED_SET(Collections.unmodifiableSortedSet(new TreeSet()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSortedSet((SortedSet) sourceCollection); + } + }, + MAP(Collections.unmodifiableMap(new HashMap()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableMap((Map) sourceCollection); + } + }, + SORTED_MAP(Collections.unmodifiableSortedMap(new TreeMap()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSortedMap((SortedMap) sourceCollection); + } + }; + + private final Class type; + + UnmodifiableCollection(Class type) { + this.type = type; + } + + public abstract Object create(Object sourceCollection); + + static UnmodifiableCollection valueOfType(Class type) { + for (UnmodifiableCollection item : values()) { + if (item.type.equals(type)) { + return item; + } + } + throw new IllegalArgumentException("The type " + type + " is not supported."); + } + } +} From b18278edd7f3bc1b31c694df2fb0cfa3188a657b Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 15 Jan 2025 13:55:51 +0800 Subject: [PATCH 02/11] rename package Signed-off-by: zhaohehuhu --- .../connector/iceberg/UnmodifiableCollectionsSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java index 27a93ef7fffc90..3855ec8a0b6a5b 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package org.apache.iceberg; +package org.apache.connector.iceberg; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; From a82df8b0d2fd1ba13a2763f476f3e7b7b2ed4518 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 15 Jan 2025 15:52:03 +0800 Subject: [PATCH 03/11] add import Signed-off-by: zhaohehuhu --- .../com/starrocks/connector/iceberg/IcebergMetadataScanner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index 612e04f3ead6a7..ad5430d35ed802 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import org.apache.connector.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.jni.connector.ColumnValue; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestContent; From 2b5c9fd8cc45ad8ed1b3e9301e05ee78dda64eaf Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 15 Jan 2025 17:19:35 +0800 Subject: [PATCH 04/11] add import Signed-off-by: zhaohehuhu --- .../connector/iceberg/UnmodifiableCollectionsSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java index 3855ec8a0b6a5b..c24255bd0038dd 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package org.apache.connector.iceberg; +package com.starrocks.connector.iceberg; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; From 5266a3e29d83d58925c72e0e8ac8d39e72b1b570 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 15 Jan 2025 17:49:00 +0800 Subject: [PATCH 05/11] delete import Signed-off-by: zhaohehuhu --- .../com/starrocks/connector/iceberg/IcebergMetadataScanner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index ad5430d35ed802..612e04f3ead6a7 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableList; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; -import org.apache.connector.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.jni.connector.ColumnValue; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestContent; From b4bb04ac2ad4f9d350175a50f428dbe024c5a09a Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Fri, 17 Jan 2025 12:31:32 +0800 Subject: [PATCH 06/11] refactor Signed-off-by: zhaohehuhu --- .../iceberg/IcebergMetadataScanner.java | 1 + .../UnmodifiableCollectionsSerializer.java | 173 ------------------ 2 files changed, 1 insertion(+), 173 deletions(-) delete mode 100644 java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index 612e04f3ead6a7..7853d861a4d605 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import com.starrocks.connector.share.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.jni.connector.ColumnValue; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestContent; diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java deleted file mode 100644 index c24255bd0038dd..00000000000000 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/UnmodifiableCollectionsSerializer.java +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.starrocks.connector.iceberg; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * copy from ... - * A kryo {@link Serializer} for unmodifiable {@link Collection}s and {@link Map}s created via - * {@link Collections}. - * - * @author Martin Grotzke - */ -public class UnmodifiableCollectionsSerializer extends Serializer { - - /** - * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer for the - * several unmodifiable Collections that can be created via {@link Collections}, including {@link - * Map}s. - * - * @param kryo the {@link Kryo} instance to set the serializer on. - * @see Collections#unmodifiableCollection(Collection) - * @see Collections#unmodifiableList(List) - * @see Collections#unmodifiableSet(Set) - * @see Collections#unmodifiableSortedSet(SortedSet) - * @see Collections#unmodifiableMap(Map) - * @see Collections#unmodifiableSortedMap(SortedMap) - */ - public static void registerSerializers(Kryo kryo) { - UnmodifiableCollectionsSerializer serializer = new UnmodifiableCollectionsSerializer(); - UnmodifiableCollection.values(); - for (UnmodifiableCollection item : UnmodifiableCollection.values()) { - kryo.register(item.type, serializer); - } - } - - @Override - public void write(Kryo kryo, Output output, Object object) { - try { - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( - object.getClass() - ); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt(unmodifiableCollection.ordinal(), true); - kryo.writeClassAndObject(output, getValues(object)); - } catch (RuntimeException e) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public Object read(Kryo kryo, Input input, Class clazz) { - int ordinal = input.readInt(true); - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; - Object sourceCollection = kryo.readClassAndObject(input); - return unmodifiableCollection.create(sourceCollection); - } - - @Override - public Object copy(Kryo kryo, Object original) { - try { - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( - original.getClass() - ); - Object sourceCollectionCopy = kryo.copy(getValues(original)); - return unmodifiableCollection.create(sourceCollectionCopy); - } catch (RuntimeException e) { - // Don't eat and wrap RuntimeExceptions - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private Collection getValues(Object coll) { - return new ArrayList<>((Collection) coll); - } - - private enum UnmodifiableCollection { - COLLECTION(Collections.unmodifiableCollection(Arrays.asList("")).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableCollection((Collection) sourceCollection); - } - }, - RANDOM_ACCESS_LIST(Collections.unmodifiableList(new ArrayList()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableList((List) sourceCollection); - } - }, - LIST(Collections.unmodifiableList(new LinkedList()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableList((List) sourceCollection); - } - }, - SET(Collections.unmodifiableSet(new HashSet()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSet((Set) sourceCollection); - } - }, - SORTED_SET(Collections.unmodifiableSortedSet(new TreeSet()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSortedSet((SortedSet) sourceCollection); - } - }, - MAP(Collections.unmodifiableMap(new HashMap()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableMap((Map) sourceCollection); - } - }, - SORTED_MAP(Collections.unmodifiableSortedMap(new TreeMap()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSortedMap((SortedMap) sourceCollection); - } - }; - - private final Class type; - - UnmodifiableCollection(Class type) { - this.type = type; - } - - public abstract Object create(Object sourceCollection); - - static UnmodifiableCollection valueOfType(Class type) { - for (UnmodifiableCollection item : values()) { - if (item.type.equals(type)) { - return item; - } - } - throw new IllegalArgumentException("The type " + type + " is not supported."); - } - } -} From 12a641ae1ff9e63c3dcb7ca7562f9caf343f035c Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Fri, 17 Jan 2025 13:42:56 +0800 Subject: [PATCH 07/11] refactor Signed-off-by: zhaohehuhu --- .../org/apache/iceberg/MetadataParser.java | 1 + .../UnmodifiableCollectionsSerializer.java | 173 ------------------ 2 files changed, 1 insertion(+), 173 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java b/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java index b460407acc2b0d..e2b1fe1c081712 100644 --- a/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java +++ b/fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java @@ -20,6 +20,7 @@ import com.starrocks.connector.metadata.MetadataCollectJob; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import com.starrocks.connector.share.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.rpc.ConfigurableSerDesFactory; diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java b/fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java deleted file mode 100644 index 27a93ef7fffc90..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/iceberg/UnmodifiableCollectionsSerializer.java +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package org.apache.iceberg; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * copy from ... - * A kryo {@link Serializer} for unmodifiable {@link Collection}s and {@link Map}s created via - * {@link Collections}. - * - * @author Martin Grotzke - */ -public class UnmodifiableCollectionsSerializer extends Serializer { - - /** - * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer for the - * several unmodifiable Collections that can be created via {@link Collections}, including {@link - * Map}s. - * - * @param kryo the {@link Kryo} instance to set the serializer on. - * @see Collections#unmodifiableCollection(Collection) - * @see Collections#unmodifiableList(List) - * @see Collections#unmodifiableSet(Set) - * @see Collections#unmodifiableSortedSet(SortedSet) - * @see Collections#unmodifiableMap(Map) - * @see Collections#unmodifiableSortedMap(SortedMap) - */ - public static void registerSerializers(Kryo kryo) { - UnmodifiableCollectionsSerializer serializer = new UnmodifiableCollectionsSerializer(); - UnmodifiableCollection.values(); - for (UnmodifiableCollection item : UnmodifiableCollection.values()) { - kryo.register(item.type, serializer); - } - } - - @Override - public void write(Kryo kryo, Output output, Object object) { - try { - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( - object.getClass() - ); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt(unmodifiableCollection.ordinal(), true); - kryo.writeClassAndObject(output, getValues(object)); - } catch (RuntimeException e) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public Object read(Kryo kryo, Input input, Class clazz) { - int ordinal = input.readInt(true); - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; - Object sourceCollection = kryo.readClassAndObject(input); - return unmodifiableCollection.create(sourceCollection); - } - - @Override - public Object copy(Kryo kryo, Object original) { - try { - UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( - original.getClass() - ); - Object sourceCollectionCopy = kryo.copy(getValues(original)); - return unmodifiableCollection.create(sourceCollectionCopy); - } catch (RuntimeException e) { - // Don't eat and wrap RuntimeExceptions - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private Collection getValues(Object coll) { - return new ArrayList<>((Collection) coll); - } - - private enum UnmodifiableCollection { - COLLECTION(Collections.unmodifiableCollection(Arrays.asList("")).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableCollection((Collection) sourceCollection); - } - }, - RANDOM_ACCESS_LIST(Collections.unmodifiableList(new ArrayList()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableList((List) sourceCollection); - } - }, - LIST(Collections.unmodifiableList(new LinkedList()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableList((List) sourceCollection); - } - }, - SET(Collections.unmodifiableSet(new HashSet()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSet((Set) sourceCollection); - } - }, - SORTED_SET(Collections.unmodifiableSortedSet(new TreeSet()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSortedSet((SortedSet) sourceCollection); - } - }, - MAP(Collections.unmodifiableMap(new HashMap()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableMap((Map) sourceCollection); - } - }, - SORTED_MAP(Collections.unmodifiableSortedMap(new TreeMap()).getClass()) { - @Override - public Object create(Object sourceCollection) { - return Collections.unmodifiableSortedMap((SortedMap) sourceCollection); - } - }; - - private final Class type; - - UnmodifiableCollection(Class type) { - this.type = type; - } - - public abstract Object create(Object sourceCollection); - - static UnmodifiableCollection valueOfType(Class type) { - for (UnmodifiableCollection item : values()) { - if (item.type.equals(type)) { - return item; - } - } - throw new IllegalArgumentException("The type " + type + " is not supported."); - } - } -} From 627a0575556c0496a15422616d1fef7e6877956d Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Fri, 17 Jan 2025 14:11:02 +0800 Subject: [PATCH 08/11] refactor Signed-off-by: zhaohehuhu --- .../UnmodifiableCollectionsSerializer.java | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java diff --git a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java new file mode 100644 index 00000000000000..3872ee0e95bf00 --- /dev/null +++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/UnmodifiableCollectionsSerializer.java @@ -0,0 +1,173 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.starrocks.connector.share.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * copy from ... + * A kryo {@link Serializer} for unmodifiable {@link Collection}s and {@link Map}s created via + * {@link Collections}. + * + * @author Martin Grotzke + */ +public class UnmodifiableCollectionsSerializer extends Serializer { + + /** + * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer for the + * several unmodifiable Collections that can be created via {@link Collections}, including {@link + * Map}s. + * + * @param kryo the {@link Kryo} instance to set the serializer on. + * @see Collections#unmodifiableCollection(Collection) + * @see Collections#unmodifiableList(List) + * @see Collections#unmodifiableSet(Set) + * @see Collections#unmodifiableSortedSet(SortedSet) + * @see Collections#unmodifiableMap(Map) + * @see Collections#unmodifiableSortedMap(SortedMap) + */ + public static void registerSerializers(Kryo kryo) { + UnmodifiableCollectionsSerializer serializer = new UnmodifiableCollectionsSerializer(); + UnmodifiableCollection.values(); + for (UnmodifiableCollection item : UnmodifiableCollection.values()) { + kryo.register(item.type, serializer); + } + } + + @Override + public void write(Kryo kryo, Output output, Object object) { + try { + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( + object.getClass() + ); + // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") + output.writeInt(unmodifiableCollection.ordinal(), true); + kryo.writeClassAndObject(output, getValues(object)); + } catch (RuntimeException e) { + // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... + // handles SerializationException specifically (resizing the buffer)... + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Object read(Kryo kryo, Input input, Class clazz) { + int ordinal = input.readInt(true); + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; + Object sourceCollection = kryo.readClassAndObject(input); + return unmodifiableCollection.create(sourceCollection); + } + + @Override + public Object copy(Kryo kryo, Object original) { + try { + UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( + original.getClass() + ); + Object sourceCollectionCopy = kryo.copy(getValues(original)); + return unmodifiableCollection.create(sourceCollectionCopy); + } catch (RuntimeException e) { + // Don't eat and wrap RuntimeExceptions + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Collection getValues(Object coll) { + return new ArrayList<>((Collection) coll); + } + + private enum UnmodifiableCollection { + COLLECTION(Collections.unmodifiableCollection(Arrays.asList("")).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableCollection((Collection) sourceCollection); + } + }, + RANDOM_ACCESS_LIST(Collections.unmodifiableList(new ArrayList()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableList((List) sourceCollection); + } + }, + LIST(Collections.unmodifiableList(new LinkedList()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableList((List) sourceCollection); + } + }, + SET(Collections.unmodifiableSet(new HashSet()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSet((Set) sourceCollection); + } + }, + SORTED_SET(Collections.unmodifiableSortedSet(new TreeSet()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSortedSet((SortedSet) sourceCollection); + } + }, + MAP(Collections.unmodifiableMap(new HashMap()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableMap((Map) sourceCollection); + } + }, + SORTED_MAP(Collections.unmodifiableSortedMap(new TreeMap()).getClass()) { + @Override + public Object create(Object sourceCollection) { + return Collections.unmodifiableSortedMap((SortedMap) sourceCollection); + } + }; + + private final Class type; + + UnmodifiableCollection(Class type) { + this.type = type; + } + + public abstract Object create(Object sourceCollection); + + static UnmodifiableCollection valueOfType(Class type) { + for (UnmodifiableCollection item : values()) { + if (item.type.equals(type)) { + return item; + } + } + throw new IllegalArgumentException("The type " + type + " is not supported."); + } + } +} From 07359ebfed54037925cd9d1817a425c796d1783b Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Fri, 17 Jan 2025 15:58:24 +0800 Subject: [PATCH 09/11] fix pom Signed-off-by: zhaohehuhu --- java-extensions/hadoop-ext/pom.xml | 6 ++++++ java-extensions/iceberg-metadata-reader/pom.xml | 6 ------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/java-extensions/hadoop-ext/pom.xml b/java-extensions/hadoop-ext/pom.xml index b70a73d9016717..aabd9c1ed74784 100644 --- a/java-extensions/hadoop-ext/pom.xml +++ b/java-extensions/hadoop-ext/pom.xml @@ -44,6 +44,12 @@ ${iceberg.version} provided + + + com.esotericsoftware + kryo-shaded + 4.0.2 + diff --git a/java-extensions/iceberg-metadata-reader/pom.xml b/java-extensions/iceberg-metadata-reader/pom.xml index a4372aa31cc7ed..8fab2547f53c40 100644 --- a/java-extensions/iceberg-metadata-reader/pom.xml +++ b/java-extensions/iceberg-metadata-reader/pom.xml @@ -63,12 +63,6 @@ - - com.esotericsoftware - kryo-shaded - 4.0.2 - - commons-lang commons-lang From 86ce2767d4ad1d40c2eedb61b9efc1c66acf7df3 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 22 Jan 2025 15:22:53 +0800 Subject: [PATCH 10/11] add JavaImmutableMapSerializer Signed-off-by: zhaohehuhu --- .../iceberg/JavaImmutableMapSerializer.java | 66 +++++++++++++++++++ .../iceberg/IcebergMetadataScanner.java | 3 + 2 files changed, 69 insertions(+) create mode 100644 java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java diff --git a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java new file mode 100644 index 00000000000000..77231902f57f92 --- /dev/null +++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java @@ -0,0 +1,66 @@ +package com.starrocks.connector.share.iceberg; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import java.io.Serializable; +import java.util.Map; + +@SuppressWarnings("rawtypes") +public class JavaImmutableMapSerializer extends Serializer> { + + @Override + public void write(Kryo kryo, Output output, Map map) { + ImmSerMapEntry[] entries = map + .entrySet() + .stream() + .map(ImmSerMapEntry::new) + .toArray(ImmSerMapEntry[]::new); + + kryo.writeObject(output, new ImmSerMap(entries)); + } + + @Override + public Map read(Kryo kryo, Input input, Class> type) { + return Map.ofEntries(kryo.readObject(input, ImmSerMap.class).getEntries()); + } + + + private static class ImmSerMap implements Serializable { + + private final ImmSerMapEntry[] array; + + private ImmSerMap(ImmSerMapEntry[] array) { + this.array = array; + } + + private Map.Entry[] getEntries() { + return array; + } + } + + private static class ImmSerMapEntry implements Map.Entry, Serializable { + + private final Object[] entry; + + private ImmSerMapEntry(Map.Entry entry) { + this.entry = new Object[] { entry.getKey(), entry.getValue() }; + } + + @Override + public K getKey() { + return (K) entry[0]; + } + + @Override + public V getValue() { + return (V) entry[1]; + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + } +} \ No newline at end of file diff --git a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java index 7853d861a4d605..77f23116e790bf 100644 --- a/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java +++ b/java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.starrocks.connector.share.iceberg.CommonMetadataBean; import com.starrocks.connector.share.iceberg.IcebergMetricsBean; +import com.starrocks.connector.share.iceberg.JavaImmutableMapSerializer; import com.starrocks.connector.share.iceberg.UnmodifiableCollectionsSerializer; import com.starrocks.jni.connector.ColumnValue; import org.apache.iceberg.ContentFile; @@ -165,6 +166,8 @@ private void initSerializer() { this.kryo = new Kryo(); this.kryo.register(CommonMetadataBean.class); this.kryo.register(IcebergMetricsBean.class); + this.kryo.register(Map.of().getClass(), new JavaImmutableMapSerializer()); + this.kryo.register(Map.of(1, 1).getClass(), new JavaImmutableMapSerializer()); UnmodifiableCollectionsSerializer.registerSerializers(kryo); this.stream = new ByteArrayOutputStream(); this.output = new Output(stream); From 5d35a7e032c2a29b411bc077f2a675997c866591 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 22 Jan 2025 16:24:29 +0800 Subject: [PATCH 11/11] reformat Signed-off-by: zhaohehuhu --- .../iceberg/JavaImmutableMapSerializer.java | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java index 77231902f57f92..3cfe6e81323e66 100644 --- a/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java +++ b/java-extensions/hadoop-ext/src/main/java/com/starrocks/connector/share/iceberg/JavaImmutableMapSerializer.java @@ -10,57 +10,57 @@ @SuppressWarnings("rawtypes") public class JavaImmutableMapSerializer extends Serializer> { - @Override - public void write(Kryo kryo, Output output, Map map) { - ImmSerMapEntry[] entries = map - .entrySet() - .stream() - .map(ImmSerMapEntry::new) - .toArray(ImmSerMapEntry[]::new); - - kryo.writeObject(output, new ImmSerMap(entries)); - } + @Override + public void write(Kryo kryo, Output output, Map map) { + ImmSerMapEntry[] entries = map + .entrySet() + .stream() + .map(ImmSerMapEntry::new) + .toArray(ImmSerMapEntry[]::new); + + kryo.writeObject(output, new ImmSerMap(entries)); + } - @Override - public Map read(Kryo kryo, Input input, Class> type) { - return Map.ofEntries(kryo.readObject(input, ImmSerMap.class).getEntries()); - } + @Override + public Map read(Kryo kryo, Input input, Class> type) { + return Map.ofEntries(kryo.readObject(input, ImmSerMap.class).getEntries()); + } - private static class ImmSerMap implements Serializable { + private static class ImmSerMap implements Serializable { - private final ImmSerMapEntry[] array; + private final ImmSerMapEntry[] array; - private ImmSerMap(ImmSerMapEntry[] array) { - this.array = array; - } + private ImmSerMap(ImmSerMapEntry[] array) { + this.array = array; + } - private Map.Entry[] getEntries() { - return array; + private Map.Entry[] getEntries() { + return array; + } } - } - private static class ImmSerMapEntry implements Map.Entry, Serializable { + private static class ImmSerMapEntry implements Map.Entry, Serializable { - private final Object[] entry; + private final Object[] entry; - private ImmSerMapEntry(Map.Entry entry) { - this.entry = new Object[] { entry.getKey(), entry.getValue() }; - } + private ImmSerMapEntry(Map.Entry entry) { + this.entry = new Object[] { entry.getKey(), entry.getValue() }; + } - @Override - public K getKey() { - return (K) entry[0]; - } + @Override + public K getKey() { + return (K) entry[0]; + } - @Override - public V getValue() { - return (V) entry[1]; - } + @Override + public V getValue() { + return (V) entry[1]; + } - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } } - } } \ No newline at end of file