Skip to content

Commit

Permalink
Mssql stream status messages (#38640)
Browse files Browse the repository at this point in the history
Co-authored-by: Xiaohan Song <[email protected]>
  • Loading branch information
rodireich and xiaohansong authored May 28, 2024
1 parent 24eb923 commit ccea023
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 9 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.35.4'
cdkVersionRequired = '0.35.11'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.23
dockerImageTag: 4.0.24
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
Expand Down Expand Up @@ -428,11 +430,11 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
new MssqlInitialLoadHandler(sourceConfig, database, new MssqlSourceOperations(), getQuoteString(), initialLoadStateManager,
Optional.of(namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(namespacePair))),
getTableSizeInfoForStreams(database, initialLoadStreams.streamsForInitialLoad(), getQuoteString()));

// Cursor based incremental iterators are decorated with start and complete status traces
final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
tableNameToTable,
emittedAt));
emittedAt, true, true));

// Build Cursor based iterator
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterator =
Expand Down Expand Up @@ -672,4 +674,17 @@ public boolean supportResumableFullRefresh(final JdbcDatabase database, final Co
return false;
}

@NotNull
@Override
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(@NotNull final ConfiguredAirbyteStream airbyteStream,
@NotNull final AutoCloseableIterator<AirbyteMessage> streamItrator) {
final var pair =
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
final var starterStatus =
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED));
final var completeStatus =
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE));
return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamItrator, completeStatus);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -131,20 +133,30 @@ static Map<String, String> aggregateClusteredIndexes(final List<ClusteredIndexAt
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final Instant emittedAt) {
final Instant emittedAt,
final boolean decorateWithStartedStatus,
final boolean decorateWithCompletedStatus) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String streamName = stream.getName();
final String namespace = stream.getNamespace();
// TODO: need to select column according to indexing status of table. may not be primary key
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
final var pair = new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(streamName, namespace);
if (airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) {
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName);

// Grab the selected fields to sync
final TableInfo<CommonField<JDBCType>> table = tableNameToTable.get(fullyQualifiedTableName);
if (decorateWithStartedStatus) {
iteratorList.add(
new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)));
}
iteratorList.add(getIteratorForStream(airbyteStream, table, emittedAt));
if (decorateWithCompletedStatus) {
iteratorList.add(new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)));
}
}
}
return iteratorList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import io.airbyte.cdk.integrations.source.relationaldb.models.CursorBasedStatus;
import io.airbyte.cdk.integrations.source.relationaldb.models.OrderedColumnLoadStatus;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mssql.*;
Expand Down Expand Up @@ -202,12 +204,22 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
final MssqlInitialLoadHandler initialLoadHandler =
getMssqlInitialLoadHandler(database, emittedAt, quoteString, initialLoadStreams, initialLoadStateManager,
Optional.of(new CdcMetadataInjector(emittedAt.toString(), stateAttributes, metadataInjector)));
// Because initial load streams will be followed by cdc read of those stream, we only decorate with
// complete status trace after CDC read is done.
initialLoadIterator.addAll(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
tableNameToTable,
emittedAt));
emittedAt, true, false));
}

final List<AutoCloseableIterator<AirbyteMessage>> cdcStreamsStartStatusEmitters = catalog.getStreams().stream()
.filter(stream -> !initialLoadStreams.streamsForInitialLoad.contains(stream))
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)))
.toList();

// Build the incremental CDC iterators.
final var targetPosition = MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText());
final AirbyteDebeziumHandler<Lsn> handler = new AirbyteDebeziumHandler<>(
Expand All @@ -223,14 +235,24 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorsSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, new MssqlCdcSavedInfoFetcher(stateToBeUsed), new MssqlCdcStateHandler(stateManager));

final List<AutoCloseableIterator<AirbyteMessage>> allStreamsCompleteStatusEmitters = catalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(stream -> (AutoCloseableIterator<AirbyteMessage>) new StreamStatusTraceEmitterIterator(
new AirbyteStreamStatusHolder(
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)))
.toList();
// This starts processing the transaction logs as soon as initial sync is complete,
// this is a bit different from the current cdc syncs.
// We finish the current CDC once the initial snapshot is complete and the next sync starts
// processing the transaction logs
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(
Stream
.of(initialLoadIterator, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorsSupplier, null)))
.of(initialLoadIterator,
cdcStreamsStartStatusEmitters,
Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorsSupplier, null)),
allStreamsCompleteStatusEmitters)
.flatMap(Collection::stream)
.collect(Collectors.toList()),
AirbyteTraceMessageUtility::emitStreamStatusTrace));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ protected JsonNode config() {
@Override
protected void assertExpectedStateMessageCountMatches(final List<? extends AirbyteStateMessage> stateMessages, long totalCount) {
AtomicLong count = new AtomicLong(0L);
stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
stateMessages.stream().forEach(
stateMessage -> count.addAndGet(stateMessage.getSourceStats() != null ? stateMessage.getSourceStats().getRecordCount().longValue() : 0L));
assertEquals(totalCount, count.get());
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.24 | 2024-05-23 | [38640](https://github.com/airbytehq/airbyte/pull/38640) | Sync sending trace status messages indicating progress. |
| 4.0.23 | 2024-05-15 | [38208](https://github.com/airbytehq/airbyte/pull/38208) | disable counts in full refresh stream in state message. |
| 4.0.22 | 2024-05-14 | [38196](https://github.com/airbytehq/airbyte/pull/38196) | Bump jdbc driver version to 12.6.1.jre11 |
| 4.0.21 | 2024-05-07 | [38054](https://github.com/airbytehq/airbyte/pull/38054) | Resumeable refresh should run only if there is source defined pk. |
Expand Down

0 comments on commit ccea023

Please sign in to comment.