Skip to content

Commit

Permalink
Merge branch 'bmoric/big-query-standard-async' of github.com:airbyteh…
Browse files Browse the repository at this point in the history
…q/airbyte into bmoric/big-query-standard-async
  • Loading branch information
benmoriceau committed Oct 9, 2023
2 parents 2f85559 + bfe3638 commit 20775a9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ Optional<StreamDescriptor> getNextStreamToFlush(final long queueSizeThresholdByt
"trigger info: %s - %s, %s , %s",
stream.getNamespace(),
stream.getName(),
"nope",// isTimeTriggeredResult.getRight(),
"nope", // isTimeTriggeredResult.getRight(),
isSizeTriggeredResult.getRight());
log.debug("computed: {}", debugString);

if (isSizeTriggeredResult.getLeft()/* || isTimeTriggeredResult.getLeft()*/) {
if (isSizeTriggeredResult.getLeft()/* || isTimeTriggeredResult.getLeft() */) {
log.info("flushing: {}", debugString);
latestFlushTimeMsPerStream.put(stream, nowProvider.millis());
return Optional.of(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.airbyte.cdk.integrations.destination_async.buffers.BufferDequeue;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -76,37 +75,38 @@ void testGetNextAccountsForAlreadyRunningWorkers() {
assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));
}

//@Test
//void testGetNextPicksUpOnTimeTrigger() {
// final BufferDequeue bufferDequeue = mock(BufferDequeue.class);
// when(bufferDequeue.getBufferedStreams()).thenReturn(Set.of(DESC1));
// when(bufferDequeue.getQueueSizeBytes(DESC1)).thenReturn(Optional.of(1L));
// final Clock mockedNowProvider = mock(Clock.class);
//
// final RunningFlushWorkers runningFlushWorkers = mock(RunningFlushWorkers.class);
// when(runningFlushWorkers.getSizesOfRunningWorkerBatches(any())).thenReturn(List.of(Optional.of(SIZE_10MB)));
// final DetectStreamToFlush detect =
// new DetectStreamToFlush(bufferDequeue, runningFlushWorkers, new AtomicBoolean(false), flusher, mockedNowProvider);
//
// // initialize flush time
// when(mockedNowProvider.millis())
// .thenReturn(NOW.toEpochMilli());
//
// assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));
//
// // check 5 minutes later
// when(mockedNowProvider.millis())
// .thenReturn(NOW.plus(FIVE_MIN).toEpochMilli());
//
// assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));
//
// // just flush once
// assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));
//
// // check another 5 minutes later
// when(mockedNowProvider.millis())
// .thenReturn(NOW.plus(FIVE_MIN).plus(FIVE_MIN).toEpochMilli());
// assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));
//}
// @Test
// void testGetNextPicksUpOnTimeTrigger() {
// final BufferDequeue bufferDequeue = mock(BufferDequeue.class);
// when(bufferDequeue.getBufferedStreams()).thenReturn(Set.of(DESC1));
// when(bufferDequeue.getQueueSizeBytes(DESC1)).thenReturn(Optional.of(1L));
// final Clock mockedNowProvider = mock(Clock.class);
//
// final RunningFlushWorkers runningFlushWorkers = mock(RunningFlushWorkers.class);
// when(runningFlushWorkers.getSizesOfRunningWorkerBatches(any())).thenReturn(List.of(Optional.of(SIZE_10MB)));
// final DetectStreamToFlush detect =
// new DetectStreamToFlush(bufferDequeue, runningFlushWorkers, new AtomicBoolean(false), flusher,
// mockedNowProvider);
//
// // initialize flush time
// when(mockedNowProvider.millis())
// .thenReturn(NOW.toEpochMilli());
//
// assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));
//
// // check 5 minutes later
// when(mockedNowProvider.millis())
// .thenReturn(NOW.plus(FIVE_MIN).toEpochMilli());
//
// assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));
//
// // just flush once
// assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));
//
// // check another 5 minutes later
// when(mockedNowProvider.millis())
// .thenReturn(NOW.plus(FIVE_MIN).plus(FIVE_MIN).toEpochMilli());
// assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));
// }

}

0 comments on commit 20775a9

Please sign in to comment.