Skip to content

Commit

Permalink
[Source-MSSQL] Enable per-stream and Global states and migrate away f…
Browse files Browse the repository at this point in the history
…rom Legacy states (airbytehq#33018)
  • Loading branch information
Duy Nguyen authored Dec 13, 2023
1 parent bc891ea commit ce75540
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 23 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.2.1
dockerImageTag: 3.3.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ static DataToSync getDataToSyncConfig(final JsonNode config) {
return DataToSync.EXISTING_AND_NEW;
}

static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog) {
static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
final JsonNode config = database.getSourceConfig();
final JsonNode dbConfig = database.getDatabaseConfig();

Expand All @@ -160,7 +160,15 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi
props.setProperty("converters", "mssql_converter");
props.setProperty("mssql_converter.type", MSSQLConverter.class.getName());

props.setProperty("snapshot.mode", getDataToSyncConfig(config).getDebeziumSnapshotMode());
// If new stream(s) are added after a previously successful sync,
// the snapshot.mode needs to be initial_only since we don't want to continue streaming changes
// https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-snapshot-mode
if (isSnapshot) {
props.setProperty("snapshot.mode", "initial_only");
} else {
props.setProperty("snapshot.mode", getDataToSyncConfig(config).getDebeziumSnapshotMode());
}

props.setProperty("snapshot.isolation.mode", getSnapshotIsolationConfig(config).getDebeziumIsolationMode());

props.setProperty("schema.include.list", getSchema(catalog));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ public AirbyteMessage saveState(final Map<String, String> offset, final SchemaHi

@Override
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
throw new RuntimeException("Snapshot of individual tables is not implemented in MSSQL");
LOGGER.info("Snapshot of new tables is complete, saving state");
/*
* Namespace pair is ignored by global state manager, but is needed for satisfy the API contract.
* Therefore, provide an empty optional.
*/
final AirbyteStateMessage stateMessage = stateManager.emit(Optional.empty());
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -39,13 +40,16 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mssql.MssqlCdcHelper.SnapshotIsolation;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.connector.sqlserver.Lsn;
import java.io.File;
Expand All @@ -58,14 +62,12 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +105,15 @@ public MssqlSource() {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MssqlSourceOperations());
}

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return AirbyteStateType.LEGACY;
}

return MssqlCdcHelper.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
// #15808 Disallow connecting to db with disable, prefer or allow SSL mode when connecting directly
Expand Down Expand Up @@ -472,8 +483,7 @@ protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcD
}

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final JdbcDatabase database,
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final StateManager stateManager,
Expand All @@ -491,18 +501,45 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
firstRecordWaitTime,
subsequentRecordWaitTime,
OptionalInt.empty());

final MssqlCdcConnectorMetadataInjector mssqlCdcConnectorMetadataInjector = MssqlCdcConnectorMetadataInjector.getInstance(emittedAt);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
// Determine if new stream(s) have been added to the catalog after initial sync of existing streams
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final ConfiguredAirbyteCatalog streamsToSnapshotCatalog = new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorsSupplier = () -> handler.getIncrementalIterators(
catalog,
new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MssqlCdcStateHandler(stateManager),
mssqlCdcConnectorMetadataInjector,
MssqlCdcHelper.getDebeziumProperties(database, catalog),
MssqlCdcHelper.getDebeziumProperties(database, catalog, false),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt, true);
emittedAt,
true);

/*
* If the CDC state is null or there is no streams to snapshot, that means no stream has gone
* through the initial sync, so we return the list of incremental iterators
*/
if ((stateManager.getCdcStateManager().getCdcState() == null ||
stateManager.getCdcStateManager().getCdcState().getState() == null ||
streamsToSnapshot.isEmpty())) {
return List.of(incrementalIteratorsSupplier.get());
}

return Collections.singletonList(incrementalIteratorSupplier.get());
// Otherwise, we build the snapshot iterators for the newly added streams(s)
final AutoCloseableIterator<AirbyteMessage> snapshotIterators =
handler.getSnapshotIterators(streamsToSnapshotCatalog,
mssqlCdcConnectorMetadataInjector,
MssqlCdcHelper.getDebeziumProperties(database, catalog, true),
new MssqlCdcStateHandler(stateManager),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt);
/*
* The incremental iterators needs to be wrapped in a lazy iterator since only 1 Debezium engine for
* the DB can be running at a time
*/
return List.of(snapshotIterators, AutoCloseableIterators.lazyIterator(incrementalIteratorsSupplier, null));
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
Expand All @@ -31,6 +33,11 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
private static final String STREAM_NAME = "dbo.id_and_name";
private static final String STREAM_NAME2 = "dbo.starships";

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

public abstract SshTunnel.TunnelMethod getTunnelMethod();

protected MsSQLTestDatabase testdb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,34 @@

package io.airbyte.integrations.source.mssql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {

Expand All @@ -35,6 +47,11 @@ protected String getImageName() {
return "airbyte/source-mssql:dev";
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
Expand All @@ -50,7 +67,11 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
return new ConfiguredAirbyteCatalog().withStreams(getConfiguredAirbyteStreams());
}

protected List<ConfiguredAirbyteStream> getConfiguredAirbyteStreams() {
return Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
Expand All @@ -74,7 +95,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
}

@Override
Expand Down Expand Up @@ -119,4 +140,49 @@ protected void tearDown(final TestDestinationEnv testEnv) {
testdb.close();
}

@Test
void testAddNewStreamToExistingSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalogWithOneStream =
new ConfiguredAirbyteCatalog().withStreams(List.of(getConfiguredAirbyteStreams().get(0)));

// Start a sync with one stream
final List<AirbyteMessage> messages = runRead(configuredCatalogWithOneStream);
final List<AirbyteRecordMessage> recordMessages = filterRecords(messages);
final List<AirbyteStateMessage> stateMessages = filterStateMessages(messages);
final List<AirbyteStreamState> streamStates = stateMessages.get(0).getGlobal().getStreamStates();

assertEquals(3, recordMessages.size());
assertEquals(1, stateMessages.size());
assertEquals(1, streamStates.size());
assertEquals(STREAM_NAME, streamStates.get(0).getStreamDescriptor().getName());
assertEquals(SCHEMA_NAME, streamStates.get(0).getStreamDescriptor().getNamespace());

final AirbyteStateMessage lastStateMessage = Iterables.getLast(stateMessages);

final ConfiguredAirbyteCatalog configuredCatalogWithTwoStreams = configuredCatalogWithOneStream.withStreams(getConfiguredAirbyteStreams());

// Start another sync with a newly added stream
final List<AirbyteMessage> messages2 = runRead(configuredCatalogWithTwoStreams, Jsons.jsonNode(List.of(lastStateMessage)));
final List<AirbyteRecordMessage> recordMessages2 = filterRecords(messages2);
final List<AirbyteStateMessage> stateMessages2 = filterStateMessages(messages2);

assertEquals(3, recordMessages2.size());
assertEquals(2, stateMessages2.size());

final AirbyteStateMessage lastStateMessage2 = Iterables.getLast(stateMessages2);
final List<AirbyteStreamState> streamStates2 = lastStateMessage2.getGlobal().getStreamStates();

assertEquals(2, streamStates2.size());

assertEquals(STREAM_NAME, streamStates2.get(0).getStreamDescriptor().getName());
assertEquals(SCHEMA_NAME, streamStates2.get(0).getStreamDescriptor().getNamespace());
assertEquals(STREAM_NAME2, streamStates2.get(1).getStreamDescriptor().getName());
assertEquals(SCHEMA_NAME, streamStates2.get(1).getStreamDescriptor().getNamespace());
}

private List<AirbyteStateMessage> filterStateMessages(final List<AirbyteMessage> messages) {
return messages.stream().filter(r -> r.getType() == AirbyteMessage.Type.STATE).map(AirbyteMessage::getState)
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;

Expand All @@ -20,6 +22,11 @@ protected JsonNode getConfig() {
.build();
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected Database setupDatabase() {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
.withConnectionProperty("databaseName", testdb.getDatabaseName())
.initialized()
.with("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));")
.with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME2)
.with("INSERT INTO id_and_name (id, name, born) VALUES " +
"(1,'picard', '2124-03-04T01:01:01Z'), " +
"(2, 'crusher', '2124-03-04T01:01:01Z'), " +
"(3, 'vash', '2124-03-04T01:01:01Z');");
"(3, 'vash', '2124-03-04T01:01:01Z');")
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2);
}

@Override
Expand Down
Loading

0 comments on commit ce75540

Please sign in to comment.