Skip to content

Commit

Permalink
Implement the GeoParquetGroup interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed May 27, 2024
1 parent 57f945c commit e0abe9a
Show file tree
Hide file tree
Showing 9 changed files with 644 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -52,7 +52,7 @@ public GeoParquetReader(URI uri, Configuration configuration) {
this.configuration = configuration;
}

public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> readParallel() throws IOException, URISyntaxException {
Path globPath = new Path(uri.getPath());
URI rootUri = getRootUri(uri);
FileSystem fileSystem = FileSystem.get(rootUri, configuration);
Expand All @@ -62,24 +62,24 @@ public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxE
true);
}

public Stream<GeoParquetGroupImpl> read() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException {
return readParallel().sequential();
}

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroupImpl> {
public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final Queue<FileStatus> files;

private FileStatus file;

private ParquetReader<GeoParquetGroupImpl> reader;
private ParquetReader<GeoParquetGroup> reader;

public GeoParquetGroupSpliterator(List<FileStatus> files) {
this.files = new ArrayBlockingQueue<>(files.size(), false, files);
}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (file == null) {
Expand All @@ -97,7 +97,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

// Read the next group
GeoParquetGroupImpl group = reader.read();
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

@Override
public Spliterator<GeoParquetGroupImpl> trySplit() {
public Spliterator<GeoParquetGroup> trySplit() {
// Create a new spliterator by polling the next file
FileStatus file = files.poll();

Expand All @@ -153,7 +153,7 @@ public int characteristics() {
}
}

private ParquetReader<GeoParquetGroupImpl> createParquetReader(FileStatus file)
private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
throws IOException {
return ParquetReader
.builder(new GeoParquetGroupReadSupport(), file.getPath())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface GeoParquetGroup {

List<Integer> getIntegerValues(int fieldIndex);

Binary getInt96Value(int fieldIndex);

List<Binary> getInt96Values(int fieldIndex);

Binary getNanoTimeValue(int fieldIndex);

List<Binary> getNanoTimeValues(int fieldIndex);

Long getLongValue(int fieldIndex);

List<Long> getLongValues(int fieldIndex);
Expand All @@ -94,41 +102,49 @@ public interface GeoParquetGroup {

List<GeoParquetGroup> getGroupValues(int fieldIndex);

Binary getBinaryValue(String columnName);
Binary getBinaryValue(String fieldName);

List<Binary> getBinaryValues(String fieldName);

Boolean getBooleanValue(String fieldName);

List<Binary> getBinaryValues(String columnName);
List<Boolean> getBooleanValues(String fieldName);

Boolean getBooleanValue(String columnName);
Double getDoubleValue(String fieldName);

List<Boolean> getBooleanValues(String columnName);
List<Double> getDoubleValues(String fieldName);

Double getDoubleValue(String columnName);
Float getFloatValue(String fieldName);

List<Double> getDoubleValues(String columnName);
List<Float> getFloatValues(String fieldName);

Float getFloatValue(String columnName);
Integer getIntegerValue(String fieldName);

List<Float> getFloatValues(String columnName);
List<Integer> getIntegerValues(String fieldName);

Integer getIntegerValue(String columnName);
Binary getInt96Value(String fieldName);

List<Integer> getIntegerValues(String columnName);
List<Binary> getInt96Values(String fieldName);

Long getLongValue(String columnName);
Binary getNanoTimeValue(String fieldName);

List<Long> getLongValues(String columnName);
List<Binary> getNanoTimeValues(String fieldName);

String getStringValue(String columnName);
Long getLongValue(String fieldName);

List<String> getStringValues(String columnName);
List<Long> getLongValues(String fieldName);

Geometry getGeometryValue(String columnName);
String getStringValue(String fieldName);

List<Geometry> getGeometryValues(String columnName);
List<String> getStringValues(String fieldName);

GeoParquetGroup getGroupValue(String columnName);
Geometry getGeometryValue(String fieldName);

List<GeoParquetGroup> getGroupValues(String columnName);
List<Geometry> getGeometryValues(String fieldName);

GeoParquetGroup getGroupValue(String fieldName);

List<GeoParquetGroup> getGroupValues(String fieldName);

void setBinaryValue(int fieldIndex, Binary binaryValue);

Expand All @@ -150,6 +166,14 @@ public interface GeoParquetGroup {

void setIntegerValues(int fieldIndex, List<Integer> integerValues);

void setInt96Value(int fieldIndex, Binary int96Value);

void setInt96Values(int fieldIndex, List<Binary> int96Values);

void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue);

void setNanoTimeValues(int fieldIndex, List<Binary> nanoTimeValues);

void setLongValue(int fieldIndex, Long longValue);

void setLongValues(int fieldIndex, List<Long> longValues);
Expand All @@ -166,41 +190,49 @@ public interface GeoParquetGroup {

void setGroupValues(int fieldIndex, List<GeoParquetGroup> groupValues);

void setBinaryValue(String columnName, Binary binaryValue);
void setBinaryValue(String fieldName, Binary binaryValue);

void setBinaryValues(String columnName, List<Binary> binaryValues);
void setBinaryValues(String fieldName, List<Binary> binaryValues);

void setBooleanValue(String columnName, Boolean booleanValue);
void setBooleanValue(String fieldName, Boolean booleanValue);

void setBooleanValues(String columnName, List<Boolean> booleanValues);
void setBooleanValues(String fieldName, List<Boolean> booleanValues);

void setDoubleValue(String columnName, Double doubleValue);
void setDoubleValue(String fieldName, Double doubleValue);

void setDoubleValues(String columnName, List<Double> doubleValues);
void setDoubleValues(String fieldName, List<Double> doubleValues);

void setFloatValue(String columnName, Float floatValue);
void setFloatValue(String fieldName, Float floatValue);

void setFloatValues(String columnName, List<Float> floatValues);
void setFloatValues(String fieldName, List<Float> floatValues);

void setIntegerValue(String columnName, Integer integerValue);
void setIntegerValue(String fieldName, Integer integerValue);

void setIntegerValues(String columnName, List<Integer> integerValues);
void setIntegerValues(String fieldName, List<Integer> integerValues);

void setLongValue(String columnName, Long longValue);
void setInt96Value(String fieldName, Binary int96Value);

void setLongValues(String columnName, List<Long> longValues);
void setInt96Values(String fieldName, List<Binary> int96Values);

void setStringValue(String columnName, String stringValue);
void setNanoTimeValue(String fieldName, Binary nanoTimeValue);

void setStringValues(String columnName, List<String> stringValues);
void setNanoTimeValues(String fieldName, List<Binary> nanoTimeValues);

void setGeometryValue(String columnName, Geometry geometryValue);
void setLongValue(String fieldName, Long longValue);

void setGeometryValues(String columnName, List<Geometry> geometryValues);
void setLongValues(String fieldName, List<Long> longValues);

void setGroupValue(String columnName, GeoParquetGroup groupValue);
void setStringValue(String fieldName, String stringValue);

void setGroupValues(String columnName, List<GeoParquetGroup> groupValues);
void setStringValues(String fieldName, List<String> stringValues);

void setGeometryValue(String fieldName, Geometry geometryValue);

void setGeometryValues(String fieldName, List<Geometry> geometryValues);

void setGroupValue(String fieldName, GeoParquetGroup groupValue);

void setGroupValues(String fieldName, List<GeoParquetGroup> groupValues);

/**
* A GeoParquet schema that describes the fields of a group and can easily be introspected.
Expand Down Expand Up @@ -237,55 +269,55 @@ public Type type() {
record BooleanField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.BOOLEAN;
}
}

record DoubleField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.DOUBLE;
}
}

record FloatField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.FLOAT;
}
}

record IntegerField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.INTEGER;
}
}

record LongField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.LONG;
}
}

record StringField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.STRING;
}
}

record GeometryField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.GEOMETRY;
}
}
Expand Down
Loading

0 comments on commit e0abe9a

Please sign in to comment.