Skip to content

Commit

Permalink
remove DestinationProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 19, 2023
1 parent b28fe36 commit b12507c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.configoss.WorkerDestinationConfig;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -509,6 +510,9 @@ public void incrementalDedupIdenticalName() throws Exception {
* Run two syncs at the same time. They each have one stream, which has the same name for both syncs
* but different namespace. This should work fine. This test is similar to {@link #incrementalDedupIdenticalName()},
* but uses two separate syncs instead of one sync with two streams.
* <p>
* Note that destination stdout is a bit misleading: The two syncs' stdout _should_ be interleaved, but we're just
* dumping the entire sync1 stdout, and then the entire sync2 stdout.
*/
@Test
public void identicalNameSimultaneousSync() throws Exception {
Expand Down Expand Up @@ -540,19 +544,21 @@ public void identicalNameSimultaneousSync() throws Exception {
final List<AirbyteMessage> messages2 = readMessages("dat/sync1_messages2.jsonl", namespace2, streamName);

// Start two concurrent syncs
final DestinationProcess sync1 = startSync(catalog1);
final DestinationProcess sync2 = startSync(catalog2);
final AirbyteDestination sync1 = startSync(catalog1);
final AirbyteDestination sync2 = startSync(catalog2);
// Write some messages to both syncs. Write a lot of data to sync 2 to try and force a flush.
messages1.forEach(sync1::accept);
pushMessages(messages1, sync1);
for (int i = 0; i < 100_000; i++) {
messages2.forEach(sync2::accept);
pushMessages(messages2, sync2);
}
sync1.close();
// This will dump sync1's entire stdout to our stdout
endSync(sync1);
// Write some more messages to the second sync. It should not be affected by the first sync's shutdown.
for (int i = 0; i < 100_000; i++) {
messages2.forEach(sync2::accept);
pushMessages(messages2, sync2);
}
sync2.close();
// And this will dump sync2's entire stdout to our stdout
endSync(sync2);

verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"),
Expand Down Expand Up @@ -693,16 +699,16 @@ protected void runSync(final ConfiguredAirbyteCatalog catalog, final List<Airbyt
}

protected void runSync(final ConfiguredAirbyteCatalog catalog, final List<AirbyteMessage> messages, final String imageName) throws Exception {
try (final DestinationProcess process = startSync(catalog, imageName)) {
messages.forEach(process::accept);
}
final AirbyteDestination destination = startSync(catalog, imageName);
pushMessages(messages, destination);
endSync(destination);
}

protected DestinationProcess startSync(final ConfiguredAirbyteCatalog catalog) throws Exception {
protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog) throws Exception {
return startSync(catalog, getImageName());
}

protected DestinationProcess startSync(final ConfiguredAirbyteCatalog catalog, final String imageName) throws Exception {
protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog, final String imageName) throws Exception {
synchronized (this) {
catalog.getStreams().forEach(s -> streamsToTearDown.add(AirbyteStreamNameNamespacePair.fromAirbyteStream(s.getStream())));
}
Expand Down Expand Up @@ -736,10 +742,24 @@ protected DestinationProcess startSync(final ConfiguredAirbyteCatalog catalog, f

destination.start(destinationConfig, jobRoot, Collections.emptyMap());

return new DestinationProcess(destination);
return destination;
}

private static void pushMessages(final List<AirbyteMessage> messages, final AirbyteDestination destination) {
messages.forEach(message -> Exceptions.toRuntime(() -> destination.accept(convertProtocolObject(message, io.airbyte.protocol.models.AirbyteMessage.class))));
}

// TODO Eventually we'll want to somehow extract the state messages while a sync is running, to verify checkpointing.
// That's going to require some nontrivial changes to how attemptRead() works.
private static void endSync(final AirbyteDestination destination) throws Exception {
destination.notifyEndOfInput();
while (!destination.isFinished()) {
destination.attemptRead();
}
destination.close();
}

static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
return Jsons.object(Jsons.jsonNode(v1), klass);
}

Expand Down

This file was deleted.

0 comments on commit b12507c

Please sign in to comment.