Skip to content

Commit

Permalink
Fix OPTIONAL group cardinality
Browse files Browse the repository at this point in the history
  • Loading branch information
Drabble committed Jun 3, 2024
1 parent 55b2970 commit f1225b9
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,22 @@
public class GeoParquetDataStore implements DataStore {

private final URI uri;
private final String tableName;

public GeoParquetDataStore(URI uri, String tableName) {
public GeoParquetDataStore(URI uri) {
this.uri = uri;
this.tableName = tableName;
}

@Override
public List<String> list() throws DataStoreException {
return List.of(tableName);
return List.of(uri.toString());
}

@Override
public DataTable get(String name) throws DataStoreException {
if (!tableName.equals(name)) {
if (!uri.toString().equals(name)) {
throw new DataStoreException("Table not found");
}
return new GeoParquetDataTable(uri, tableName);
return new GeoParquetDataTable(uri);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ public class GeoParquetDataTable implements DataTable {

private final URI path;

private final String name;

private DataSchema schema;

private GeoParquetReader reader;

public GeoParquetDataTable(URI path, String name) {
public GeoParquetDataTable(URI path) {
this.path = path;
this.name = name;
}

private GeoParquetReader reader() {
Expand Down Expand Up @@ -78,7 +75,7 @@ public Stream<DataRow> stream() {
public Stream<DataRow> parallelStream() {
try {
return reader().read().map(group -> new DataRowImpl(
GeoParquetTypeConversion.asSchema(name, group.getSchema()),
GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
GeoParquetTypeConversion.asRowValues(group)));
} catch (IOException | URISyntaxException e) {
throw new GeoParquetException("Fail to read() the reader", e);
Expand All @@ -101,7 +98,7 @@ public DataSchema schema() {
if (schema == null) {
try {
Schema schema = reader().getGeoParquetSchema();
this.schema = GeoParquetTypeConversion.asSchema(name, schema);
this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
return this.schema;
} catch (URISyntaxException e) {
throw new GeoParquetException("Failed to get the schema.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ private static List<DataColumn> asDataColumns(Schema field) {

private static DataColumn asDataColumn(Field field) {
Cardinality cardinality = switch (field.cardinality()) {
case REQUIRED -> Cardinality.OPTIONAL;
case REQUIRED -> Cardinality.REQUIRED;
case OPTIONAL -> Cardinality.OPTIONAL;
case REPEATED -> Cardinality.OPTIONAL;
case REPEATED -> Cardinality.REPEATED;
};
return switch (field.type()) {
case BINARY -> new DataColumnFixed(field.name(), cardinality, Type.BINARY);
Expand All @@ -70,11 +70,11 @@ public static List<Object> asRowValues(GeoParquetGroup group) {
Schema schema = group.getSchema();
List<Field> fields = schema.fields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if (group.getValues(i).isEmpty()) {
values.add(null);
continue;
}
Field field = fields.get(i);
switch (field.type()) {
case BINARY -> values.add(group.getBinaryValue(i).getBytes());
case BOOLEAN -> values.add(group.getBooleanValue(i));
Expand All @@ -96,11 +96,10 @@ public static Map<String, Object> asNested(GeoParquetGroup group) {
Schema schema = group.getSchema();
List<Field> fields = schema.fields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if (group.getValues(i).isEmpty()) {
nested.put(field.name(), null);
continue;
}
Field field = fields.get(i);
nested.put(field.name(), switch (field.type()) {
case BINARY -> group.getBinaryValue(i).getBytes();
case BOOLEAN -> group.getBooleanValue(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ImportGeoParquet(URI uri, String tableName, Object database, Integer data
*/
@Override
public void execute(WorkflowContext context) throws Exception {
var geoParquetDataStore = new GeoParquetDataStore(uri, tableName);
var geoParquetDataStore = new GeoParquetDataStore(uri);
var dataSource = context.getDataSource(database);
var postgresDataStore = new PostgresDataStore(dataSource);
for (var name : geoParquetDataStore.list()) {
Expand All @@ -80,7 +80,7 @@ public void execute(WorkflowContext context) throws Exception {
new DataTableGeometryMapper(geoParquetTable, projectionTransformer);
var transformedDataTable =
new DataTableMapper(geoParquetDataStore.get(name), rowTransformer);
postgresDataStore.add(transformedDataTable);
postgresDataStore.add(tableName, transformedDataTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ else if (!field.isPrimitive()) {
GeoParquetGroup.Schema geoParquetSchema = createGeoParquetSchema(groupType, metadata);
return (Field) new GeoParquetGroup.GroupField(
groupType.getName(),
GeoParquetGroup.Cardinality.REQUIRED,
cardinality,
geoParquetSchema);
}

Expand Down

0 comments on commit f1225b9

Please sign in to comment.