From 7a08a263addd20fede631c90656cdea80185009f Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 6 Jan 2025 13:11:35 -0800 Subject: [PATCH] initial fix and new test --- .../source/internal/SchemaConverter.java | 4 +- .../source/internal/SchemaConverterTest.java | 6 +-- .../it/suite/DeltaEndToEndTableTestSuite.java | 50 +++++++++++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/connectors/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java b/connectors/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java index ad24209c167..6d0ada0b366 100644 --- a/connectors/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java +++ b/connectors/flink/src/main/java/io/delta/flink/source/internal/SchemaConverter.java @@ -2,7 +2,6 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DecimalType; @@ -16,6 +15,7 @@ import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import io.delta.standalone.types.DataType; @@ -64,7 +64,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable) case LONG: return new BigIntType(nullable); case BINARY: - return new BinaryType(nullable, BinaryType.DEFAULT_LENGTH); + return new VarBinaryType(nullable, VarBinaryType.MAX_LENGTH); case BOOLEAN: return new BooleanType(nullable); case BYTE: diff --git a/connectors/flink/src/test/java/io/delta/flink/source/internal/SchemaConverterTest.java b/connectors/flink/src/test/java/io/delta/flink/source/internal/SchemaConverterTest.java index c56e77cba0a..7c734e6fbea 100644 --- a/connectors/flink/src/test/java/io/delta/flink/source/internal/SchemaConverterTest.java +++ b/connectors/flink/src/test/java/io/delta/flink/source/internal/SchemaConverterTest.java @@ -5,7 +5,6 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DecimalType; @@ -18,6 +17,7 @@ import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -71,7 +71,7 @@ private static Stream dataTypes() { Arguments.of(new io.delta.standalone.types.ByteType(), new TinyIntType()), Arguments.of(new io.delta.standalone.types.ShortType(), new SmallIntType()), Arguments.of(new io.delta.standalone.types.LongType(), new BigIntType()), - Arguments.of(new io.delta.standalone.types.BinaryType(), new BinaryType()), + Arguments.of(new io.delta.standalone.types.BinaryType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)), Arguments.of(new io.delta.standalone.types.TimestampType(), new TimestampType()), Arguments.of(new io.delta.standalone.types.DateType(), new DateType()), Arguments.of( @@ -163,7 +163,7 @@ private static Stream mapTypes() { new io.delta.standalone.types.ShortType(), true ), - new MapType(new BinaryType(), new SmallIntType())), + new MapType(new VarBinaryType(VarBinaryType.MAX_LENGTH), new SmallIntType())), Arguments.of( new io.delta.standalone.types.MapType( new io.delta.standalone.types.StringType(), diff --git a/connectors/flink/src/test/java/io/delta/flink/table/it/suite/DeltaEndToEndTableTestSuite.java b/connectors/flink/src/test/java/io/delta/flink/table/it/suite/DeltaEndToEndTableTestSuite.java index 4ac015b5380..3e79481e253 100644 --- a/connectors/flink/src/test/java/io/delta/flink/table/it/suite/DeltaEndToEndTableTestSuite.java +++ b/connectors/flink/src/test/java/io/delta/flink/table/it/suite/DeltaEndToEndTableTestSuite.java @@ -94,6 +94,56 @@ public void setUp() { nonPartitionedLargeTablePath); } + /** Tests fix for https://github.com/delta-io/delta/issues/3977 */ + @Test + public void testWriteFromDatagenTableToDeltaTypeWithBytesType() throws Exception { + final StreamTableEnvironment tableEnv = setupTableEnvAndDeltaCatalog(false); + final String targetTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + final String datagenSourceDDL = "" + + "CREATE TABLE source_table (" + + " id BIGINT," + + " binary_data BYTES" + + ") WITH (" + + " 'connector' = 'datagen'," + + " 'fields.id.kind' = 'sequence'," + + " 'fields.id.start' = '1'," + + " 'fields.id.end' = '8'," + + " 'number-of-rows' = '8'," // this makes the source BOUNDED + + " 'fields.binary_data.kind' = 'random'," + + " 'fields.binary_data.length' = '16'" + + ")"; + final String deltaSinkDDL = String.format("" + + "CREATE TABLE target_table (" + + " id BIGINT," + + " binary_data BYTES" + + ") WITH (" + + " 'connector' = 'delta'," + + " 'table-path' = '%s'" + + ")", + targetTablePath); + + // Stage 1: Create the source and validate it + + tableEnv.executeSql(datagenSourceDDL).await(); + + final List sourceRows = + DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM source_table")); + + assertThat(sourceRows).hasSize(8); + + // Stage 2: Create the sink and insert into it and validate it + + tableEnv.executeSql(deltaSinkDDL).await(); + + // If our fix for issue #3977 did not work, then this would have thrown an exception. + tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table").await(); + + final List targetRows = + DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM target_table")); + + assertThat(targetRows).hasSize(8); + } + @Test public void shouldReadAndWriteDeltaTable() throws Exception {