Skip to content

Commit

Permalink
e2e test source should emit PER-STREAM state as well
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Dec 12, 2023
1 parent e25806f commit 909f2fc
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -78,7 +80,12 @@ protected AirbyteMessage computeNext() {
hasEmittedStateAtCount.set(true);
return new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of(LegacyConstants.DEFAULT_COLUMN, recordValue.get()))));
.withState(new AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(LegacyConstants.DEFAULT_STREAM))
.withStreamState(Jsons.jsonNode(ImmutableMap.of(LegacyConstants.DEFAULT_COLUMN, recordValue.get()))))
.withData(Jsons.jsonNode(ImmutableMap.of(LegacyConstants.DEFAULT_COLUMN, recordValue.get()))));
} else if (throwAfterNRecords > recordsEmitted.get()) {
recordsEmitted.incrementAndGet();
recordValue.incrementAndGet();
Expand Down

0 comments on commit 909f2fc

Please sign in to comment.