Skip to content

Commit

Permalink
Make sure that the buffers are pushed at least every 5 minutes (#30546)
Browse files Browse the repository at this point in the history
Co-authored-by: benmoriceau <[email protected]>
  • Loading branch information
benmoriceau and benmoriceau authored Sep 19, 2023
1 parent bca775d commit 12fb753
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.integrations.destination_async.buffers.BufferDequeue;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -27,20 +29,32 @@ public class DetectStreamToFlush {

private static final double EAGER_FLUSH_THRESHOLD = 0.90;
private static final long QUEUE_FLUSH_THRESHOLD_BYTES = 10 * 1024 * 1024; // 10MB
private static final long MAX_TIME_BETWEEN_REC_MIN = 5L;
private static final long MAX_TIME_BETWEEN_FLUSH_MS = 5 * 60 * 1000;
private final BufferDequeue bufferDequeue;
private final RunningFlushWorkers runningFlushWorkers;
private final AtomicBoolean isClosing;
private final DestinationFlushFunction flusher;
private final Clock nowProvider;
private final ConcurrentMap<StreamDescriptor, Long> latestFlushTimeMsPerStream = new ConcurrentHashMap<>();

/**
* Creates a detector that takes the current time from the system UTC clock.
* <p>
* Delegates to the clock-accepting constructor, passing {@link Clock#systemUTC()} as the time
* source.
*
* @param bufferDequeue buffer view this detector queries (e.g. for the set of buffered streams)
* @param runningFlushWorkers tracker of running flush workers, stored on the instance
* @param isClosing closing flag, stored on the instance
* @param flusher destination flush function, stored on the instance
*/
public DetectStreamToFlush(final BufferDequeue bufferDequeue,
final RunningFlushWorkers runningFlushWorkers,
final AtomicBoolean isClosing,
final DestinationFlushFunction flusher) {
// Production callers get wall-clock time; tests use the overload that accepts a Clock.
this(bufferDequeue, runningFlushWorkers, isClosing, flusher, Clock.systemUTC());
}

/**
 * Creates a detector whose notion of "now" comes from the supplied clock.
 * <p>
 * Package-private and marked {@code @VisibleForTesting} so that tests can pass in a controlled
 * {@link Clock}. The arguments are stored as-is; no validation is performed here.
 *
 * @param bufferDequeue buffer view this detector queries (e.g. for the set of buffered streams)
 * @param runningFlushWorkers tracker of running flush workers
 * @param isClosing closing flag
 * @param flusher destination flush function
 * @param nowProvider clock read via {@code millis()} when evaluating the time-based flush trigger
 */
@VisibleForTesting
DetectStreamToFlush(final BufferDequeue bufferDequeue,
                    final RunningFlushWorkers runningFlushWorkers,
                    final AtomicBoolean isClosing,
                    final DestinationFlushFunction flusher,
                    final Clock nowProvider) {
  // Time source first, then the collaborators; the assignments are independent of each other.
  this.nowProvider = nowProvider;
  this.flusher = flusher;
  this.isClosing = isClosing;
  this.runningFlushWorkers = runningFlushWorkers;
  this.bufferDequeue = bufferDequeue;
}

/**
Expand Down Expand Up @@ -83,8 +97,8 @@ long computeQueueThreshold() {
* Return an empty optional if no streams are ready.
* <p>
* A stream is ready to flush if it either meets a size threshold or a time threshold. See
* {@link #isSizeTriggered(StreamDescriptor, long)} and {@link #isTimeTriggered(StreamDescriptor)}
* for details on these triggers.
* {@link #isSizeTriggered(StreamDescriptor, long)} and {@link #isTimeTriggered(long)} for details
* on these triggers.
*
* @param queueSizeThresholdBytes - the size threshold to use for determining if a stream is ready
* to flush.
Expand All @@ -93,7 +107,8 @@ long computeQueueThreshold() {
@VisibleForTesting
Optional<StreamDescriptor> getNextStreamToFlush(final long queueSizeThresholdBytes) {
for (final StreamDescriptor stream : orderStreamsByPriority(bufferDequeue.getBufferedStreams())) {
final ImmutablePair<Boolean, String> isTimeTriggeredResult = isTimeTriggered(stream);
final long latestFlushTimeMs = latestFlushTimeMsPerStream.computeIfAbsent(stream, _k -> nowProvider.millis());
final ImmutablePair<Boolean, String> isTimeTriggeredResult = isTimeTriggered(latestFlushTimeMs);
final ImmutablePair<Boolean, String> isSizeTriggeredResult = isSizeTriggered(stream, queueSizeThresholdBytes);

final String debugString = String.format(
Expand All @@ -106,7 +121,7 @@ Optional<StreamDescriptor> getNextStreamToFlush(final long queueSizeThresholdByt

if (isSizeTriggeredResult.getLeft() || isTimeTriggeredResult.getLeft()) {
log.info("flushing: {}", debugString);

latestFlushTimeMsPerStream.put(stream, nowProvider.millis());
return Optional.of(stream);
}
}
Expand All @@ -122,15 +137,13 @@ Optional<StreamDescriptor> getNextStreamToFlush(final long queueSizeThresholdByt
* This method also returns a debug string with info about the computation. We do it this way so
* that the debug info that is printed is exactly what is used in the computation.
*
* @param stream stream
* @param latestFlushTimeMs latestFlushTimeMs
* @return is time triggered and a debug string
*/
@VisibleForTesting
ImmutablePair<Boolean, String> isTimeTriggered(final StreamDescriptor stream) {
final Boolean isTimeTriggered = bufferDequeue.getTimeOfLastRecord(stream)
.map(time -> time.isBefore(Instant.now().minus(MAX_TIME_BETWEEN_REC_MIN, ChronoUnit.MINUTES)))
.orElse(false);

ImmutablePair<Boolean, String> isTimeTriggered(final long latestFlushTimeMs) {
final long timeSinceLastFlushMs = nowProvider.millis() - latestFlushTimeMs;
final Boolean isTimeTriggered = timeSinceLastFlushMs >= MAX_TIME_BETWEEN_FLUSH_MS;
final String debugString = String.format("time trigger: %s", isTimeTriggered);

return ImmutablePair.of(isTimeTriggered, debugString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import io.airbyte.integrations.destination_async.buffers.BufferDequeue;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
Expand All @@ -22,7 +24,7 @@
class DetectStreamToFlushTest {

public static final Instant NOW = Instant.now();
public static final Instant FIVE_MIN_AGO = NOW.minusSeconds(60 * 5);
public static final Duration FIVE_MIN = Duration.ofMinutes(5);
private static final long SIZE_10MB = 10 * 1024 * 1024;
private static final long SIZE_200MB = 200 * 1024 * 1024;

Expand Down Expand Up @@ -75,21 +77,31 @@ void testGetNextPicksUpOnTimeTrigger() {
final BufferDequeue bufferDequeue = mock(BufferDequeue.class);
when(bufferDequeue.getBufferedStreams()).thenReturn(Set.of(DESC1));
when(bufferDequeue.getQueueSizeBytes(DESC1)).thenReturn(Optional.of(1L));
when(bufferDequeue.getTimeOfLastRecord(DESC1))
// because we eagerly load values and later access them again
// double the mocks for correctness; two calls here equals one test case.
.thenReturn(Optional.empty())
.thenReturn(Optional.empty())
.thenReturn(Optional.of(NOW))
.thenReturn(Optional.of(NOW))
.thenReturn(Optional.of(FIVE_MIN_AGO))
.thenReturn(Optional.of(FIVE_MIN_AGO));
final Clock mockedNowProvider = mock(Clock.class);

final RunningFlushWorkers runningFlushWorkers = mock(RunningFlushWorkers.class);
when(runningFlushWorkers.getSizesOfRunningWorkerBatches(any())).thenReturn(List.of(Optional.of(SIZE_10MB)));
final DetectStreamToFlush detect = new DetectStreamToFlush(bufferDequeue, runningFlushWorkers, new AtomicBoolean(false), flusher);
final DetectStreamToFlush detect =
new DetectStreamToFlush(bufferDequeue, runningFlushWorkers, new AtomicBoolean(false), flusher, mockedNowProvider);

// initialize flush time
when(mockedNowProvider.millis())
.thenReturn(NOW.toEpochMilli());

assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));

// check 5 minutes later
when(mockedNowProvider.millis())
.thenReturn(NOW.plus(FIVE_MIN).toEpochMilli());

assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));

// just flush once
assertEquals(Optional.empty(), detect.getNextStreamToFlush(0));

// check another 5 minutes later
when(mockedNowProvider.millis())
.thenReturn(NOW.plus(FIVE_MIN).plus(FIVE_MIN).toEpochMilli());
assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,27 @@
import static org.mockito.Mockito.when;

import io.airbyte.integrations.destination_async.buffers.BufferDequeue;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.time.Clock;
import org.junit.jupiter.api.Test;

public class TimeTriggerTest {

public static final Instant NOW = Instant.now();
public static final Instant FIVE_MIN_AGO = NOW.minusSeconds(60 * 5);

private static final StreamDescriptor DESC1 = new StreamDescriptor().withName("test1");
private static final long NOW_MS = System.currentTimeMillis();
private static final long ONE_SEC = 1000L;
private static final long FIVE_MIN = 5 * 60 * 1000;

@Test
void testTimeTrigger() {
final BufferDequeue bufferDequeue = mock(BufferDequeue.class);
when(bufferDequeue.getBufferedStreams()).thenReturn(Set.of(DESC1));
when(bufferDequeue.getTimeOfLastRecord(DESC1))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(NOW))
.thenReturn(Optional.of(FIVE_MIN_AGO));

final DetectStreamToFlush detect = new DetectStreamToFlush(bufferDequeue, null, null, null);
assertEquals(false, detect.isTimeTriggered(DESC1).getLeft());
assertEquals(false, detect.isTimeTriggered(DESC1).getLeft());
assertEquals(true, detect.isTimeTriggered(DESC1).getLeft());

final Clock mockedNowProvider = mock(Clock.class);
when(mockedNowProvider.millis())
.thenReturn(NOW_MS);

final DetectStreamToFlush detect = new DetectStreamToFlush(bufferDequeue, null, null, null, mockedNowProvider);
assertEquals(false, detect.isTimeTriggered(NOW_MS).getLeft());
assertEquals(false, detect.isTimeTriggered(NOW_MS - ONE_SEC).getLeft());
assertEquals(true, detect.isTimeTriggered(NOW_MS - FIVE_MIN).getLeft());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.1.9
LABEL io.airbyte.version=3.1.10
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.1.9
dockerImageTag: 3.1.10
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.1.10 | 2023-09-18 | [\#30546](https://github.com/airbytehq/airbyte/pull/30546) | Make sure that the async buffers are flushed at least every 5 minutes |
| 3.1.9 | 2023-09-19 | [\#30319](https://github.com/airbytehq/airbyte/pull/30319) | Support column names that are reserved |
| 3.1.8 | 2023-09-18 | [\#30479](https://github.com/airbytehq/airbyte/pull/30479) | Fix async memory management |
| 3.1.7 | 2023-09-15 | [\#30491](https://github.com/airbytehq/airbyte/pull/30491) | Improve error message display |
Expand Down

0 comments on commit 12fb753

Please sign in to comment.