Skip to content

Commit

Permalink
source-mssql: enable heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Jan 22, 2024
1 parent 614b1c2 commit 403a116
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ default boolean isEventAheadOffset(final Map<String, String> offset, final Chang
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns `false`
*/
default boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
return true;
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class DebeziumStateDecoratingIterator<T> extends AbstractIterator<Airbyte
private final Duration syncCheckpointDuration;
private final Long syncCheckpointRecords;
private OffsetDateTime dateTimeLastSync;
private Long recordsLastSync;
private long recordsLastSync, recordsAllSyncs;
private boolean sendCheckpointMessage = false;

/**
Expand All @@ -77,7 +77,7 @@ public class DebeziumStateDecoratingIterator<T> extends AbstractIterator<Airbyte
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
* an unneeded usage of networking and processing.
*/
private final HashMap<String, String> previousCheckpointOffset;
private final HashMap<String, String> initialOffset, previousCheckpointOffset;
private final DebeziumConnectorType debeziumConnectorType;
private final ConfiguredAirbyteCatalog configuredAirbyteCatalog;
private final JsonNode config;
Expand Down Expand Up @@ -120,6 +120,7 @@ public DebeziumStateDecoratingIterator(final Iterator<ChangeEventWithMetadata> c
this.syncCheckpointDuration = checkpointDuration;
this.syncCheckpointRecords = checkpointRecords;
this.previousCheckpointOffset = (HashMap<String, String>) offsetManager.read();
this.initialOffset = new HashMap<>(this.previousCheckpointOffset);
this.debeziumConnectorType = debeziumConnectorType;
this.config = config;
resetCheckpointValues();
Expand Down Expand Up @@ -179,11 +180,20 @@ protected AirbyteMessage computeNext() {
}
}
recordsLastSync++;
recordsAllSyncs++;
return DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, configuredAirbyteCatalog, emittedAt, debeziumConnectorType, config);
}

isSyncFinished = true;
return createStateMessage(offsetManager.read(), recordsLastSync);
final var syncFinishedOffset = (HashMap<String, String>) offsetManager.read();
if (recordsAllSyncs == 0L && targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
// Edge case where no progress has been made: wrap up the sync by returning the initial offset instead of the
// current offset. We do this because we found that for some databases, heartbeats will cause Debezium to
// overwrite the offset file with a state which doesn't include all necessary data such as snapshot completion.
// This is the case for MS SQL Server, at least.
return createStateMessage(initialOffset, 0);
}
return createStateMessage(syncFinishedOffset, recordsLastSync);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MssqlDebeziumStateUtil {
public JsonNode constructInitialDebeziumState(final Properties properties,
final ConfiguredAirbyteCatalog catalog,
final JdbcDatabase database) {
properties.setProperty("heartbeat.interval.ms", "0");
final JsonNode highWaterMark = constructLsnSnapshotState(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText());
final AirbyteFileOffsetBackingStore emptyOffsetManager = AirbyteFileOffsetBackingStore.initializeState(null,
Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public static Properties getDebeziumProperties(final JdbcDatabase database, fina
? HEARTBEAT_INTERVAL_IN_TESTS
: HEARTBEAT_INTERVAL;
props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis()));
// TODO: enable heartbeats in MS SQL Server.
props.setProperty("heartbeat.interval.ms", "0");

if (config.has("ssl_method")) {
final JsonNode sslConfig = config.get("ssl_method");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,4 @@ public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
public boolean compressSchemaHistoryForState() {
return COMPRESSION_ENABLED;

Check failure on line 75 in airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-integrations:connectors:source-mssql:compileJava FAILED] cannot find symbol return COMPRESSION_ENABLED; ^ symbol: variable COMPRESSION_ENABLED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.SnapshotMetadata;
import io.airbyte.commons.json.Jsons;
import io.debezium.connector.sqlserver.Lsn;
import java.io.IOException;
import java.sql.SQLException;
Expand Down Expand Up @@ -52,7 +53,8 @@ public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWi

@Override
public Lsn extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
throw new RuntimeException("Heartbeat is not supported for MSSQL");
final Object commitLsnValue = sourceOffset.get("commit_lsn");
return (commitLsnValue == null) ? Lsn.NULL : Lsn.valueOf(commitLsnValue.toString());
}

private Lsn extractLsn(final JsonNode valueAsJson) {
Expand Down Expand Up @@ -111,4 +113,38 @@ public static MssqlCdcTargetPosition getTargetPosition(final JdbcDatabase databa
}
}

@Override
public boolean isHeartbeatSupported() {
return true;
}

@Override
public boolean reachedTargetPosition(Lsn positionFromHeartbeat) {
return positionFromHeartbeat.compareTo(targetLsn) >= 0;
}

@Override
public boolean isEventAheadOffset(Map<String, String> offset, ChangeEventWithMetadata event) {
if (offset == null || offset.size() != 1) {
return false;
}
final Lsn eventLsn = extractLsn(event.eventValueAsJson());
final Lsn offsetLsn = offsetToLsn(offset);
return eventLsn.compareTo(offsetLsn) > 0;
}

@Override
public boolean isSameOffset(Map<String, String> offsetA, Map<String, String> offsetB) {
if ((offsetA == null || offsetA.size() != 1) || (offsetB == null || offsetB.size() != 1)) {
return false;
}
return offsetToLsn(offsetA).equals(offsetToLsn(offsetB));
}

private Lsn offsetToLsn(Map<String, String> offset) {
final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);
final JsonNode commitLsnJson = offsetJson.get("commit_lsn");
return (commitLsnJson == null || commitLsnJson.isNull()) ? Lsn.NULL : Lsn.valueOf(commitLsnJson.asText());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ private JsonNode config() {
.with("replication_method", Map.of(
"method", "CDC",
"initial_waiting_seconds", 60))

.build();
}

Expand Down

0 comments on commit 403a116

Please sign in to comment.