Skip to content

Commit

Permalink
throw exception on bad generation id
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jun 7, 2024
1 parent eed66f8 commit ea03c9b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,16 @@ constructor(

bufferManager.close()

val unsuccessfulStreams = ArrayList<StreamDescriptor>()
val streamSyncSummaries =
streamNames.associate { streamDescriptor ->
// If we didn't receive a stream status message, assume success.
// Platform won't send us any stream status messages yet (since we're not declaring
// supportsRefresh in metadata), so we will always hit this case.
// If we didn't receive a stream status message, assume failure.
// This is possible if e.g. the orchestrator crashes before sending us the message.
val terminalStatusFromSource =
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.INCOMPLETE
if (terminalStatusFromSource == AirbyteStreamStatus.INCOMPLETE) {
unsuccessfulStreams.add(streamDescriptor)
}
StreamDescriptorUtils.withDefaultNamespace(
streamDescriptor,
bufferManager.defaultNamespace,
Expand All @@ -183,6 +186,15 @@ constructor(
// as this throws an exception, we need to be after all other close functions.
propagateFlushWorkerExceptionIfPresent()
logger.info { "${AsyncStreamConsumer::class.java} closed" }

// In principle, the platform should detect this.
// However, as a backstop, the destination should still perform this check.
// This handles e.g. platform bugs where we don't receive a stream status message.
// In that case, it would be misleading to mark the sync as successful, because e.g. we
// may not have committed a truncate.
if (unsuccessfulStreams.isNotEmpty()) {
throw RuntimeException("Some streams were unsuccessful due to a source error: $unsuccessfulStreams")
}
}

private fun getRecordCounter(streamDescriptor: StreamDescriptor): AtomicLong {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,27 @@ class AsyncStreamConsumerTest {
),
),
)
// Serialized TRACE message whose STREAM_STATUS payload reports the stream
// STREAM_NAME2 (namespace SCHEMA_NAME) with a COMPLETE status.
private val STREAM2_SUCCESS_MESSAGE = run {
    // Identify the stream this status applies to.
    val stream2Descriptor = StreamDescriptor().withName(STREAM_NAME2).withNamespace(SCHEMA_NAME)
    // Stream status payload: descriptor + COMPLETE.
    val completeStatus =
        AirbyteStreamStatusTraceMessage()
            .withStreamDescriptor(stream2Descriptor)
            .withStatus(AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
    // Wrap the status in a STREAM_STATUS trace, then in a TRACE message.
    val statusTrace =
        AirbyteTraceMessage()
            .withType(AirbyteTraceMessage.Type.STREAM_STATUS)
            .withStreamStatus(completeStatus)
    val traceMessage = AirbyteMessage().withType(AirbyteMessage.Type.TRACE).withTrace(statusTrace)
    Jsons.serialize(traceMessage)
}
private val STREAM2_FAILURE_MESSAGE =
Jsons.serialize(
AirbyteMessage()
Expand Down Expand Up @@ -262,6 +283,9 @@ class AsyncStreamConsumerTest {
consumer.start()
consumeRecords(consumer, expectedRecords)
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
Expand Down Expand Up @@ -298,6 +322,9 @@ class AsyncStreamConsumerTest {
consumeRecords(consumer, expectedRecords)
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
consumer.accept(Jsons.serialize(STATE_MESSAGE2), RECORD_SIZE_20_BYTES)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
Expand Down Expand Up @@ -334,6 +361,9 @@ class AsyncStreamConsumerTest {

consumer.start()
consumeRecords(consumer, allRecords)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
Expand Down Expand Up @@ -496,7 +526,8 @@ class AsyncStreamConsumerTest {
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_FAILURE_MESSAGE, STREAM2_FAILURE_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()
// We had a failure, so expect an exception
assertThrows(RuntimeException::class.java) { consumer.close() }

val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
ArgumentCaptor.captor()
Expand Down Expand Up @@ -532,29 +563,29 @@ class AsyncStreamConsumerTest {
consumer.start()
consumeRecords(consumer, expectedRecords)
// Note: no stream status messages
consumer.close()
// We assume stream failure, so expect an exception
assertThrows(RuntimeException::class.java) { consumer.close() }

val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
ArgumentCaptor.captor()
Mockito.verify(onClose).accept(any(), capture(captor))
assertEquals(
// All streams have a COMPLETE status.
// TODO: change this to INCOMPLETE after we switch the default behavior.
// All streams have an INCOMPLETE status.
mapOf(
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to
StreamSyncSummary(
expectedRecords.size.toLong(),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to
StreamSyncSummary(
0,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to
StreamSyncSummary(
0,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
),
captor.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package io.airbyte.integrations.base.destination.typing_deduping
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.function.Consumer
Expand All @@ -32,6 +34,12 @@ constructor(
if (it.stream.namespace.isNullOrEmpty()) {
it.stream.namespace = defaultNamespace
}
// The refreshes project is the beginning of the end for OVERWRITE syncs.
// The sync mode still exists, but we are fully dependent on min_generation to trigger
// overwrite logic.
if (it.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
it.destinationSyncMode = DestinationSyncMode.APPEND
}
}

// this code is bad and I feel bad
Expand Down Expand Up @@ -122,9 +130,10 @@ constructor(
@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null) {
stream.generationId = 0
stream.minimumGenerationId = 0
stream.syncId = 0
// TODO set platform version
throw ConfigErrorException(
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to X.Y.Z"
)
}
if (
stream.minimumGenerationId != 0.toLong() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
useLocalCdk = true
useLocalCdk = false
}

java {
Expand Down

0 comments on commit ea03c9b

Please sign in to comment.