Skip to content

Commit

Permalink
multi-thread init and finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed May 3, 2024
1 parent 424235b commit 26acc4d
Showing 1 changed file with 51 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,33 @@ package io.airbyte.integrations.destination.databricks.sync

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
import io.airbyte.commons.concurrency.CompletableFutures
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestinationHandler
import io.airbyte.integrations.destination.sync.StreamOperations
import io.airbyte.integrations.destination.sync.SyncOperations
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.stream.Stream
import org.apache.commons.lang3.concurrent.BasicThreadFactory

private val log = KotlinLogging.logger {}

class DatabricksSyncOperations(
private val parsedCatalog: ParsedCatalog,
private val destinationHandler: DatabricksDestinationHandler,
private val streamOperations: StreamOperations<MinimumDestinationState.Impl>,
private val defaultNamespace: String
private val defaultNamespace: String,
private val executorService: ExecutorService =
Executors.newFixedThreadPool(
10,
BasicThreadFactory.Builder().namingPattern("sync-operations").build(),
)
) : SyncOperations {
override fun initializeStreams() {
log.info { "Preparing required schemas and tables for all streams" }
Expand All @@ -37,7 +48,21 @@ class DatabricksSyncOperations(

// we will commit destinationStates and run Migrations here. For Dbricks we don't need
// either
streamsInitialStates.forEach { streamOperations.initialize(it) }
val initializationFutures =
streamsInitialStates
.map {
CompletableFuture.supplyAsync(
{ streamOperations.initialize(it) },
executorService,
)
}
.toList()
val futuresResult =
CompletableFutures.allOf(initializationFutures).toCompletableFuture().join()
exceptions.getResultsOrLogAndThrowFirst(
"Following exceptions occurred during sync initialization",
futuresResult,
)
}

override fun flushStreams() {
Expand All @@ -53,14 +78,29 @@ class DatabricksSyncOperations(

/**
 * Finalizes every stream that has an entry in [streamSyncSummaries], submitting one
 * `finalizeTable` task per stream to [executorService] and waiting for all of them.
 *
 * The executor is shut down when this method exits, whether finalization succeeded or
 * failed, so its worker threads are not left running after a failed sync.
 *
 * @param streamSyncSummaries per-stream summaries keyed by stream descriptor; a descriptor
 *   without a namespace is resolved against [defaultNamespace].
 */
override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
    try {
        // Only streams that have a summary are finalized; the rest are skipped.
        val finalizeFutures =
            streamSyncSummaries.entries.map { (descriptor, summary) ->
                CompletableFuture.supplyAsync(
                    {
                        streamOperations.finalizeTable(
                            parsedCatalog.getStream(
                                descriptor.namespace ?: defaultNamespace,
                                descriptor.name,
                            ),
                            summary,
                        )
                    },
                    executorService,
                )
            }
        val futuresResult = CompletableFutures.allOf(finalizeFutures).toCompletableFuture().join()
        // NOTE(review): presumably rethrows the first collected failure after logging all of
        // them; confirm against ConnectorExceptionUtil.
        exceptions.getResultsOrLogAndThrowFirst(
            "Following exceptions occurred while finalizing the sync",
            futuresResult,
        )
    } finally {
        // Runs on the failure path too; previously an exception above skipped the shutdown.
        log.info { "Cleaning up sync operation thread pools" }
        executorService.shutdown()
    }
}
}

0 comments on commit 26acc4d

Please sign in to comment.