initial fix and new test
scottsand-db committed Jan 7, 2025
1 parent acfb8df · commit 7a08a26
Showing 3 changed files with 55 additions and 5 deletions.

File 1 of 3 (path not shown):

@@ -2,7 +2,6 @@

 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -16,6 +15,7 @@
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;

 import io.delta.standalone.types.DataType;
@@ -64,7 +64,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable)
         case LONG:
             return new BigIntType(nullable);
         case BINARY:
-            return new BinaryType(nullable, BinaryType.DEFAULT_LENGTH);
+            return new VarBinaryType(nullable, VarBinaryType.MAX_LENGTH);
         case BOOLEAN:
             return new BooleanType(nullable);
         case BYTE:
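
Why the mapping changed (a minimal sketch, not part of this commit, assuming the flink-table-common API; the class name BinaryMappingSketch is made up for illustration): Flink SQL's BYTES type is shorthand for VARBINARY(2147483647), while BinaryType models the fixed-length BINARY(n) type whose DEFAULT_LENGTH is 1. Mapping Delta's BINARY type to BinaryType therefore yielded a BINARY(1) column in the resolved sink schema, which does not match a BYTES-typed query and is why the INSERT in the new test below threw an exception before this fix; VarBinaryType(VarBinaryType.MAX_LENGTH) matches BYTES exactly.

import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.VarBinaryType;

public class BinaryMappingSketch {
    public static void main(String[] args) {
        // What the converter now produces for Delta BINARY; identical to Flink SQL's BYTES type.
        System.out.println(new VarBinaryType(VarBinaryType.MAX_LENGTH).asSummaryString()); // VARBINARY(2147483647)

        // What the converter produced before this commit: a fixed-length BINARY(1)
        // (BinaryType.DEFAULT_LENGTH is 1), which a BYTES column cannot be written into.
        System.out.println(new BinaryType(BinaryType.DEFAULT_LENGTH).asSummaryString()); // BINARY(1)
    }
}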

File 2 of 3 (path not shown):

@@ -5,7 +5,6 @@

 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -18,6 +17,7 @@
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -71,7 +71,7 @@ private static Stream<Arguments> dataTypes() {
         Arguments.of(new io.delta.standalone.types.ByteType(), new TinyIntType()),
         Arguments.of(new io.delta.standalone.types.ShortType(), new SmallIntType()),
         Arguments.of(new io.delta.standalone.types.LongType(), new BigIntType()),
-        Arguments.of(new io.delta.standalone.types.BinaryType(), new BinaryType()),
+        Arguments.of(new io.delta.standalone.types.BinaryType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)),
         Arguments.of(new io.delta.standalone.types.TimestampType(), new TimestampType()),
         Arguments.of(new io.delta.standalone.types.DateType(), new DateType()),
         Arguments.of(
@@ -163,7 +163,7 @@ private static Stream<Arguments> mapTypes() {
                 new io.delta.standalone.types.ShortType(),
                 true
             ),
-            new MapType(new BinaryType(), new SmallIntType())),
+            new MapType(new VarBinaryType(VarBinaryType.MAX_LENGTH), new SmallIntType())),
         Arguments.of(
             new io.delta.standalone.types.MapType(
                 new io.delta.standalone.types.StringType(),

File 3 of 3 (path not shown):

@@ -94,6 +94,56 @@ public void setUp() {
             nonPartitionedLargeTablePath);
     }

+    /** Tests fix for https://github.com/delta-io/delta/issues/3977 */
+    @Test
+    public void testWriteFromDatagenTableToDeltaTypeWithBytesType() throws Exception {
+        final StreamTableEnvironment tableEnv = setupTableEnvAndDeltaCatalog(false);
+        final String targetTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+        final String datagenSourceDDL = ""
+            + "CREATE TABLE source_table ("
+            + " id BIGINT,"
+            + " binary_data BYTES"
+            + ") WITH ("
+            + " 'connector' = 'datagen',"
+            + " 'fields.id.kind' = 'sequence',"
+            + " 'fields.id.start' = '1',"
+            + " 'fields.id.end' = '8',"
+            + " 'number-of-rows' = '8'," // this makes the source BOUNDED
+            + " 'fields.binary_data.kind' = 'random',"
+            + " 'fields.binary_data.length' = '16'"
+            + ")";
+        final String deltaSinkDDL = String.format(""
+            + "CREATE TABLE target_table ("
+            + " id BIGINT,"
+            + " binary_data BYTES"
+            + ") WITH ("
+            + " 'connector' = 'delta',"
+            + " 'table-path' = '%s'"
+            + ")",
+            targetTablePath);
+
+        // Stage 1: Create the source and validate it
+
+        tableEnv.executeSql(datagenSourceDDL).await();
+
+        final List<Row> sourceRows =
+            DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM source_table"));
+
+        assertThat(sourceRows).hasSize(8);
+
+        // Stage 2: Create the sink and insert into it and validate it
+
+        tableEnv.executeSql(deltaSinkDDL).await();
+
+        // If our fix for issue #3977 did not work, then this would have thrown an exception.
+        tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table").await();
+
+        final List<Row> targetRows =
+            DeltaTestUtils.readTableResult(tableEnv.executeSql("SELECT * FROM target_table"));
+
+        assertThat(targetRows).hasSize(8);
+    }
+
     @Test
     public void shouldReadAndWriteDeltaTable() throws Exception {

