Skip to content

Commit

Permalink
working cdc with non resumable full refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed May 20, 2024
1 parent c2a8f98 commit 1cddbdc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ abstract class AbstractJdbcSource<Datatype>(
}

// If flag is off, fall back to legacy non-resumable refresh
return super.getFullRefreshStream(
return augmentWithStreamStatus(airbyteStream, super.getFullRefreshStream(
database,
airbyteStream,
catalog,
Expand All @@ -150,7 +150,12 @@ abstract class AbstractJdbcSource<Datatype>(
emittedAt,
syncMode,
cursorField,
)
))
}

open fun augmentWithStreamStatus(airbyteStream: ConfiguredAirbyteStream, streamItrator: AutoCloseableIterator<AirbyteMessage>): AutoCloseableIterator<AirbyteMessage> {
//no-op
return streamItrator
}

override fun queryTableFullRefresh(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AirbyteStreamAware;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mysql.cdc.CdcConfigurationHelper;
import io.airbyte.integrations.source.mysql.cursor_based.MySqlCursorBasedStateManager;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager;
Expand Down Expand Up @@ -88,6 +89,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -497,18 +500,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
emittedAt);
}

// private List<AutoCloseableIterator<AirbyteMessage>> decorateWithStreamStatus(List<AutoCloseableIterator<AirbyteMessage>> iterators) {
// var ret = new ArrayList<AutoCloseableIterator<AirbyteMessage>>();
// iterators.forEach(iter -> {
// var sai = (AirbyteStreamAware) iter;
// var pair = new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(sai.getAirbyteStream().get().getName(), sai.getAirbyteStream().get().getNamespace());
// ret.add(new StatusEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)));
// ret.add(iter);
// ret.add(new StatusEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)));
//
// });
// return ret;
// }
@Override
public Set<String> getExcludedInternalNameSpaces() {
return Set.of(
Expand Down Expand Up @@ -658,4 +649,12 @@ public enum ReplicationMethod {
CDC
}

@NotNull
@Override
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(@NotNull final ConfiguredAirbyteStream airbyteStream, @NotNull final AutoCloseableIterator<AirbyteMessage> streamItrator) {
final var pair = new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
final var starterStatus = new StatusEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED));
final var completeStatus = new StatusEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE));
return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamItrator, completeStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi

final List<AutoCloseableIterator<AirbyteMessage>> completers = catalog.getStreams().stream()
// .filter(stream -> !initialLoadStreams.streamsForInitialLoad.contains(stream))
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(stream -> (AutoCloseableIterator<AirbyteMessage>)new StatusEmitterIterator(
new AirbyteStreamStatusHolder(
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()),
Expand Down

0 comments on commit 1cddbdc

Please sign in to comment.