diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java deleted file mode 100644 index 083b775461aa..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.factories; - -import org.apache.paimon.format.FileFormatFactory; - -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; - -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.paimon.factories.FactoryUtil.discoverFactories; - -/** Utility for working with {@link FileFormatFactory}s. */ -public class FormatFactoryUtil { - - private static final Cache> FACTORIES = - Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); - - /** Discovers a file format factory. */ - @SuppressWarnings("unchecked") - public static T discoverFactory( - ClassLoader classLoader, String identifier) { - final List foundFactories = getFactories(classLoader); - - final List matchingFactories = - foundFactories.stream() - .filter(f -> f.identifier().equals(identifier)) - .collect(Collectors.toList()); - - if (matchingFactories.isEmpty()) { - throw new FactoryException( - String.format( - "Could not find any factory for identifier '%s' that implements FileFormatFactory in the classpath.\n\n" - + "Available factory identifiers are:\n\n" - + "%s", - identifier, - foundFactories.stream() - .map(FileFormatFactory::identifier) - .collect(Collectors.joining("\n")))); - } - - return (T) matchingFactories.get(0); - } - - private static List getFactories(ClassLoader classLoader) { - return FACTORIES.get( - classLoader, s -> discoverFactories(classLoader, FileFormatFactory.class)); - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index e1391e7f5396..9ddb58257324 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -19,7 +19,7 @@ package org.apache.paimon.format; import org.apache.paimon.CoreOptions; -import org.apache.paimon.factories.FormatFactoryUtil; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -88,9 +88,15 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return FormatFactoryUtil.discoverFactory( - FileFormat.class.getClassLoader(), identifier.toLowerCase()) - .create(context); + if (identifier != null) { + identifier = identifier.toLowerCase(); + } + FileFormatFactory fileFormatFactory = + FactoryUtil.discoverFactory( + FileFormatFactory.class.getClassLoader(), + FileFormatFactory.class, + identifier); + return fileFormatFactory.create(context); } protected Options getIdentifierPrefixOptions(Options options) { diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index b726a84f24a2..d377cb2815c2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -19,13 +19,14 @@ package org.apache.paimon.format; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.factories.Factory; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import javax.annotation.Nullable; /** Factory to create {@link FileFormat}. */ -public interface FileFormatFactory { +public interface FileFormatFactory extends Factory { String identifier(); diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java new file mode 100644 index 000000000000..f157578107eb --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.function.Function; + +/** + * Sample Object Cache Manager . + * + * @param + * @param + */ +public class ObjectCacheManager { + private final Cache cache; + + private ObjectCacheManager(Duration timeout, int maxSize) { + this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); + } + + public static ObjectCacheManager newObjectCacheManager( + Duration timeout, int maxSize) { + return new ObjectCacheManager<>(timeout, maxSize); + } + + public ObjectCacheManager put(K k, V v) { + this.cache.put(k, v); + return this; + } + + public V get(K k, Function creator) { + return this.cache.get(k, creator); + } + + public V getIfPresent(K k) { + return this.cache.getIfPresent(k); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index b1030448a7e1..69f74078a585 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer filters) { RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); return new ManifestFile( fileIO, schemaManager, partitionType, - new ManifestEntrySerializer(), + ManifestEntrySerializer.getInstance(), entryType, - fileFormat.createReaderFactory(entryType), + fileFormat.createReaderFactory(entryType, filters), fileFormat.createWriterFactory(entryType), compression, pathFactory.manifestFileFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 4f8a1f3264de..392a3aea145e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -35,9 +35,13 @@ import org.apache.paimon.operation.metrics.ScanStats; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; @@ -79,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; + private Collection buckets; private BiFilter totalAwareBucketFilter = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; @@ -127,6 +132,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { @Override public FileStoreScan withBucket(int bucket) { this.bucketFilter = i -> i == bucket; + this.buckets = Collections.singletonList(bucket); + return this; + } + + @Override + public FileStoreScan withBuckets(Collection buckets) { + this.bucketFilter = buckets::contains; + this.buckets = buckets; return this; } @@ -427,7 +440,7 @@ private List readManifest( @Nullable Filter additionalTFilter) { List entries = manifestFileFactory - .create() + .create(createPushDownFilter(buckets)) .withCacheMetrics( scanMetrics != null ? scanMetrics.getCacheMetrics() : null) .read( @@ -471,6 +484,22 @@ private Filter createCacheRowFilter() { return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row)); } + /** + * Read the corresponding entries based on the current required bucket, but push down into file + * format . + */ + private static List createPushDownFilter(Collection buckets) { + if (buckets == null || buckets.isEmpty()) { + return null; + } + List predicates = new ArrayList<>(); + PredicateBuilder predicateBuilder = + new PredicateBuilder( + RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"})); + predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets))); + return predicates; + } + /** * Read the corresponding entries based on the current required partition and bucket. * diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 179d16de6cd2..8e9dc31757fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,6 +58,8 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); + FileStoreScan withBuckets(Collection buckets); + FileStoreScan withBucketFilter(Filter bucketFilter); FileStoreScan withTotalAwareBucketFilter(BiFilter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index e3e290f06086..050b0841074d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -265,6 +267,13 @@ public Scan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + mainScan.withBuckets(buckets); + fallbackScan.withBuckets(buckets); + return this; + } + @Override public Scan withLevelFilter(Filter levelFilter) { mainScan.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 24c6943f546f..1f478f283b68 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -48,6 +48,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -79,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public AbstractDataTableScan withBuckets(Collection buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public AbstractDataTableScan withPartitionFilter(Map partitionSpec) { snapshotReader.withPartitionFilter(partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index c2425ff16f97..58f884528054 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,8 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,6 +49,15 @@ default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + default InnerTableScan withBucket(Integer bucket) { + return withBuckets(Collections.singletonList(bucket)); + } + + default InnerTableScan withBuckets(Collection buckets) { + // return this is not safe for too many class not impl this method and withBucketFilter + return this; + } + default InnerTableScan withLevelFilter(Filter levelFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f3e0a92b8fc7..50a5b6940c3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -83,6 +84,8 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); + SnapshotReader withBuckets(Collection buckets); + SnapshotReader withBucketFilter(Filter bucketFilter); SnapshotReader withDataFileNameFilter(Filter fileNameFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index bf19ba10c689..f24b0760e6f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -54,6 +54,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -253,6 +254,11 @@ public SnapshotReader withBucket(int bucket) { return this; } + public SnapshotReader withBuckets(Collection buckets) { + scan.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { scan.withBucketFilter(bucketFilter); @@ -285,7 +291,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + Set buckets = new HashSet<>(); + for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 1cb967f8d1e2..f1dc3331b4ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -70,6 +70,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -342,6 +343,12 @@ public SnapshotReader withBucket(int bucket) { return this; } + @Override + public SnapshotReader withBuckets(Collection buckets) { + wrapped.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { wrapped.withBucketFilter(bucketFilter); @@ -452,6 +459,12 @@ public InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + batchScan.withBuckets(buckets); + return this; + } + @Override public InnerTableScan withLevelFilter(Filter levelFilter) { batchScan.withLevelFilter(levelFilter); @@ -489,6 +502,18 @@ public StreamDataTableScan withFilter(Predicate predicate) { return this; } + @Override + public InnerTableScan withBucketFilter(Filter bucketFilter) { + streamScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public InnerTableScan withBuckets(Collection buckets) { + streamScan.withBuckets(buckets); + return this; + } + @Override public StartingContext startingContext() { return streamScan.startingContext(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 8fe13943a3b2..e304455dcfc0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -29,6 +29,9 @@ import org.apache.paimon.operation.metrics.CacheMetrics; import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -42,6 +45,7 @@ /** Cache records to {@link SegmentsCache} by compacted serializer. */ @ThreadSafe public class ObjectsCache { + protected static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class); private final SegmentsCache cache; private final ObjectSerializer projectedSerializer; @@ -83,6 +87,9 @@ public List read( } return readFromSegments(segments, readFilter, readVFilter); } else { + if (LOG.isDebugEnabled()) { + LOG.debug("not match cache key {}", key); + } if (cacheMetrics != null) { cacheMetrics.increaseMissedObject(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 4f3d5c1c24dd..f20fd06d31e0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -224,6 +225,30 @@ public void testWithBucket() throws Exception { runTestExactMatch(scan, snapshot.id(), expected); } + @Test + public void testWithBuckets() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + Snapshot snapshot = writeData(data); + + int wantedBucket1 = random.nextInt(NUM_BUCKETS); + int wantedBucket2 = random.nextInt(NUM_BUCKETS); + int wantedBucket3 = random.nextInt(NUM_BUCKETS); + Set buckets = + new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3)); + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withBuckets(buckets); + + Map expected = + store.toKvMap( + data.stream() + .filter(kv -> buckets.contains(getBucket(kv))) + .collect(Collectors.toList())); + runTestExactMatch(scan, snapshot.id(), expected); + } + @Test public void testWithSnapshot() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index 7eb517ab9835..5e87f5e9ea2e 100644 --- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. +org.apache.paimon.format.FileStatsExtractingAvroFormatFactory org.apache.paimon.mergetree.compact.aggregate.TestCustomAggFactory \ No newline at end of file diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 14386c45f21b..000000000000 --- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.format.FileStatsExtractingAvroFormatFactory diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6251189560f6..4a9a46144798 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -82,3 +82,8 @@ org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure + +### fileFormat factories +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 6e7553d5c668..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory diff --git a/paimon-format/src/main/java/org/apache/orc/OrcConf.java b/paimon-format/src/main/java/org/apache/orc/OrcConf.java index a7fa1a21bc8c..ee07e45117a4 100644 --- a/paimon-format/src/main/java/org/apache/orc/OrcConf.java +++ b/paimon-format/src/main/java/org/apache/orc/OrcConf.java @@ -305,6 +305,21 @@ public enum OrcConf { + "must have the filter\n" + "reapplied to avoid using unset values in the unselected rows.\n" + "If unsure please leave this as false."), + + READER_ONLY_ALLOW_SARG_TO_FILTER( + "orc.reader.sarg.to.filter", + "orc.reader.sarg.to.filter", + false, + "A boolean flag to determine if a SArg is allowed to become a filter, only for reader."), + READER_ONLY_USE_SELECTED( + "orc.reader.filter.use.selected", + "orc.reader.filter.use.selected", + false, + "A boolean flag to determine if the selected vector is supported by\n" + + "the reading application, only for reader. If false, the output of the ORC reader " + + "must have the filter\n" + + "reapplied to avoid using unset values in the unselected rows.\n" + + "If unsure please leave this as false."), ALLOW_PLUGIN_FILTER( "orc.filter.plugin", "orc.filter.plugin", diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index 25a1935f3e4b..6aeb8c98892e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -42,7 +42,7 @@ public OrcArrayColumnVector( @Override public InternalArray getArray(int i) { - i = rowMapper(i); + // no need call rowMapper(i) here . long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarArray(paimonVector, (int) offset, (int) length); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index 6c73c9fdbe0d..b925f5b536b1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -46,7 +46,7 @@ public OrcRowColumnVector( @Override public ColumnarRow getRow(int i) { - i = rowMapper(i); + // no need to call rowMapper here . return new ColumnarRow(batch, i); } diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory similarity index 93% rename from paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory rename to paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 7af6f79b3493..4aa1b23b3b7a 100644 --- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,4 +15,4 @@ org.apache.paimon.format.avro.AvroFileFormatFactory org.apache.paimon.format.orc.OrcFileFormatFactory -org.apache.paimon.format.parquet.ParquetFileFormatFactory +org.apache.paimon.format.parquet.ParquetFileFormatFactory \ No newline at end of file