Skip to content

Commit

Permalink
Remove Dv2 Null checks (#31149)
Browse files Browse the repository at this point in the history
Co-authored-by: evantahler <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2023
1 parent 73a6ec0 commit 898846d
Show file tree
Hide file tree
Showing 21 changed files with 232 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -353,43 +354,6 @@ public void detectColumnChanged() throws Exception {
"Altering a column was not detected as a schema change.");
}

/**
 * T+D must raise an error for an incremental-dedup sync in which any record carries a null
 * primary key, and must not write any records to the final table in that case.
 */
@Test
public void incrementalDedupInvalidPrimaryKey() throws Exception {
  createRawTable(streamId);
  createFinalTable(incrementalDedupStream, "");
  // One record whose payload is empty (i.e. null PK) and one well-formed record.
  final var recordMissingPk = Jsons.deserialize(
      """
      {
        "_airbyte_raw_id": "10d6e27d-ae7a-41b5-baf8-c4c277ef9c11",
        "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
        "_airbyte_data": {}
      }
      """);
  final var recordWithPk = Jsons.deserialize(
      """
      {
        "_airbyte_raw_id": "5ce60e70-98aa-4fe3-8159-67207352c4f0",
        "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
        "_airbyte_data": {"id1": 1, "id2": 100}
      }
      """);
  insertRawTableRecords(streamId, List.of(recordMissingPk, recordWithPk));

  final String updateSql = generator.updateTable(incrementalDedupStream, "");
  // Executing the update must fail on the null-PK record...
  assertThrows(
      Exception.class,
      () -> destinationHandler.execute(updateSql));
  // ...and the final table must remain empty.
  DIFFER.diffFinalTableRecords(
      emptyList(),
      dumpFinalTableRecords(streamId, ""));
}

/**
* Test that T+D supports streams whose name and namespace are the same.
*/
Expand Down Expand Up @@ -926,6 +890,46 @@ public void testReservedKeywords() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

/**
 * Verify that {@code existingSchemaMatchesStreamConfig} accepts a final table created by the
 * current generator (whose PK columns are nullable) and rejects a final table whose PK columns
 * are declared NOT NULL — the old behavior removed in
 * https://github.com/airbytehq/airbyte/pull/31082.
 */
@Test
public void ensurePKsAreIndexedUnique() throws Exception {
  createRawTable(streamId);
  insertRawTableRecords(
      streamId,
      List.of(Jsons.deserialize(
          """
          {
            "_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
            "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
            "_airbyte_data": {
              "id1": 1,
              "id2": 2
            }
          }
          """)));

  final String createTable = generator.createTable(incrementalDedupStream, "", false);

  // A table created by the current generator must match the stream config.
  destinationHandler.execute(createTable);
  final Optional<DialectTableDefinition> existingTableA = destinationHandler.findExistingTable(streamId);
  assertTrue(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableA.get()));
  destinationHandler.execute("DROP TABLE " + streamId.finalTableId(""));

  // Hack the create query to add NOT NULLs to the PK columns, emulating the old behavior.
  // NOTE: join with the same separator used for the split, so the emitted DDL keeps its
  // original line endings on every platform (previously joined with a hard-coded "\r\n").
  final String createTableModified = Arrays.stream(createTable.split(System.lineSeparator()))
      .map(line -> !line.contains("CLUSTER") && (line.contains("id1") || line.contains("id2") || line.contains("ID1") || line.contains("ID2"))
          ? line.replace(",", " NOT NULL,")
          : line)
      .collect(Collectors.joining(System.lineSeparator()));
  destinationHandler.execute(createTableModified);
  // The NOT-NULL-PK table must be flagged as a schema mismatch.
  final Optional<DialectTableDefinition> existingTableB = destinationHandler.findExistingTable(streamId);
  assertFalse(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableB.get()));
}

/**
* A stream with no columns is weird, but we shouldn't treat it specially in any way. It should
* create a final table as usual, and populate it with the relevant metadata columns.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// id2 — a required PK for this schema — is explicitly null in every record below
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": null, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": null, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
{"type": "RECORD", "record": {"emitted_at": 3000, "data": {"id1": 1, "id2": null, "updated_at": "2000-01-03T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
throws TableNotMigratedException, UnexpectedSchemaException, Exception {
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
Expand All @@ -40,7 +40,7 @@ public void migrateIfNecessary(
* @param streamConfig the stream in question
* @return whether to migrate the stream
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) {
protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exception {
final var v1RawTable = convertToV1RawName(streamConfig);
LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
Expand All @@ -66,7 +66,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final var namespacedTableName = convertToV1RawName(streamConfig);
try {
destinationHandler.execute(sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(), namespacedTableName.tableName()));
} catch (Exception e) {
} catch (final Exception e) {
final var message = "Attempted and failed to migrate stream %s".formatted(streamConfig.id().finalName());
throw new TableNotMigratedException(message, e);
}
Expand All @@ -78,7 +78,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
* @param existingV2AirbyteRawTable the v1 raw table
* @return whether the schema is as expected
*/
private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existingV2AirbyteRawTable) {
private boolean doesV1RawTableMatchExpectedSchema(final DialectTableDefinition existingV2AirbyteRawTable) {

return schemaMatchesExpectation(existingV2AirbyteRawTable, LEGACY_RAW_TABLE_COLUMNS);
}
Expand All @@ -88,7 +88,7 @@ private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existin
*
* @param existingV2AirbyteRawTable the v2 raw table
*/
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(DialectTableDefinition existingV2AirbyteRawTable) {
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(final DialectTableDefinition existingV2AirbyteRawTable) {
if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES)) {
throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema");
}
Expand All @@ -110,7 +110,7 @@ private boolean isMigrationRequiredForSyncMode(final DestinationSyncMode destina
* @param streamConfig the raw table to check
* @return whether it exists and is in the correct format
*/
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) {
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) throws Exception {
if (doesAirbyteInternalNamespaceExist(streamConfig)) {
final var existingV2Table = getTableIfExists(streamConfig.id().rawNamespace(), streamConfig.id().rawName());
existingV2Table.ifPresent(this::validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema);
Expand All @@ -126,7 +126,7 @@ private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig)
* @param tableName
* @return whether it exists and is in the correct format
*/
protected boolean doesValidV1RawTableExist(final String namespace, final String tableName) {
protected boolean doesValidV1RawTableExist(final String namespace, final String tableName) throws Exception {
final var existingV1RawTable = getTableIfExists(namespace, tableName);
return existingV1RawTable.isPresent() && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get());
}
Expand All @@ -137,7 +137,7 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String
* @param streamConfig the stream to check
* @return whether the schema exists
*/
abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig);
abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig) throws Exception;

/**
* Checks a Table's schema and compares it to an expected schema to make sure it matches
Expand All @@ -155,7 +155,7 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String
* @param tableName
* @return an optional potentially containing a reference to the table
*/
abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName);
abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName) throws Exception;

/**
* We use different naming conventions for raw table names in destinations v2, we need a way to map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ void migrateIfNecessary(
final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException;
throws TableNotMigratedException, UnexpectedSchemaException, Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class DestinationV1V2MigratorTest {
public static class ShouldMigrateTestArgumentProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {

// Don't throw an exception
final boolean v2SchemaMatches = true;
Expand All @@ -52,24 +52,25 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) th

@ParameterizedTest
@ArgumentsSource(ShouldMigrateTestArgumentProvider.class)
public void testShouldMigrate(final DestinationSyncMode destinationSyncMode, final BaseDestinationV1V2Migrator migrator, boolean expected) {
public void testShouldMigrate(final DestinationSyncMode destinationSyncMode, final BaseDestinationV1V2Migrator migrator, final boolean expected)
throws Exception {
final StreamConfig config = new StreamConfig(STREAM_ID, null, destinationSyncMode, null, null, null);
final var actual = migrator.shouldMigrate(config);
Assertions.assertEquals(expected, actual);
}

@Test
public void testMismatchedSchemaThrowsException() {
public void testMismatchedSchemaThrowsException() throws Exception {
final StreamConfig config = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
final var migrator = makeMockMigrator(true, true, false, false, false);
UnexpectedSchemaException exception = Assertions.assertThrows(UnexpectedSchemaException.class,
final UnexpectedSchemaException exception = Assertions.assertThrows(UnexpectedSchemaException.class,
() -> migrator.shouldMigrate(config));
Assertions.assertEquals("Destination V2 Raw Table does not match expected Schema", exception.getMessage());
}

@SneakyThrows
@Test
public void testMigrate() {
public void testMigrate() throws Exception {
final var sqlGenerator = new MockSqlGenerator();
final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
final DestinationHandler<String> handler = Mockito.mock(DestinationHandler.class);
Expand All @@ -80,16 +81,17 @@ public void testMigrate() {
Mockito.verify(handler).execute(sql);
// Exception thrown when executing sql, TableNotMigratedException thrown
Mockito.doThrow(Exception.class).when(handler).execute(Mockito.anyString());
TableNotMigratedException exception = Assertions.assertThrows(TableNotMigratedException.class,
final TableNotMigratedException exception = Assertions.assertThrows(TableNotMigratedException.class,
() -> migrator.migrate(sqlGenerator, handler, stream));
Assertions.assertEquals("Attempted and failed to migrate stream final_table", exception.getMessage());
}

public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2NamespaceExists,
final boolean v2TableExists,
final boolean v2RawSchemaMatches,
boolean v1RawTableExists,
boolean v1RawTableSchemaMatches) {
final boolean v1RawTableExists,
final boolean v1RawTableSchemaMatches)
throws Exception {
final BaseDestinationV1V2Migrator migrator = Mockito.spy(BaseDestinationV1V2Migrator.class);
Mockito.when(migrator.doesAirbyteInternalNamespaceExist(Mockito.any())).thenReturn(v2NamespaceExists);
final var existingTable = v2TableExists ? Optional.of("v2_raw") : Optional.empty();
Expand All @@ -103,7 +105,7 @@ public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2Names
return migrator;
}

public static BaseDestinationV1V2Migrator noIssuesMigrator() {
public static BaseDestinationV1V2Migrator noIssuesMigrator() throws Exception {
return makeMockMigrator(true, false, true, true, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.26
LABEL io.airbyte.version=2.1.0
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.26
dockerImageTag: 2.1.0
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Loading

0 comments on commit 898846d

Please sign in to comment.