From 26acc4d32661ed53e82fd7cd8c4a7cbe3077dd61 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Fri, 3 May 2024 12:57:20 -0700 Subject: [PATCH] multi-thread init and finalize --- .../sync/DatabricksSyncOperations.kt | 62 +++++++++++++++---- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/sync/DatabricksSyncOperations.kt b/airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/sync/DatabricksSyncOperations.kt index 3c421a840243..a00043990c23 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/sync/DatabricksSyncOperations.kt +++ b/airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/sync/DatabricksSyncOperations.kt @@ -6,6 +6,8 @@ package io.airbyte.integrations.destination.databricks.sync import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions +import io.airbyte.commons.concurrency.CompletableFutures import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestinationHandler @@ -13,7 +15,11 @@ import io.airbyte.integrations.destination.sync.StreamOperations import io.airbyte.integrations.destination.sync.SyncOperations import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import 
java.util.stream.Stream +import org.apache.commons.lang3.concurrent.BasicThreadFactory private val log = KotlinLogging.logger {} @@ -21,7 +27,12 @@ class DatabricksSyncOperations( private val parsedCatalog: ParsedCatalog, private val destinationHandler: DatabricksDestinationHandler, private val streamOperations: StreamOperations, - private val defaultNamespace: String + private val defaultNamespace: String, + private val executorService: ExecutorService = + Executors.newFixedThreadPool( + 10, + BasicThreadFactory.Builder().namingPattern("sync-operations").build(), + ) ) : SyncOperations { override fun initializeStreams() { log.info { "Preparing required schemas and tables for all streams" } @@ -37,7 +48,21 @@ class DatabricksSyncOperations( // we will commit destinationStates and run Migrations here. For Dbricks we don't need // either - streamsInitialStates.forEach { streamOperations.initialize(it) } + val initializationFutures = + streamsInitialStates + .map { + CompletableFuture.supplyAsync( + { streamOperations.initialize(it) }, + executorService, + ) + } + .toList() + val futuresResult = + CompletableFutures.allOf(initializationFutures).toCompletableFuture().join() + exceptions.getResultsOrLogAndThrowFirst( + "Following exceptions occurred during sync initialization", + futuresResult, + ) } override fun flushStreams() { @@ -53,14 +78,29 @@ class DatabricksSyncOperations( override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) { // Only call finalizeTable operations which has summary. 
rest will be skipped - streamSyncSummaries.entries.forEach { - streamOperations.finalizeTable( - parsedCatalog.getStream( - it.key.namespace ?: defaultNamespace, - it.key.name, - ), - it.value, - ) - } + val finalizeFutures = + streamSyncSummaries.entries + .map { + CompletableFuture.supplyAsync( + { + streamOperations.finalizeTable( + parsedCatalog.getStream( + it.key.namespace ?: defaultNamespace, + it.key.name, + ), + it.value, + ) + }, + executorService, + ) + } + .toList() + val futuresResult = CompletableFutures.allOf(finalizeFutures).toCompletableFuture().join() + exceptions.getResultsOrLogAndThrowFirst( + "Following exceptions occurred while finalizing the sync", + futuresResult, + ) + log.info { "Cleaning up sync operation thread pools" } + executorService.shutdown() } }