Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into keu/declarative-cdk…
Browse files Browse the repository at this point in the history
…/add-conditional-authenticator
  • Loading branch information
eugene-kulak committed Dec 18, 2023
2 parents 73192f7 + 743ab29 commit cc0466a
Show file tree
Hide file tree
Showing 72 changed files with 719 additions and 665 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently |
| 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state |
| 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. |
| 0.7.4 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Track stream record count during sync; only run T+D if a stream had nonzero records or the previous sync left unprocessed records. |
| 0.7.3 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Extract shared JDBC T+D code. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ private Map<String, String> getWorkerMetadata() {
.put("WORKER_CONNECTOR_IMAGE", imageName)
.put("WORKER_JOB_ID", jobId)
.put("WORKER_JOB_ATTEMPT", String.valueOf(attempt))
.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()))
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

private static final Logger log = LoggerFactory.getLogger(EnvVariableFeatureFlags.class);

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
// Set this value to true to see all messages from the source to destination, set to one second
// emission
Expand All @@ -24,11 +23,6 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";
public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";

@Override
public boolean useStreamCapableState() {
return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean);
}

@Override
public boolean autoDetectSchema() {
return getEnvOrDefault(AUTO_DETECT_SCHEMA, true, Boolean::parseBoolean);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
*/
public interface FeatureFlags {

boolean useStreamCapableState();

boolean autoDetectSchema();

boolean logConnectorMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,6 @@

public class FeatureFlagsWrapper implements FeatureFlags {

/**
* Overrides the {@link FeatureFlags#useStreamCapableState} method in the feature flags.
*/
static public FeatureFlags overridingUseStreamCapableState(
final FeatureFlags wrapped,
final boolean useStreamCapableState) {
return new FeatureFlagsWrapper(wrapped) {

@Override
public boolean useStreamCapableState() {
return useStreamCapableState;
}

};
}

/**
* Overrides the {@link FeatureFlags#deploymentMode} method in the feature flags.
*/
Expand All @@ -44,11 +28,6 @@ public FeatureFlagsWrapper(FeatureFlags wrapped) {
this.wrapped = wrapped;
}

@Override
public boolean useStreamCapableState() {
return wrapped.useStreamCapableState();
}

@Override
public boolean autoDetectSchema() {
return wrapped.autoDetectSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.airbyte.commons.stream.StreamStatusUtils;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -45,7 +47,7 @@ public final class CompositeIterator<T> extends AbstractIterator<T> implements A
private final List<AutoCloseableIterator<T>> iterators;

private int i;
private boolean firstRead;
private final Set<Optional<AirbyteStreamNameNamespacePair>> seenIterators;
private boolean hasClosed;

CompositeIterator(final List<AutoCloseableIterator<T>> iterators, final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer) {
Expand All @@ -54,7 +56,7 @@ public final class CompositeIterator<T> extends AbstractIterator<T> implements A
this.airbyteStreamStatusConsumer = Optional.ofNullable(airbyteStreamStatusConsumer);
this.iterators = iterators;
this.i = 0;
this.firstRead = true;
this.seenIterators = new HashSet<Optional<AirbyteStreamNameNamespacePair>>();
this.hasClosed = false;
}

Expand All @@ -72,6 +74,7 @@ protected T computeNext() {
while (!currentIterator().hasNext()) {
try {
currentIterator().close();
emitStartStreamStatus(currentIterator().getAirbyteStream());
StreamStatusUtils.emitCompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
} catch (final Exception e) {
StreamStatusUtils.emitIncompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
Expand All @@ -80,35 +83,35 @@ protected T computeNext() {

if (i + 1 < iterators.size()) {
i++;
StreamStatusUtils.emitStartStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
firstRead = true;
} else {
return endOfData();
}
}

try {
if (isFirstStream()) {
StreamStatusUtils.emitStartStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
final boolean isFirstRun = emitStartStreamStatus(currentIterator().getAirbyteStream());
final T next = currentIterator().next();
if (isFirstRun) {
StreamStatusUtils.emitRunningStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
}
return currentIterator().next();
return next;
} catch (final RuntimeException e) {
StreamStatusUtils.emitIncompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
throw e;
} finally {
if (firstRead) {
StreamStatusUtils.emitRunningStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer);
firstRead = false;
}
}
}

private AutoCloseableIterator<T> currentIterator() {
return iterators.get(i);
}

private boolean isFirstStream() {
return i == 0 && firstRead;
private boolean emitStartStreamStatus(final Optional<AirbyteStreamNameNamespacePair> airbyteStream) {
if (airbyteStream.isPresent() && !seenIterators.contains(airbyteStream)) {
seenIterators.add(airbyteStream);
StreamStatusUtils.emitStartStreamStatus(airbyteStream, airbyteStreamStatusConsumer);
return true;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Iterables;
import io.airbyte.commons.json.Jsons;
import io.airbyte.configoss.State;
import io.airbyte.configoss.StateType;
Expand All @@ -29,7 +28,7 @@ public static class AirbyteStateMessageListTypeReference extends TypeReference<L
* @return An optional state wrapper, if there is no state an empty optional will be returned
*/
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public static Optional<StateWrapper> getTypedState(final JsonNode state, final boolean useStreamCapableState) {
public static Optional<StateWrapper> getTypedState(final JsonNode state) {
if (state == null) {
return Optional.empty();
} else {
Expand All @@ -49,10 +48,10 @@ public static Optional<StateWrapper> getTypedState(final JsonNode state, final b
} else {
switch (stateMessages.get(0).getType()) {
case GLOBAL -> {
return Optional.of(provideGlobalState(stateMessages.get(0), useStreamCapableState));
return Optional.of(provideGlobalState(stateMessages.get(0)));
}
case STREAM -> {
return Optional.of(provideStreamState(stateMessages, useStreamCapableState));
return Optional.of(provideStreamState(stateMessages));
}
case LEGACY -> {
return Optional.of(getLegacyStateWrapper(stateMessages.get(0).getData()));
Expand All @@ -65,7 +64,7 @@ public static Optional<StateWrapper> getTypedState(final JsonNode state, final b
}
} else {
if (stateMessages.stream().allMatch(stateMessage -> stateMessage.getType() == AirbyteStateType.STREAM)) {
return Optional.of(provideStreamState(stateMessages, useStreamCapableState));
return Optional.of(provideStreamState(stateMessages));
}
if (stateMessages.stream().allMatch(stateMessage -> stateMessage.getType() == null)) {
return Optional.of(getLegacyStateWrapper(state));
Expand Down Expand Up @@ -104,16 +103,10 @@ public static Boolean isMigration(final StateType currentStateType, final @Nulla
return previousStateType == StateType.LEGACY && currentStateType != StateType.LEGACY;
}

private static StateWrapper provideGlobalState(final AirbyteStateMessage stateMessages, final boolean useStreamCapableState) {
if (useStreamCapableState) {
return new StateWrapper()
.withStateType(StateType.GLOBAL)
.withGlobal(stateMessages);
} else {
return new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(stateMessages.getData());
}
private static StateWrapper provideGlobalState(final AirbyteStateMessage stateMessages) {
return new StateWrapper()
.withStateType(StateType.GLOBAL)
.withGlobal(stateMessages);
}

/**
Expand All @@ -123,16 +116,11 @@ private static StateWrapper provideGlobalState(final AirbyteStateMessage stateMe
* @param useStreamCapableState - a flag that indicates whether to return the new format
* @return a wrapped state
*/
private static StateWrapper provideStreamState(final List<AirbyteStateMessage> stateMessages, final boolean useStreamCapableState) {
if (useStreamCapableState) {
return new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(stateMessages);
} else {
return new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(Iterables.getLast(stateMessages).getData());
}
private static StateWrapper provideStreamState(final List<AirbyteStateMessage> stateMessages) {
return new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(stateMessages);

}

private static StateWrapper getLegacyStateWrapper(final JsonNode state) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.7.6
version=0.7.8
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final AirbyteStateType supportedStateType = getSupportedStateType(config);
final StateManager stateManager =
StateManagerFactory.createStateManager(supportedStateType,
StateGeneratorUtils.deserializeInitialState(state, featureFlags.useStreamCapableState(), supportedStateType), catalog);
StateGeneratorUtils.deserializeInitialState(state, supportedStateType), catalog);
final Instant emittedAt = Instant.now();

final Database database = createDatabase(config);
Expand Down Expand Up @@ -689,7 +689,7 @@ protected int getStateEmissionFrequency() {
* @return A {@link AirbyteStateType} representing the state supported by this connector.
*/
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
return AirbyteStateType.LEGACY;
return AirbyteStateType.STREAM;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,8 @@ public static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.
* @return The deserialized object representation of the state.
*/
public static List<AirbyteStateMessage> deserializeInitialState(final JsonNode initialStateJson,
final boolean useStreamCapableState,
final AirbyteStateType supportedStateType) {
final Optional<StateWrapper> typedState = StateMessageHelper.getTypedState(initialStateJson,
useStreamCapableState);
final Optional<StateWrapper> typedState = StateMessageHelper.getTypedState(initialStateJson);
return typedState
.map(state -> switch (state.getStateType()) {
case GLOBAL -> List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.cdk.testutils.TestDatabase;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import java.sql.JDBCType;
Expand Down Expand Up @@ -59,9 +57,7 @@ protected JsonNode config() {

@Override
protected PostgresTestSource source() {
final var source = new PostgresTestSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new PostgresTestSource();
}

@Override
Expand All @@ -86,11 +82,6 @@ public JsonNode getConfigWithConnectionProperties(final PostgreSQLContainer<?> p
.build());
}

@Override
protected boolean supportsPerStream() {
return true;
}

@AfterAll
static void cleanUp() {
PSQL_CONTAINER.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -41,38 +40,36 @@ void testDeserializationOfLegacyState() throws IOException {
final String legacyStateJson = MoreResources.readResource("states/legacy.json");
final JsonNode legacyState = Jsons.deserialize(legacyStateJson);

final List<AirbyteStateMessage> result = StateGeneratorUtils.deserializeInitialState(legacyState, false,
final List<AirbyteStateMessage> result = StateGeneratorUtils.deserializeInitialState(legacyState,
dbSource.getSupportedStateType(config));
assertEquals(1, result.size());
assertEquals(AirbyteStateType.LEGACY, result.get(0).getType());
}

@Test
void testDeserializationOfGlobalState() throws IOException {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final AbstractDbSource dbSource = mock(AbstractDbSource.class, withSettings().useConstructor("").defaultAnswer(CALLS_REAL_METHODS));
final JsonNode config = mock(JsonNode.class);

final String globalStateJson = MoreResources.readResource("states/global.json");
final JsonNode globalState = Jsons.deserialize(globalStateJson);

final List<AirbyteStateMessage> result =
StateGeneratorUtils.deserializeInitialState(globalState, true, dbSource.getSupportedStateType(config));
StateGeneratorUtils.deserializeInitialState(globalState, dbSource.getSupportedStateType(config));
assertEquals(1, result.size());
assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType());
}

@Test
void testDeserializationOfStreamState() throws IOException {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final AbstractDbSource dbSource = mock(AbstractDbSource.class, withSettings().useConstructor("").defaultAnswer(CALLS_REAL_METHODS));
final JsonNode config = mock(JsonNode.class);

final String streamStateJson = MoreResources.readResource("states/per_stream.json");
final JsonNode streamState = Jsons.deserialize(streamStateJson);

final List<AirbyteStateMessage> result =
StateGeneratorUtils.deserializeInitialState(streamState, true, dbSource.getSupportedStateType(config));
StateGeneratorUtils.deserializeInitialState(streamState, dbSource.getSupportedStateType(config));
assertEquals(2, result.size());
assertEquals(AirbyteStateType.STREAM, result.get(0).getType());
}
Expand All @@ -82,7 +79,7 @@ void testDeserializationOfNullState() throws IOException {
final AbstractDbSource dbSource = mock(AbstractDbSource.class, withSettings().useConstructor("").defaultAnswer(CALLS_REAL_METHODS));
final JsonNode config = mock(JsonNode.class);

final List<AirbyteStateMessage> result = StateGeneratorUtils.deserializeInitialState(null, false, dbSource.getSupportedStateType(config));
final List<AirbyteStateMessage> result = StateGeneratorUtils.deserializeInitialState(null, dbSource.getSupportedStateType(config));
assertEquals(1, result.size());
assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType());
}
Expand Down
Loading

0 comments on commit cc0466a

Please sign in to comment.