feat: Don't fail when reading non-identity partitioning field #6477

Merged
IcebergKeyValuePartitionedLayout.java
@@ -11,33 +11,31 @@
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.type.TypeUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
 * a {@link Snapshot}
 */
public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
-    private static class ColumnData {
+    private static class IdentityPartitioningColData {
        final String name;
        final Class<?> type;
-        final int index;
+        final int index; // position in the partition spec

-        public ColumnData(String name, Class<?> type, int index) {
+        private IdentityPartitioningColData(String name, Class<?> type, int index) {
            this.name = name;
            this.type = type;
            this.index = index;
        }
    }

-    private final List<ColumnData> outputPartitioningColumns;
+    private final List<IdentityPartitioningColData> identityPartitioningColumns;

    /**
     * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
@@ -53,33 +51,27 @@ public IcebergKeyValuePartitionedLayout(

        // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
        // in the output definition, so we can ignore duplicates.
-        final MutableInt icebergIndex = new MutableInt(0);
-        final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
-                .peek(partitionField -> {
-                    // TODO (deephaven-core#6438): Add support to handle non-identity transforms
-                    if (!partitionField.transform().isIdentity()) {
-                        throw new TableDataException("Partition field " + partitionField.name() + " has a " +
-                                "non-identity transform: " + partitionField.transform() + ", which is not supported");
-                    }
-                })
-                .map(PartitionField::name)
-                .map(name -> instructions.columnRenames().getOrDefault(name, name))
-                .collect(Collectors.toMap(
-                        name -> name,
-                        name -> icebergIndex.getAndIncrement(),
-                        (v1, v2) -> v1,
-                        LinkedHashMap::new));
+        final List<PartitionField> partitionFields = partitionSpec.fields();
+        final int numPartitionFields = partitionFields.size();
+        identityPartitioningColumns = new ArrayList<>(numPartitionFields);
+        for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) {
+            final PartitionField partitionField = partitionFields.get(fieldId);
+            if (!partitionField.transform().isIdentity()) {
+                // TODO (DH-18160): Improve support for handling non-identity transforms
+                continue;
+            }
+            final String icebergColName = partitionField.name();
+            final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
+            final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
+            if (columnDef == null) {
+                throw new TableDataException("Partitioning column " + dhColName + " not found in table definition " +
+                        "but corresponding identity partitioning column " + icebergColName + " is present in the " +
+                        "partition spec, table definition: " + tableDef + ", partition spec: " + partitionSpec);
+            }
+            identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName,
+                    TypeUtils.getBoxedType(columnDef.getDataType()), fieldId));

-        outputPartitioningColumns = tableDef.getColumnStream()
-                .map((final ColumnDefinition<?> columnDef) -> {
-                    final Integer index = availablePartitioningColumns.get(columnDef.getName());
-                    if (index == null) {
-                        return null;
-                    }
-                    return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index);
-                })
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
+        }
    }

    @Override
@@ -95,12 +87,11 @@ IcebergTableLocationKey keyFromDataFile(
        final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();

        final PartitionData partitionData = (PartitionData) dataFile.partition();
-        for (final ColumnData colData : outputPartitioningColumns) {
+        for (final IdentityPartitioningColData colData : identityPartitioningColumns) {
            final String colName = colData.name;
            final Object colValue;
            final Object valueFromPartitionData = partitionData.get(colData.index);
            if (valueFromPartitionData != null) {
-                // TODO (deephaven-core#6438): Assuming identity transform here
                colValue = IdentityPartitionConverters.convertConstant(
                        partitionData.getType(colData.index), valueFromPartitionData);
                if (!colData.type.isAssignableFrom(colValue.getClass())) {
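Taken together, the two hunks above change the failure mode: a partition spec that mixes identity and non-identity transforms no longer throws a TableDataException at discovery time. Non-identity fields are skipped, and only identity fields surface as Deephaven partitioning columns. A minimal usage sketch against the pyiceberg-2 test fixture added below (DbResource and the trading.data identifier come from the new test; the class name NonIdentityPartitionExample is hypothetical):

import io.deephaven.engine.table.Table;
import io.deephaven.iceberg.sqlite.DbResource;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.catalog.TableIdentifier;

public class NonIdentityPartitionExample {
    public static void main(String[] args) throws Exception {
        // Open the sqlite-backed test catalog written by generate-pyiceberg-2.py.
        final IcebergCatalogAdapter catalogAdapter = DbResource.openCatalog("pyiceberg-2");
        final IcebergTableAdapter tableAdapter =
                catalogAdapter.loadTable(TableIdentifier.parse("trading.data"));
        // Before this change, the day-transform field "datetime_day" caused a
        // TableDataException here; now that field is skipped during discovery.
        final Table table = tableAdapter.table();
        System.out.println("Read " + table.size() + " rows");
    }
}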
Pyiceberg2Test.java
@@ -0,0 +1,108 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.iceberg.sqlite.DbResource;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.net.URISyntaxException;
import java.util.List;
import static io.deephaven.util.QueryConstants.NULL_DOUBLE;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * This test shows that we can integrate with data written by <a href="https://py.iceberg.apache.org/">pyiceberg</a>.
 * See TESTING.md and generate-pyiceberg-2.py for more details.
 */
@Tag("security-manager-allow")
class Pyiceberg2Test {
    private static final Namespace NAMESPACE = Namespace.of("trading");
    private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data");

    // This will need to be updated if the data is regenerated
    private static final long SNAPSHOT_1_ID = 2806418501596315192L;

    private static final TableDefinition TABLE_DEFINITION = TableDefinition.of(
            ColumnDefinition.fromGenericType("datetime", LocalDateTime.class),
            ColumnDefinition.ofString("symbol").withPartitioning(),
            ColumnDefinition.ofDouble("bid"),
            ColumnDefinition.ofDouble("ask"));

    private IcebergCatalogAdapter catalogAdapter;

    @BeforeEach
    void setUp() throws URISyntaxException {
        catalogAdapter = DbResource.openCatalog("pyiceberg-2");
    }

    @Test
    void catalogInfo() {
        assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE);
        assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA);

        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final List<Snapshot> snapshots = tableAdapter.listSnapshots();
        assertThat(snapshots).hasSize(1);
        {
            final Snapshot snapshot = snapshots.get(0);
            assertThat(snapshot.parentId()).isNull();
            assertThat(snapshot.schemaId()).isEqualTo(0);
            assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
            assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID);
        }
    }

    @Test
    void testDefinition() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final TableDefinition td = tableAdapter.definition();
        assertThat(td).isEqualTo(TABLE_DEFINITION);

        // Check the partition spec
        final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec();
        assertThat(partitionSpec.fields().size()).isEqualTo(2);
        final PartitionField firstPartitionField = partitionSpec.fields().get(0);
        assertThat(firstPartitionField.name()).isEqualTo("datetime_day");
        assertThat(firstPartitionField.transform().toString()).isEqualTo("day");

        final PartitionField secondPartitionField = partitionSpec.fields().get(1);
        assertThat(secondPartitionField.name()).isEqualTo("symbol");
        assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
    }

    @Test
    void testData() {
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
        final Table fromIceberg = tableAdapter.table();
        assertThat(fromIceberg.size()).isEqualTo(5);
        final Table expectedData = TableTools.newTable(TABLE_DEFINITION,
                TableTools.col("datetime",
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 1, 0),
                        LocalDateTime.of(2024, 11, 26, 10, 2, 0),
                        LocalDateTime.of(2024, 11, 28, 10, 3, 0)),
                TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"),
                TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE),
                TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0));
        TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"),
                fromIceberg.sort("datetime", "symbol"));
    }
}
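For readers wanting to confirm which columns end up flagged as partitioning after this change, a short sketch against the same fixture (these asserts are illustrative, not part of the committed test; getColumn and isPartitioning are existing TableDefinition/ColumnDefinition accessors):

// Only the identity-transformed "symbol" field should be a partitioning column;
// the day-transform field "datetime_day" is skipped and never surfaced.
final TableDefinition td = catalogAdapter.loadTable(TRADING_DATA).definition();
assert td.getColumn("symbol").isPartitioning();
assert !td.getColumn("datetime").isPartitioning();
assert td.getColumn("datetime_day") == null;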
5 Git LFS files not shown
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0}
@@ -0,0 +1 @@
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1}
3 binary files not shown
@@ -1,3 +1,7 @@
+'''
+See TESTING.md for how to run this script.
+'''
+
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType
from pyiceberg.catalog.sql import SqlCatalog
@@ -0,0 +1,58 @@
'''
See TESTING.md for how to run this script.
'''

import pyarrow as pa
from datetime import datetime
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform

catalog = SqlCatalog(
    "pyiceberg-2",
    **{
        "uri": f"sqlite:///dh-iceberg-test.db",
        "warehouse": f"catalogs/pyiceberg-2",
    },
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day",
    ),
    PartitionField(
        source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol",
    )
)

catalog.create_namespace("trading")

tbl = catalog.create_table(
    identifier="trading.data",
    schema=schema,
    partition_spec=partition_spec,
)

# Define the data according to your Iceberg schema
data = [
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0},
    {"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5},
    {"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0},
    {"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0},
]

# Create a PyArrow Table
table = pa.Table.from_pylist(data)

# Append the table to the Iceberg table
tbl.append(table)