From cd43aaeeeb3d0b59bb7afc7f7910f34e85597986 Mon Sep 17 00:00:00 2001 From: sebr72 <48369171+sebr72@users.noreply.github.com> Date: Wed, 5 Jun 2024 09:57:42 +0200 Subject: [PATCH] Make SplitIterator an outer class and fix parallelism (#863) --- .../GeoParquetGroupSpliterator.java | 135 ++++++++++++++++++ .../baremaps/geoparquet/GeoParquetReader.java | 112 +-------------- 2 files changed, 138 insertions(+), 109 deletions(-) create mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java new file mode 100644 index 000000000..465b1226c --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import java.io.IOException; +import java.util.Map; +import java.util.Queue; +import java.util.Spliterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.function.Consumer; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; +import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; +import org.apache.hadoop.fs.FileStatus; +import org.apache.parquet.hadoop.ParquetReader; + +public class GeoParquetGroupSpliterator implements Spliterator { + + private final GeoParquetReader geoParquetReader; + private final Queue queue; + private final Map files; + private FileStatus fileStatus = null; + private ParquetReader reader; + + GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, + Map files) { + this.geoParquetReader = geoParquetReader; + this.files = files; + this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, files.keySet()); + } + + @Override + public boolean tryAdvance(Consumer action) { + try { + // Poll the next file + if (fileStatus == null) { + fileStatus = queue.poll(); + } + + // If there are no more files, return false + if (fileStatus == null) { + return false; + } + + // Create a new reader if it does not exist + if (reader == null) { + reader = createParquetReader(fileStatus); + } + + // Read the next group + GeoParquetGroup group = reader.read(); + + // If the group is null, close the resources and set the variables to null + if (group == null) { + reader.close(); + reader = null; + fileStatus = null; + + // Try to advance again + return tryAdvance(action); + } + + // Accept the group and tell the caller that there are more groups to read + action.accept(group); + return true; + + } catch (IOException e) { + // If an exception occurs, try to close the resources and throw a runtime exception + if (reader != null) { + try { + reader.close(); + } catch (IOException e2) { + // Ignore the exception as the original exception is more important + } + reader = null; + } + throw new GeoParquetException("IOException caught while trying to read the next file.", e); + } + } + + private ParquetReader createParquetReader(FileStatus file) + throws IOException { + return ParquetReader + .builder(new GeoParquetGroupReadSupport(), file.getPath()) + .withConf(geoParquetReader.configuration) + .build(); + } + + @Override + public Spliterator trySplit() { + if (queue.size() < 2) { + // There is nothing left to split + return null; + } + + // Create a new spliterator by polling the next polledFileStatus + FileStatus polledFileStatus = queue.poll(); + + // If there are no more files, tell the caller that there is nothing to split anymore + if (polledFileStatus == null) { + return null; + } + + // Return a new spliterator with the polledFileStatus + return new GeoParquetGroupSpliterator(geoParquetReader, + Map.of(polledFileStatus, files.get(polledFileStatus))); + } + + @Override + public long estimateSize() { + return files.values().stream() + .map(GeoParquetReader.FileInfo::recordCount) + .reduce(0L, Long::sum); + } + + @Override + public int characteristics() { + // The spliterator is not ordered, or sorted + return NONNULL | IMMUTABLE | SIZED | DISTINCT; + } +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 1eae14172..e88f2c61f 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -23,15 +23,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.function.Consumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory; import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; -import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,7 +36,6 @@ import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.schema.MessageType; @@ -50,11 +46,11 @@ public class GeoParquetReader { private final URI uri; - private final Configuration configuration; + final Configuration configuration; private Map files; - private record FileInfo(FileStatus file, Long recordCount, Map keyValueMetadata, + record FileInfo(FileStatus file, Long recordCount, Map keyValueMetadata, MessageType messageType, GeoParquetMetadata metadata, GeoParquetGroup.Schema geoParquetSchema) { } @@ -144,115 +140,13 @@ private FileInfo buildFileInfo(FileStatus file) throws IOException { } public Stream readParallel() throws URISyntaxException { - return StreamSupport.stream( - new GeoParquetGroupSpliterator(files()), - true); + return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), true); } public Stream read() throws IOException, URISyntaxException { return readParallel().sequential(); } - public class GeoParquetGroupSpliterator implements Spliterator { - - private final Queue queue; - private final Map files; - - private FileStatus fileStatus; - - private ParquetReader reader; - - GeoParquetGroupSpliterator(Map files) { - this.files = files; - this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, files.keySet()); - - } - - @Override - public boolean tryAdvance(Consumer action) { - try { - // Poll the next file - if (fileStatus == null) { - fileStatus = queue.poll(); - } - - // If there are no more files, return false - if (fileStatus == null) { - return false; - } - - // Create a new reader if it does not exist - if (reader == null) { - reader = createParquetReader(fileStatus); - } - - // Read the next group - GeoParquetGroup group = reader.read(); - - // If the group is null, close the resources and set the variables to null - if (group == null) { - reader.close(); - reader = null; - fileStatus = null; - - // Try to advance again - return tryAdvance(action); - } - - // Accept the group and tell the caller that there are more groups to read - action.accept(group); - return true; - - } catch (IOException e) { - // If an exception occurs, try to close the resources and throw a runtime exception - if (reader != null) { - try { - reader.close(); - } catch (IOException e2) { - // Ignore the exception as the original exception is more important - } - } - throw new GeoParquetException("IOException caught while trying to read the next file.", e); - } - } - - private ParquetReader createParquetReader(FileStatus file) - throws IOException { - return ParquetReader - .builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(configuration) - .build(); - } - - @Override - public Spliterator trySplit() { - // Create a new spliterator by polling the next polledFileStatus - FileStatus polledFileStatus = queue.poll(); - - // If there are no more files, tell the caller that there is nothing to split anymore - if (polledFileStatus == null) { - return null; - } - - // Return a new spliterator with the polledFileStatus - return new GeoParquetGroupSpliterator(Map.of(polledFileStatus, files.get(polledFileStatus))); - } - - @Override - public long estimateSize() { - // The size is unknown - return files.values().stream() - .map(FileInfo::recordCount) - .reduce(0L, Long::sum); - } - - @Override - public int characteristics() { - // The spliterator is not sized, ordered, or sorted - return Spliterator.NONNULL | Spliterator.IMMUTABLE; - } - } - private static Configuration createConfiguration() { Configuration conf = new Configuration(); conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com");