Skip to content

Commit

Permalink
Implement the GeoParquetGroup interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed May 27, 2024
1 parent 57f945c commit f2b526a
Show file tree
Hide file tree
Showing 10 changed files with 653 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -52,7 +52,7 @@ public GeoParquetReader(URI uri, Configuration configuration) {
this.configuration = configuration;
}

public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> readParallel() throws IOException, URISyntaxException {
Path globPath = new Path(uri.getPath());
URI rootUri = getRootUri(uri);
FileSystem fileSystem = FileSystem.get(rootUri, configuration);
Expand All @@ -62,24 +62,24 @@ public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxE
true);
}

public Stream<GeoParquetGroupImpl> read() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException {
return readParallel().sequential();
}

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroupImpl> {
public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final Queue<FileStatus> files;

private FileStatus file;

private ParquetReader<GeoParquetGroupImpl> reader;
private ParquetReader<GeoParquetGroup> reader;

public GeoParquetGroupSpliterator(List<FileStatus> files) {
this.files = new ArrayBlockingQueue<>(files.size(), false, files);
}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (file == null) {
Expand All @@ -97,7 +97,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

// Read the next group
GeoParquetGroupImpl group = reader.read();
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

@Override
public Spliterator<GeoParquetGroupImpl> trySplit() {
public Spliterator<GeoParquetGroup> trySplit() {
// Create a new spliterator by polling the next file
FileStatus file = files.poll();

Expand All @@ -153,7 +153,7 @@ public int characteristics() {
}
}

private ParquetReader<GeoParquetGroupImpl> createParquetReader(FileStatus file)
private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
throws IOException {
return ParquetReader
.builder(new GeoParquetGroupReadSupport(), file.getPath())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface GeoParquetGroup {

List<Integer> getIntegerValues(int fieldIndex);

Binary getInt96Value(int fieldIndex);

List<Binary> getInt96Values(int fieldIndex);

Binary getNanoTimeValue(int fieldIndex);

List<Binary> getNanoTimeValues(int fieldIndex);

Long getLongValue(int fieldIndex);

List<Long> getLongValues(int fieldIndex);
Expand All @@ -94,41 +102,49 @@ public interface GeoParquetGroup {

List<GeoParquetGroup> getGroupValues(int fieldIndex);

Binary getBinaryValue(String columnName);
Binary getBinaryValue(String fieldName);

List<Binary> getBinaryValues(String fieldName);

List<Binary> getBinaryValues(String columnName);
Boolean getBooleanValue(String fieldName);

Boolean getBooleanValue(String columnName);
List<Boolean> getBooleanValues(String fieldName);

List<Boolean> getBooleanValues(String columnName);
Double getDoubleValue(String fieldName);

Double getDoubleValue(String columnName);
List<Double> getDoubleValues(String fieldName);

List<Double> getDoubleValues(String columnName);
Float getFloatValue(String fieldName);

Float getFloatValue(String columnName);
List<Float> getFloatValues(String fieldName);

List<Float> getFloatValues(String columnName);
Integer getIntegerValue(String fieldName);

Integer getIntegerValue(String columnName);
List<Integer> getIntegerValues(String fieldName);

List<Integer> getIntegerValues(String columnName);
Binary getInt96Value(String fieldName);

Long getLongValue(String columnName);
List<Binary> getInt96Values(String fieldName);

List<Long> getLongValues(String columnName);
Binary getNanoTimeValue(String fieldName);

String getStringValue(String columnName);
List<Binary> getNanoTimeValues(String fieldName);

List<String> getStringValues(String columnName);
Long getLongValue(String fieldName);

Geometry getGeometryValue(String columnName);
List<Long> getLongValues(String fieldName);

List<Geometry> getGeometryValues(String columnName);
String getStringValue(String fieldName);

GeoParquetGroup getGroupValue(String columnName);
List<String> getStringValues(String fieldName);

List<GeoParquetGroup> getGroupValues(String columnName);
Geometry getGeometryValue(String fieldName);

List<Geometry> getGeometryValues(String fieldName);

GeoParquetGroup getGroupValue(String fieldName);

List<GeoParquetGroup> getGroupValues(String fieldName);

void setBinaryValue(int fieldIndex, Binary binaryValue);

Expand All @@ -150,6 +166,14 @@ public interface GeoParquetGroup {

void setIntegerValues(int fieldIndex, List<Integer> integerValues);

void setInt96Value(int fieldIndex, Binary int96Value);

void setInt96Values(int fieldIndex, List<Binary> int96Values);

void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue);

void setNanoTimeValues(int fieldIndex, List<Binary> nanoTimeValues);

void setLongValue(int fieldIndex, Long longValue);

void setLongValues(int fieldIndex, List<Long> longValues);
Expand All @@ -166,41 +190,49 @@ public interface GeoParquetGroup {

void setGroupValues(int fieldIndex, List<GeoParquetGroup> groupValues);

void setBinaryValue(String columnName, Binary binaryValue);
void setBinaryValue(String fieldName, Binary binaryValue);

void setBinaryValues(String fieldName, List<Binary> binaryValues);

void setBooleanValue(String fieldName, Boolean booleanValue);

void setBinaryValues(String columnName, List<Binary> binaryValues);
void setBooleanValues(String fieldName, List<Boolean> booleanValues);

void setBooleanValue(String columnName, Boolean booleanValue);
void setDoubleValue(String fieldName, Double doubleValue);

void setBooleanValues(String columnName, List<Boolean> booleanValues);
void setDoubleValues(String fieldName, List<Double> doubleValues);

void setDoubleValue(String columnName, Double doubleValue);
void setFloatValue(String fieldName, Float floatValue);

void setDoubleValues(String columnName, List<Double> doubleValues);
void setFloatValues(String fieldName, List<Float> floatValues);

void setFloatValue(String columnName, Float floatValue);
void setIntegerValue(String fieldName, Integer integerValue);

void setFloatValues(String columnName, List<Float> floatValues);
void setIntegerValues(String fieldName, List<Integer> integerValues);

void setIntegerValue(String columnName, Integer integerValue);
void setInt96Value(String fieldName, Binary int96Value);

void setIntegerValues(String columnName, List<Integer> integerValues);
void setInt96Values(String fieldName, List<Binary> int96Values);

void setLongValue(String columnName, Long longValue);
void setNanoTimeValue(String fieldName, Binary nanoTimeValue);

void setLongValues(String columnName, List<Long> longValues);
void setNanoTimeValues(String fieldName, List<Binary> nanoTimeValues);

void setStringValue(String columnName, String stringValue);
void setLongValue(String fieldName, Long longValue);

void setStringValues(String columnName, List<String> stringValues);
void setLongValues(String fieldName, List<Long> longValues);

void setGeometryValue(String columnName, Geometry geometryValue);
void setStringValue(String fieldName, String stringValue);

void setGeometryValues(String columnName, List<Geometry> geometryValues);
void setStringValues(String fieldName, List<String> stringValues);

void setGroupValue(String columnName, GeoParquetGroup groupValue);
void setGeometryValue(String fieldName, Geometry geometryValue);

void setGroupValues(String columnName, List<GeoParquetGroup> groupValues);
void setGeometryValues(String fieldName, List<Geometry> geometryValues);

void setGroupValue(String fieldName, GeoParquetGroup groupValue);

void setGroupValues(String fieldName, List<GeoParquetGroup> groupValues);

/**
* A GeoParquet schema that describes the fields of a group and can easily be introspected.
Expand Down Expand Up @@ -237,55 +269,63 @@ public Type type() {
record BooleanField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.BOOLEAN;
}
}

record DoubleField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.DOUBLE;
}
}

record FloatField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.FLOAT;
}
}

record IntegerField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.INTEGER;
}
}

record LongField(String name, Cardinality cardinality) implements Field {
record Int96Field(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
return Type.INT96;
}
}

record LongField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
return Type.LONG;
}
}

record StringField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.STRING;
}
}

record GeometryField(String name, Cardinality cardinality) implements Field {

@Override
public Type type() {
public Type type() {
return Type.GEOMETRY;
}
}
Expand All @@ -307,6 +347,8 @@ enum Type {
DOUBLE,
FLOAT,
INTEGER,
INT96,
NANO_TIME,
LONG,
STRING,
GEOMETRY,
Expand Down
Loading

0 comments on commit f2b526a

Please sign in to comment.