Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve GeoParquetReader #863

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.hadoop.ParquetReader;

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final GeoParquetReader geoParquetReader;
private final Queue<FileStatus> queue;
private final Map<FileStatus, GeoParquetReader.FileInfo> files;
private FileStatus fileStatus = null;
private ParquetReader<GeoParquetGroup> reader;

GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
Map<FileStatus, GeoParquetReader.FileInfo> files) {
this.geoParquetReader = geoParquetReader;
this.files = files;
this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, files.keySet());
}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (fileStatus == null) {
fileStatus = queue.poll();
}

// If there are no more files, return false
if (fileStatus == null) {
return false;
}

// Create a new reader if it does not exist
if (reader == null) {
reader = createParquetReader(fileStatus);
}

// Read the next group
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
reader.close();
reader = null;
fileStatus = null;

// Try to advance again
return tryAdvance(action);
}

// Accept the group and tell the caller that there are more groups to read
action.accept(group);
return true;

} catch (IOException e) {
// If an exception occurs, try to close the resources and throw a runtime exception
if (reader != null) {
try {
reader.close();
} catch (IOException e2) {
// Ignore the exception as the original exception is more important
}
reader = null;
}
throw new GeoParquetException("IOException caught while trying to read the next file.", e);
}
}

private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
throws IOException {
return ParquetReader
.builder(new GeoParquetGroupReadSupport(), file.getPath())
.withConf(geoParquetReader.configuration)
.build();
}

@Override
public Spliterator<GeoParquetGroup> trySplit() {
if (queue.size() < 2) {
// There is nothing left to split
return null;
}

// Create a new spliterator by polling the next polledFileStatus
FileStatus polledFileStatus = queue.poll();

// If there are no more files, tell the caller that there is nothing to split anymore
if (polledFileStatus == null) {
return null;
}

// Return a new spliterator with the polledFileStatus
return new GeoParquetGroupSpliterator(geoParquetReader,
Map.of(polledFileStatus, files.get(polledFileStatus)));
}

@Override
public long estimateSize() {
return files.values().stream()
.map(GeoParquetReader.FileInfo::recordCount)
.reduce(0L, Long::sum);
}

@Override
public int characteristics() {
// The spliterator is not ordered, or sorted
return NONNULL | IMMUTABLE | SIZED | DISTINCT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,17 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory;
import org.apache.baremaps.geoparquet.data.GeoParquetMetadata;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;


Expand All @@ -48,11 +44,11 @@ public class GeoParquetReader {

private final URI uri;

private final Configuration configuration;
final Configuration configuration;

private Map<FileStatus, FileInfo> files;

private record FileInfo(FileStatus file, Long recordCount, Map<String, String> keyValueMetadata,
record FileInfo(FileStatus file, Long recordCount, Map<String, String> keyValueMetadata,
MessageType messageType, GeoParquetMetadata metadata,
GeoParquetGroup.Schema geoParquetSchema) {
}
Expand Down Expand Up @@ -144,115 +140,13 @@ private FileInfo buildFileInfo(FileStatus file) throws IOException {
}

public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
return StreamSupport.stream(
new GeoParquetGroupSpliterator(files()),
true);
return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), true);
}

public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException {
return readParallel().sequential();
}

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final Queue<FileStatus> queue;
private final Map<FileStatus, FileInfo> files;

private FileStatus fileStatus;

private ParquetReader<GeoParquetGroup> reader;

GeoParquetGroupSpliterator(Map<FileStatus, FileInfo> files) {
this.files = files;
this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, files.keySet());

}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (fileStatus == null) {
fileStatus = queue.poll();
}

// If there are no more files, return false
if (fileStatus == null) {
return false;
}

// Create a new reader if it does not exist
if (reader == null) {
reader = createParquetReader(fileStatus);
}

// Read the next group
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
reader.close();
reader = null;
fileStatus = null;

// Try to advance again
return tryAdvance(action);
}

// Accept the group and tell the caller that there are more groups to read
action.accept(group);
return true;

} catch (IOException e) {
// If an exception occurs, try to close the resources and throw a runtime exception
if (reader != null) {
try {
reader.close();
} catch (IOException e2) {
// Ignore the exception as the original exception is more important
}
}
throw new GeoParquetException("IOException caught while trying to read the next file.", e);
}
}

private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
throws IOException {
return ParquetReader
.builder(new GeoParquetGroupReadSupport(), file.getPath())
.withConf(configuration)
.build();
}

@Override
public Spliterator<GeoParquetGroup> trySplit() {
// Create a new spliterator by polling the next polledFileStatus
FileStatus polledFileStatus = queue.poll();

// If there are no more files, tell the caller that there is nothing to split anymore
if (polledFileStatus == null) {
return null;
}

// Return a new spliterator with the polledFileStatus
return new GeoParquetGroupSpliterator(Map.of(polledFileStatus, files.get(polledFileStatus)));
}

@Override
public long estimateSize() {
// The size is unknown
return files.values().stream()
.map(FileInfo::recordCount)
.reduce(0L, Long::sum);
}

@Override
public int characteristics() {
// The spliterator is not sized, ordered, or sorted
return Spliterator.NONNULL | Spliterator.IMMUTABLE;
}
}

private static Configuration createConfiguration() {
Configuration configuration = new Configuration();
configuration.set("fs.s3a.aws.credentials.provider",
Expand Down
Loading