Skip to content

Commit

Permalink
SNOW-1806157 Fix field id mismatch during schema evolution for struct…
Browse files Browse the repository at this point in the history
…ured data type (#906)
  • Loading branch information
sfc-gh-alhuang authored Nov 14, 2024
1 parent 45c17a5 commit a233f7e
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ public void setupSchema(List<ColumnMetadata> columns) {
* F7: ordinal=7, fieldId=0
*/
if (clientBufferParameters.isEnableIcebergStreaming()) {
Map<Integer, ColumnMetadata> ordinalToColumn = new HashMap<>();
for (ColumnMetadata column : columns) {
ordinalToColumn.put(column.getOrdinal(), column);
}

for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
String[] path = columnDescriptor.getPath();
String columnDotPath = concatDotPath(path);
Expand Down Expand Up @@ -232,7 +237,15 @@ public void setupSchema(List<ColumnMetadata> columns) {
* checked by fieldId and ordinal where columnDisplayName doesn't matter.
*/
String columnDisplayName =
isPrimitiveColumn ? columns.get(ordinal - 1).getName() : columnDotPath;
isPrimitiveColumn
? Optional.ofNullable(ordinalToColumn.get(ordinal))
.orElseThrow(
() ->
new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Column not found. ordinal=%d.", ordinal)))
.getName()
: columnDotPath;

this.statsMap.put(
primitiveType.getId().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,106 @@ public void testNestedDataType() throws Exception {
Arrays.asList(value, newValue),
"id");
}

/** KC Testing pattern */
@Test
public void testReopen() throws Exception {
String tableName = createIcebergTableWithColumns("id int, int_col int");
SnowflakeStreamingIngestChannel channel = openChannel(tableName);
String channelName = channel.getName();

Map<String, Object> value = new HashMap<>();
value.put("id", 0L);
value.put("int_col", 1L);

verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(value),
Collections.singletonList(value),
"id");

Map<String, Object> newValue = new HashMap<>();
newValue.put("id", 1L);
newValue.put("new_int_col", 2L);
Assertions.assertThatThrownBy(
() ->
verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id"))
.isInstanceOf(SFException.class)
.hasMessage(
"The given row cannot be converted to the internal format: Extra columns:"
+ " [new_int_col]. Columns not present in the table shouldn't be specified,"
+ " rowIndex:0")
.extracting("vendorCode")
.isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());

conn.createStatement()
.execute(String.format("ALTER ICEBERG TABLE %s ADD COLUMN new_int_col int", tableName));

Assertions.assertThat(channel.isValid()).isTrue();
SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
Assertions.assertThat(newChannel.getName()).isEqualTo(channelName);
Assertions.assertThat(channel.isValid()).isFalse();

verifyMultipleColumns(
tableName,
newChannel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id");
}

@Test
public void testNewFieldId() throws Exception {
String tableName = createIcebergTableWithColumns("id int, obj_col object(a int, b int)");
SnowflakeStreamingIngestChannel channel = openChannel(tableName);

Map<String, Object> value = new HashMap<>();
value.put("id", 0L);
value.put("obj_col", ImmutableMap.of("a", 1, "b", 2));

verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(value),
Collections.singletonList(value),
"id");

conn.createStatement()
.execute(String.format("ALTER ICEBERG TABLE %s ADD COLUMN new_col int", tableName));

Map<String, Object> newValue = new HashMap<>();
newValue.put("id", 1L);
newValue.put("obj_col", ImmutableMap.of("a", 3, "b", 4));
newValue.put("new_col", 5L);

Assertions.assertThatThrownBy(
() ->
verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id"))
.isInstanceOf(SFException.class)
.hasMessage(
"The given row cannot be converted to the internal format: Extra columns: [new_col]."
+ " Columns not present in the table shouldn't be specified, rowIndex:0")
.extracting("vendorCode")
.isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());

channel.close();
SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
verifyMultipleColumns(
tableName,
newChannel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id");
}
}

0 comments on commit a233f7e

Please sign in to comment.