diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index 3287df5f0f93..df4420142341 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -422,7 +422,7 @@ internal constructor( ) { val currentThread = Thread.currentThread() - val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList() + val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread) if (runningThreads.isNotEmpty()) { LOGGER.warn( """ diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt index 106160b776fb..131a0bf26bde 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt @@ -73,9 +73,7 @@ open class StandardNameTransformer : NamingConventionTransformer { return Jsons.jsonNode>(properties) } else if (root.isArray) { return Jsons.jsonNode( - MoreIterators.toList(root.elements()) - .map { r: JsonNode -> formatJsonPath(r) } - .toList() + MoreIterators.toList(root.elements()).map { r: JsonNode -> formatJsonPath(r) } ) } else { return root diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt index 92e43b748b6d..db61cdaebf45 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt @@ -241,20 +241,18 @@ internal constructor( streamDescriptor, ) } - return streams - .sortedWith( - Comparator.comparing( - { s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() }, - Comparator.reverseOrder(), - ) // if no time is present, it suggests the queue has no records. set MAX time - // as a sentinel value to - // represent no records. - .thenComparing { s: StreamDescriptor -> - sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX) - } - .thenComparing { s: StreamDescriptor -> s.namespace + s.name }, - ) - .toList() + return streams.sortedWith( + Comparator.comparing( + { s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() }, + Comparator.reverseOrder(), + ) // if no time is present, it suggests the queue has no records. set MAX time + // as a sentinel value to + // represent no records. + .thenComparing { s: StreamDescriptor -> + sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX) + } + .thenComparing { s: StreamDescriptor -> s.namespace + s.name }, + ) } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt index 2db1c727fd07..d2e56c746977 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt @@ -281,9 +281,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { // into the non-STREAM world for correctness. synchronized(lock) { aliasIds.addAll( - descToStateIdQ.values - .flatMap { obj: LinkedBlockingDeque -> obj } - .toList(), + descToStateIdQ.values.flatMap { obj: LinkedBlockingDeque -> obj }, ) descToStateIdQ.clear() retroactiveGlobalStateId = StateIdProvider.nextId diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt index 74548816238b..c43b0589060f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt @@ -122,11 +122,7 @@ object SentryExceptionHelper { errorMessageAndType[ErrorMapKeys.ERROR_MAP_MESSAGE_KEY] = String.format( "%s", - stacktraceLines[ - Arrays.stream(stacktraceLines) - .toList() - .indexOf(followingLine) + 1 - ] + stacktraceLines[stacktraceLines.indexOf(followingLine) + 1] .trim { it <= ' ' } ) errorMessageAndType[ErrorMapKeys.ERROR_MAP_TYPE_KEY] = diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt index 70dfaffbf1e9..92ed3b5fb71e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt @@ -71,7 +71,7 @@ class InMemoryRecordBufferingStrategy( stream.name, streamBuffer[stream]!!.size ) - recordWriter.accept(stream, streamBuffer[stream]!!.toList()) + recordWriter.accept(stream, streamBuffer[stream]!!) LOGGER.info("Flushing completed for {}", stream.name) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt index 755bf002edf9..81b324953c3d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt @@ -93,12 +93,12 @@ object ConnectorExceptionUtil { initialMessage: String, eithers: List> ): List { - val throwables: List = eithers.filter { it.isLeft() }.map { it.left!! }.toList() + val throwables: List = eithers.filter { it.isLeft() }.map { it.left!! } if (throwables.isNotEmpty()) { logAllAndThrowFirst(initialMessage, throwables) } // No need to filter on isRight since isLeft will throw before reaching this line. - return eithers.map { obj: Either -> obj.right!! }.toList() + return eithers.map { obj: Either -> obj.right!! } } private fun isConfigErrorException(e: Throwable?): Boolean { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt index 22b826e747db..6c392c3cd132 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt @@ -83,7 +83,6 @@ class ConcurrentStreamConsumer( .map { runnable: ConcurrentStreamRunnable -> CompletableFuture.runAsync(runnable, executorService) } - .toList() /* * Wait for the submitted streams to complete before returning. This uses the join() method to allow diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestDefaultJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestDefaultJdbcDatabase.kt index ffc54ce72d0f..e61a22ac963a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestDefaultJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestDefaultJdbcDatabase.kt @@ -82,7 +82,7 @@ internal class TestDefaultJdbcDatabase { }, { queryContext: ResultSet -> sourceOperations.rowToJson(queryContext) } ) - .use { actual -> Assertions.assertEquals(RECORDS_AS_JSON, actual.toList()) } + .use { actual -> Assertions.assertEquals(RECORDS_AS_JSON, actual) } } @Test diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt index 7f57d453d328..44de7316016b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt @@ -119,9 +119,9 @@ internal class TestJdbcUtils { val rs = connection.createStatement().executeQuery("SELECT * FROM id_and_name;") val actual = JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet -> - sourceOperations.rowToJson(queryContext) - } - .toList() + sourceOperations.rowToJson(queryContext) + } + Assertions.assertEquals(RECORDS_AS_JSON, actual) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt index 3693a8aef75e..f9313761e323 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt @@ -144,7 +144,7 @@ internal class TestStreamingJdbcDatabase { // This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB. // Update this check if the buffer size constant is changed. Assertions.assertEquals(2, fetchSizes.size) - val sortedSizes = fetchSizes.sorted().toList() + val sortedSizes = fetchSizes.sorted() Assertions.assertTrue(sortedSizes[0] < FetchSizeConstants.INITIAL_SAMPLE_SIZE) Assertions.assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes[1]) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/normalization/NormalizationLogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/normalization/NormalizationLogParserTest.kt index 59df191d10f3..bbf5a7079239 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/normalization/NormalizationLogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/normalization/NormalizationLogParserTest.kt @@ -110,16 +110,14 @@ internal class NormalizationLogParserTest { expectedDbtErrors: List ) { val messages = - parser!! - .create( - BufferedReader( - InputStreamReader( - ByteArrayInputStream(rawLogs.toByteArray(StandardCharsets.UTF_8)), - StandardCharsets.UTF_8 - ) + parser!!.create( + BufferedReader( + InputStreamReader( + ByteArrayInputStream(rawLogs.toByteArray(StandardCharsets.UTF_8)), + StandardCharsets.UTF_8 ) ) - .toList() + ) Assertions.assertEquals(expectedMessages, messages) Assertions.assertEquals(expectedDbtErrors, parser!!.dbtErrors) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index 7b49b5d410ac..d3cf80c1a507 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -553,7 +553,6 @@ class AsyncStreamConsumerTest { .stream() // flatten those results into a single list for the simplicity of // comparison .flatMap { s: Stream<*> -> s } - .toList() val expRecords = allRecords.map { m: AirbyteMessage -> diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt index 8a15090c8485..573db49cb6a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt @@ -234,9 +234,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats2.values) } private fun attachDestinationStateStats( @@ -276,9 +276,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) assertThrows( IllegalArgumentException::class.java, @@ -338,9 +338,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) } @Test @@ -370,9 +370,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() @@ -396,9 +396,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats2.values) } @Test @@ -428,9 +428,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) @@ -452,11 +452,11 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), - stateWithStats2.values.toList(), + stateWithStats2.values, ) emittedStatesFromDestination.clear() @@ -480,9 +480,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats3.keys.toList(), + stateWithStats3.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats3.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats3.values) } @Test @@ -515,9 +515,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() val afterConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) @@ -542,9 +542,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats2.values) } } @@ -576,9 +576,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) assertThrows( IllegalArgumentException::class.java, @@ -618,9 +618,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() @@ -645,11 +645,11 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), - stateWithStats2.values.toList(), + stateWithStats2.values, ) } @@ -680,9 +680,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) @@ -704,11 +704,11 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), - stateWithStats2.values.toList(), + stateWithStats2.values, ) emittedStatesFromDestination.clear() @@ -733,11 +733,11 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats3, ), ), - stateWithStats3.keys.toList(), + stateWithStats3.keys, ) assertEquals( listOf(expectedDestinationStats3), - stateWithStats3.values.toList(), + stateWithStats3.values, ) } @@ -770,9 +770,9 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) - assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) + assertEquals(listOf(expectedDestinationStats), stateWithStats.values) emittedStatesFromDestination.clear() stateManager.decrement(stream2StateId, 4) @@ -803,11 +803,11 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), - stateWithStats2.values.toList(), + stateWithStats2.values, ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt index 2889871675bc..d8b5be49db9f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt @@ -145,9 +145,11 @@ class BufferedStreamConsumerTest { verifyStartAndClose() val expectedRecords = - Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2) - .flatMap { obj: List -> obj } - .toList() + Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2).flatMap { + obj: List -> + obj + } + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords) Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1) @@ -580,7 +582,7 @@ class BufferedStreamConsumerTest { Mockito.verify(recordWriter) .accept( AirbyteStreamNameNamespacePair(streamName, namespace), - expectedRecords.map { obj: AirbyteMessage -> obj.record }.toList() + expectedRecords.map { obj: AirbyteMessage -> obj.record } ) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt index d23ef0455900..9dd4a383c527 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt @@ -107,7 +107,7 @@ internal class ConcurrentStreamConsumerTest { val concurrentStreamConsumer = ConcurrentStreamConsumer(streamConsumer, streams.size) val partitionSize = concurrentStreamConsumer.parallelism - val partitions = Lists.partition(streams.toList(), partitionSize) + val partitions = Lists.partition(streams, partitionSize) for (partition in partitions) { Assertions.assertDoesNotThrow { concurrentStreamConsumer.accept(partition) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt index 2adceaa3b3f8..b6ac329ccea5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt @@ -7,13 +7,11 @@ import com.google.common.collect.Lists import io.airbyte.commons.logging.LoggingHelper import io.airbyte.commons.logging.MdcScope import java.lang.reflect.InvocationTargetException -import java.util.List import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.AtomicInteger import java.util.function.Consumer import java.util.function.Supplier -import java.util.stream.Stream import kotlin.concurrent.Volatile import org.apache.commons.lang3.StringUtils import org.slf4j.Logger @@ -66,7 +64,7 @@ abstract class ContainerFactory> { private fun getTestContainerLogMdcBuilder( imageName: DockerImageName?, - containerModifiers: MutableList> + containerModifiers: List> ): MdcScope.Builder { return MdcScope.Builder() .setLogPrefix( @@ -93,26 +91,24 @@ abstract class ContainerFactory> { fun shared(imageName: String, vararg methods: String): C { return shared( imageName, - Stream.of(*methods) - .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } - .toList() + methods.map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } ) } fun shared(imageName: String, vararg namedContainerModifiers: NamedContainerModifier): C { - return shared(imageName, List.of(*namedContainerModifiers)) + return shared(imageName, listOf(*namedContainerModifiers)) } @JvmOverloads fun shared( imageName: String, - namedContainerModifiers: MutableList> = ArrayList() + namedContainerModifiers: List> = ArrayList() ): C { val containerKey = ContainerKey( javaClass, DockerImageName.parse(imageName), - namedContainerModifiers.map { it.name() }.toList() + namedContainerModifiers.map { it.name() } ) // We deliberately avoid creating the container itself eagerly during the evaluation of the // map @@ -137,20 +133,18 @@ abstract class ContainerFactory> { fun exclusive(imageName: String, vararg methods: String): C { return exclusive( imageName, - Stream.of(*methods) - .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } - .toList() + methods.map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } ) } fun exclusive(imageName: String, vararg namedContainerModifiers: NamedContainerModifier): C { - return exclusive(imageName, List.of(*namedContainerModifiers)) + return exclusive(imageName, *namedContainerModifiers) } @JvmOverloads fun exclusive( imageName: String, - namedContainerModifiers: MutableList> = ArrayList() + namedContainerModifiers: List> = ArrayList() ): C { return createAndStartContainer(DockerImageName.parse(imageName), namedContainerModifiers) } @@ -200,7 +194,7 @@ abstract class ContainerFactory> { private fun createAndStartContainer( imageName: DockerImageName?, - namedContainerModifiers: MutableList> + namedContainerModifiers: List> ): C { LOGGER!!.info( "Creating new container based on {} with {}.", diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt index 58ecdbb48387..1bae8bf823e0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -184,16 +184,13 @@ protected constructor(val container: C) : AutoCloseable { protected fun execInContainer(cmds: Stream) { val cmd = cmds.toList() - if (cmd!!.isEmpty()) { + if (cmd.isEmpty()) { return } try { LOGGER!!.info( formatLogLine( - String.format( - "executing command %s", - Strings.join(cmd.toList().asIterable(), " ") - ) + String.format("executing command %s", Strings.join(cmd.asIterable(), " ")) ) ) val exec = container.execInContainer(*cmd.toTypedArray()) diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt index d123206000e5..aeb4cb550a44 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt @@ -98,14 +98,12 @@ constructor( @Throws(Exception::class) override fun unsafeQuery(sql: String?, vararg params: String): Stream { val parameterValueList = - Arrays.stream(params) - .map { param: String -> - QueryParameterValue.newBuilder() - .setValue(param) - .setType(StandardSQLTypeName.STRING) - .build() - } - .toList() + params.map { param: String -> + QueryParameterValue.newBuilder() + .setValue(param) + .setType(StandardSQLTypeName.STRING) + .build() + } return query(sql, parameterValueList) } diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt index e2929a4b9673..3d8d35c9b1ee 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt @@ -127,9 +127,10 @@ object MongoUtils { val field = node.data if (node.hasChildren()) { val subFields = - node.children!! - .map { obj: TreeNode> -> nodeToCommonField(obj) } - .toList() + node.children!!.map { obj: TreeNode> -> + nodeToCommonField(obj) + } + return CommonField(field.name, field.type, subFields) } else { return CommonField(field.name, field.type) @@ -294,17 +295,15 @@ object MongoUtils { ): List>> { val allkeys = HashSet(getFieldsName(collection)) - return allkeys - .map { key: String -> - val types = getTypes(collection, key) - val type = getUniqueType(types) - val fieldNode = TreeNode(CommonField(transformName(types, key), type)) - if (type == BsonType.DOCUMENT) { - setSubFields(collection, fieldNode, key) - } - fieldNode + return allkeys.map { key: String -> + val types = getTypes(collection, key) + val type = getUniqueType(types) + val fieldNode = TreeNode(CommonField(transformName(types, key), type)) + if (type == BsonType.DOCUMENT) { + setSubFields(collection, fieldNode, key) } - .toList() + fieldNode + } } /** diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index d9e4e20349c3..62d21df44cf4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -195,11 +195,10 @@ abstract class JdbcDestinationHandler( } val initialStates = - streamConfigs - .map { streamConfig: StreamConfig -> - retrieveState(destinationStatesFuture, streamConfig) - } - .toList() + streamConfigs.map { streamConfig: StreamConfig -> + retrieveState(destinationStatesFuture, streamConfig) + } + val states = CompletableFutures.allOf(initialStates).toCompletableFuture().join() return getResultsOrLogAndThrowFirst("Failed to retrieve initial state", states) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index fab24831ab0f..6a81cbf02512 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -157,11 +157,10 @@ constructor( metaColumns: Map?> ): List> { val fields = - metaColumns.entries - .map { metaColumn: Map.Entry?> -> - DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) - } - .toList() + metaColumns.entries.map { metaColumn: Map.Entry?> -> + DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) + } + val dataFields = columns.entries .map { column: Map.Entry -> @@ -203,11 +202,10 @@ constructor( useExpensiveSaferCasting: Boolean ): List> { val fields = - metaColumns.entries - .map { metaColumn: Map.Entry?> -> - DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) - } - .toList() + metaColumns.entries.map { metaColumn: Map.Entry?> -> + DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) + } + // Use originalName with non-sanitized characters when extracting data from _airbyte_data val dataFields = extractRawDataFields(columns, useExpensiveSaferCasting) dataFields.addAll(fields) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index d9ee39dc5ebe..a0794f881585 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -424,13 +424,9 @@ abstract class DestinationAcceptanceTest { ) val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = - MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { - Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) - } - .toList() + MoreResources.readResource(messagesFilename).trim().lines().map { + Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) + } val config = getConfig() val defaultSchema = getDefaultSchema(config) @@ -454,13 +450,9 @@ abstract class DestinationAcceptanceTest { ) val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = - MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { - Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) - } - .toList() + MoreResources.readResource(messagesFilename).trim().lines().map { + Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) + } val largeNumberRecords = Collections.nCopies(400, messages) @@ -478,7 +470,6 @@ abstract class DestinationAcceptanceTest { else message.toString() } ) - .toList() val config = getConfig() runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false) @@ -518,7 +509,7 @@ abstract class DestinationAcceptanceTest { io.airbyte.protocol.models.v0.AirbyteMessage::class.java ) } - .toList() + val config = getConfig() runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false) @@ -710,7 +701,7 @@ abstract class DestinationAcceptanceTest { .trim() .lines() .map { Jsons.deserialize(it, AirbyteMessage::class.java) } - .toList() + val config = getConfig() runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false) val secondSyncMessages: List = @@ -844,14 +835,13 @@ abstract class DestinationAcceptanceTest { // We expect all the of messages to be missing the removed column after normalization. val expectedMessages = - messages - .map { message: io.airbyte.protocol.models.v0.AirbyteMessage -> - if (message.record != null) { - (message.record.data as ObjectNode).remove("HKD") - } - message + messages.map { message: io.airbyte.protocol.models.v0.AirbyteMessage -> + if (message.record != null) { + (message.record.data as ObjectNode).remove("HKD") } - .toList() + message + } + assertSameMessages(expectedMessages, actualMessages, true) } @@ -1020,7 +1010,6 @@ abstract class DestinationAcceptanceTest { it.record.data["NZD"].asText() (it.record.emittedAt == latestMessagesOnly[key]!!.record.emittedAt) } - .toList() val defaultSchema = getDefaultSchema(config) retrieveRawRecordsAndAssertSameMessages( @@ -1689,14 +1678,14 @@ abstract class DestinationAcceptanceTest { val streamName = stream.name val schema = if (stream.namespace != null) stream.namespace else defaultSchema!! val msgList = - retrieveRecords(testEnv, streamName, schema, stream.jsonSchema) - .map { data: JsonNode -> - AirbyteRecordMessage() - .withStream(streamName) - .withNamespace(schema) - .withData(data) - } - .toList() + retrieveRecords(testEnv, streamName, schema, stream.jsonSchema).map { data: JsonNode + -> + AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(schema) + .withData(data) + } + actualMessages.addAll(msgList) } @@ -1720,7 +1709,6 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .toList() val actualProcessed = actual @@ -1728,7 +1716,6 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .toList() _testDataComparator.assertSameData(expectedProcessed, actualProcessed) } @@ -1744,11 +1731,11 @@ abstract class DestinationAcceptanceTest { val streamName = stream.name val msgList = - retrieveNormalizedRecords(testEnv, streamName, defaultSchema) - .map { data: JsonNode -> - AirbyteRecordMessage().withStream(streamName).withData(data) - } - .toList() + retrieveNormalizedRecords(testEnv, streamName, defaultSchema).map { data: JsonNode + -> + AirbyteRecordMessage().withStream(streamName).withData(data) + } + actualMessages.addAll(msgList) } return actualMessages diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index be7ea6293de8..d150ec8d7e3c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -64,32 +64,28 @@ abstract class JdbcSqlGeneratorIntegrationTest DSL.field(DSL.quotedName(columnName)) } - .toList() + columnNames.map { columnName: String -> DSL.field(DSL.quotedName(columnName)) } ) for (record in records) { insert = insert.values( - columnNames - .map { fieldName: String -> - // Convert this field to a string. Pretty naive implementation. - val column = record[fieldName] - val columnAsString = - if (column == null) { - null - } else if (column.isTextual) { - column.asText() - } else { - column.toString() - } - if (Arrays.asList(*columnsToParseJson).contains(fieldName)) { - return@map toJsonValue(columnAsString) + columnNames.map { fieldName: String -> + // Convert this field to a string. Pretty naive implementation. + val column = record[fieldName] + val columnAsString = + if (column == null) { + null + } else if (column.isTextual) { + column.asText() } else { - return@map DSL.`val`(columnAsString) + column.toString() } + if (Arrays.asList(*columnsToParseJson).contains(fieldName)) { + return@map toJsonValue(columnAsString) + } else { + return@map DSL.`val`(columnAsString) } - .toList() + } ) } database.execute(insert.getSQL(ParamType.INLINED)) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index de8399a0213d..2d8eed56d374 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -61,6 +61,7 @@ class AirbyteFileOffsetBackingStore( private fun updateStateForDebezium2_1(mapAsString: Map): Map { val updatedMap: MutableMap = LinkedHashMap() if (mapAsString.size > 0) { + // We're getting the 1st of a map. Something fishy going on here val key = mapAsString.keys.toList()[0] val i = key.indexOf('[') val i1 = key.lastIndexOf(']') diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 7540f838d75f..221d32d877e4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -256,12 +256,10 @@ abstract class AbstractJdbcSource( f.get(INTERNAL_COLUMN_NAME).asText(), datatype ) {} - } - .toList(), + }, cursorFields = extractCursorFields(fields) ) } - .toList() } private fun extractCursorFields(fields: List): List { @@ -270,7 +268,6 @@ abstract class AbstractJdbcSource( isCursorType(sourceOperations.getDatabaseFieldType(field)) } .map { it.get(INTERNAL_COLUMN_NAME).asText() } - .toList() } protected fun excludeNotAccessibleTables( @@ -698,7 +695,6 @@ abstract class AbstractJdbcSource( ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 0c8a5ec04657..a9834933993e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -291,11 +291,9 @@ protected constructor(driverClassName: String) : val discoveredTables = discoverInternal(database) return (if (systemNameSpaces.isEmpty()) discoveredTables else - discoveredTables - .filter { table: TableInfo> -> - !systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name) - } - .toList()) + discoveredTables.filter { table: TableInfo> -> + !systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name) + }) } protected fun getFullRefreshIterators( @@ -402,7 +400,6 @@ protected constructor(driverClassName: String) : table.fields .map { obj: CommonField -> obj.name } .filter { o: String -> selectedFieldsInCatalog.contains(o) } - .toList() val iterator: AutoCloseableIterator // checks for which sync mode we're using based on the configured airbytestream diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index 26d82375e197..59023764e39d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -46,7 +46,7 @@ object DbSourceDiscoverUtil { toField(commonField, airbyteTypeConverter) } .distinct() - .toList() + val currentJsonSchema = CatalogHelpers.fieldsToJsonSchema(fields) val catalogSchema = stream.jsonSchema val currentSchemaProperties = currentJsonSchema["properties"] @@ -85,37 +85,35 @@ object DbSourceDiscoverUtil { airbyteTypeConverter: Function ): AirbyteCatalog { val tableInfoFieldList = - tableInfos - .map { t: TableInfo> -> - // some databases return multiple copies of the same record for a column (e.g. - // redshift) because - // they have at least once delivery guarantees. we want to dedupe these, but - // first we check that the - // records are actually the same and provide a good error message if they are - // not. - assertColumnsWithSameNameAreSame(t.nameSpace, t.name, t.fields) - val fields = - t.fields - .map { commonField: CommonField -> - toField(commonField, airbyteTypeConverter) - } - .distinct() - .toList() - val fullyQualifiedTableName = getFullyQualifiedTableName(t.nameSpace, t.name) - val primaryKeys = - fullyQualifiedTableNameToPrimaryKeys.getOrDefault( - fullyQualifiedTableName, - emptyList() - ) - TableInfo( - nameSpace = t.nameSpace, - name = t.name, - fields = fields, - primaryKeys = primaryKeys, - cursorFields = t.cursorFields + tableInfos.map { t: TableInfo> -> + // some databases return multiple copies of the same record for a column (e.g. + // redshift) because + // they have at least once delivery guarantees. we want to dedupe these, but + // first we check that the + // records are actually the same and provide a good error message if they are + // not. + assertColumnsWithSameNameAreSame(t.nameSpace, t.name, t.fields) + val fields = + t.fields + .map { commonField: CommonField -> + toField(commonField, airbyteTypeConverter) + } + .distinct() + + val fullyQualifiedTableName = getFullyQualifiedTableName(t.nameSpace, t.name) + val primaryKeys = + fullyQualifiedTableNameToPrimaryKeys.getOrDefault( + fullyQualifiedTableName, + emptyList() ) - } - .toList() + TableInfo( + nameSpace = t.nameSpace, + name = t.name, + fields = fields, + primaryKeys = primaryKeys, + cursorFields = t.cursorFields + ) + } val streams = tableInfoFieldList @@ -124,7 +122,7 @@ object DbSourceDiscoverUtil { tableInfo.primaryKeys .filter { obj: String -> Objects.nonNull(obj) } .map { listOf(it) } - .toList() + CatalogHelpers.createAirbyteStream( tableInfo.name, tableInfo.nameSpace, @@ -157,11 +155,10 @@ object DbSourceDiscoverUtil { !commonField.properties.isEmpty() ) { val properties = - commonField.properties - .map { commField: CommonField -> - toField(commField, airbyteTypeConverter) - } - .toList() + commonField.properties.map { commField: CommonField -> + toField(commField, airbyteTypeConverter) + } + return Field.of( commonField.name, airbyteTypeConverter.apply(commonField.type), diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt index 4e1a2a7fdf8d..3a489c4f2c0e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt @@ -26,7 +26,6 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } @JvmStatic @@ -48,7 +47,6 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index db5d05b22abe..33c08d2d433e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -167,7 +167,6 @@ class GlobalStateManager( .withName(s.streamName) ) } - .toList() } else { return@Supplier listOf() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt index 5283189c58cb..817a9245ae5f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt @@ -100,7 +100,6 @@ object StateGeneratorUtils { generateStreamState(e.key, e.value) } .filter { s: AirbyteStreamState -> isValidStreamDescriptor(s.streamDescriptor) } - .toList() } /** @@ -123,7 +122,6 @@ object StateGeneratorUtils { .map { e: Map.Entry -> generateDbStreamState(e.key, e.value) } - .toList() ) } @@ -203,17 +201,15 @@ object StateGeneratorUtils { AirbyteGlobalState() .withSharedState(Jsons.jsonNode(dbState.cdcState)) .withStreamStates( - dbState.streams - .map { s: DbStreamState -> - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withName(s.streamName) - .withNamespace(s.streamNamespace) - ) - .withStreamState(Jsons.jsonNode(s)) - } - .toList() + dbState.streams.map { s: DbStreamState -> + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withName(s.streamName) + .withNamespace(s.streamNamespace) + ) + .withStreamState(Jsons.jsonNode(s)) + } ) return AirbyteStateMessage() .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL) @@ -230,22 +226,20 @@ object StateGeneratorUtils { fun convertLegacyStateToStreamState( airbyteStateMessage: AirbyteStateMessage ): List { - return Jsons.`object`(airbyteStateMessage.data, DbState::class.java)!! - .streams - .map { s: DbStreamState -> - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - } - .toList() + return Jsons.`object`(airbyteStateMessage.data, DbState::class.java)!!.streams.map { + s: DbStreamState -> + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + } } fun convertStateMessage( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index fdd11857631b..1e34647d779d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -34,7 +34,7 @@ open class StreamStateManager ) : AbstractStateManager( catalog, - Supplier { rawAirbyteStateMessages.map { it.stream }.toList() }, + Supplier { rawAirbyteStateMessages.map { it.stream } }, StateGeneratorUtils.CURSOR_FUNCTION, StateGeneratorUtils.CURSOR_FIELD_FUNCTION, StateGeneratorUtils.CURSOR_RECORD_COUNT_FUNCTION, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index f62439cf0081..97f3e4b154a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -289,7 +289,6 @@ abstract class CdcSourceTest> { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun assertExpectedRecords( @@ -1052,18 +1051,17 @@ abstract class CdcSourceTest> { @JvmField val MODEL_RECORDS_RANDOM: List = MODEL_RECORDS.map { r: JsonNode -> - Jsons.jsonNode( - ImmutableMap.of( - COL_ID + "_random", - r[COL_ID].asInt() * 1000, - COL_MAKE_ID + "_random", - r[COL_MAKE_ID], - COL_MODEL + "_random", - r[COL_MODEL].asText() + "-random" - ) + Jsons.jsonNode( + ImmutableMap.of( + COL_ID + "_random", + r[COL_ID].asInt() * 1000, + COL_MAKE_ID + "_random", + r[COL_MAKE_ID], + COL_MODEL + "_random", + r[COL_MODEL].asText() + "-random" ) - } - .toList() + ) + } @JvmStatic protected fun removeDuplicates( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index da48b9ecfa88..850636430527 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -943,7 +943,7 @@ abstract class JdbcSourceAcceptanceTest> { firstSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() + // some databases don't make insertion order guarantee when equal ordering value if ( testdb.databaseDriver == DatabaseDriver.TERADATA || @@ -993,7 +993,7 @@ abstract class JdbcSourceAcceptanceTest> { secondSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() + Assertions.assertEquals(listOf("c"), secondSyncNames) // 3rd sync has records with duplicated cursors @@ -1047,7 +1047,6 @@ abstract class JdbcSourceAcceptanceTest> { thirdSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() // teradata doesn't make insertion order guarantee when equal ordering value if (testdb.databaseDriver == DatabaseDriver.TERADATA) { @@ -1269,47 +1268,41 @@ abstract class JdbcSourceAcceptanceTest> { states: List, numRecords: Long ): List { - return states - .map { s: DbStreamState -> - AirbyteMessage() - .withType(AirbyteMessage.Type.STATE) - .withState( - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(states))) - .withSourceStats( - AirbyteStateStats().withRecordCount(numRecords.toDouble()) - ) - ) - } - .toList() + return states.map { s: DbStreamState -> + AirbyteMessage() + .withType(AirbyteMessage.Type.STATE) + .withState( + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(states))) + .withSourceStats(AirbyteStateStats().withRecordCount(numRecords.toDouble())) + ) + } } protected open fun createState(states: List): List { - return states - .map { s: DbStreamState -> - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - } - .toList() + return states.map { s: DbStreamState -> + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + } } @Throws(SQLException::class) @@ -1507,20 +1500,16 @@ abstract class JdbcSourceAcceptanceTest> { if (s.stream.streamState[field] != null) s.stream.streamState[field].asText() else "" } - .toList() } protected fun filterRecords(messages: List): List { - return messages - .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } - .toList() + return messages.filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } } protected fun extractStateMessage(messages: List): List { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun extractStateMessage( @@ -1533,7 +1522,6 @@ abstract class JdbcSourceAcceptanceTest> { r.state.stream.streamDescriptor.name == streamName } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun createRecord( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index 5846fdcfeaff..533b0007749e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -129,9 +129,8 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { val allMessages = runRead(catalog) val recordMessages = - allMessages - .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } - .toList() + allMessages.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } + val expectedValues: MutableMap?> = HashMap() val missedValuesByStream: MutableMap> = HashMap() val unexpectedValuesByStream: MutableMap> = HashMap() @@ -272,32 +271,30 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { get() = ConfiguredAirbyteCatalog() .withStreams( - testDataHolders - .map { test: TestDataHolder -> - ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList(idColumnName)) - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream( - CatalogHelpers.createAirbyteStream( - String.format("%s", test.nameWithTestPrefix), - String.format("%s", nameSpace), - Field.of(idColumnName, JsonSchemaType.INTEGER), - Field.of(testColumnName, test.airbyteType) - ) - .withSourceDefinedCursor(true) - .withSourceDefinedPrimaryKey( - java.util.List.of(java.util.List.of(idColumnName)) - ) - .withSupportedSyncModes( - Lists.newArrayList( - SyncMode.FULL_REFRESH, - SyncMode.INCREMENTAL - ) + testDataHolders.map { test: TestDataHolder -> + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList(idColumnName)) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream( + CatalogHelpers.createAirbyteStream( + String.format("%s", test.nameWithTestPrefix), + String.format("%s", nameSpace), + Field.of(idColumnName, JsonSchemaType.INTEGER), + Field.of(testColumnName, test.airbyteType) + ) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey( + java.util.List.of(java.util.List.of(idColumnName)) + ) + .withSupportedSyncModes( + Lists.newArrayList( + SyncMode.FULL_REFRESH, + SyncMode.INCREMENTAL ) - ) - } - .toList() + ) + ) + } ) /** @@ -393,7 +390,6 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt index 849e0376ba4f..bb96403fdf15 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt @@ -52,10 +52,11 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() { Streams.stream( runExecutable(Command.GET_REGEX_TESTS).withArray("tests").elements() ) - .map { obj: JsonNode -> obj.textValue() } .toList() + .map { obj: JsonNode -> obj.textValue() } + val stringMessages = - allMessages.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) }.toList() + allMessages.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) } LOGGER.info("Running " + regexTests.size + " regex tests...") regexTests.forEach( Consumer { regex: String -> diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt index 0b2c3cfad509..02682c17de16 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt @@ -211,9 +211,9 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { val configuredCatalog = withSourceDefinedCursors(configuredCatalog) // only sync incremental streams configuredCatalog.streams = - configuredCatalog.streams - .filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL } - .toList() + configuredCatalog.streams.filter { s: ConfiguredAirbyteStream -> + s.syncMode == SyncMode.INCREMENTAL + } val airbyteMessages = runRead(configuredCatalog, state) val recordMessages = filterRecords(airbyteMessages) @@ -221,7 +221,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { airbyteMessages .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() + Assertions.assertFalse( recordMessages.isEmpty(), "Expected the first incremental sync to produce records" @@ -372,13 +372,12 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { actual: List, message: String ) { - val prunedExpected = - expected.map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) }.toList() + val prunedExpected = expected.map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) } val prunedActual = actual .map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) } .map { m: AirbyteRecordMessage -> this.pruneCdcMetadata(m) } - .toList() + Assertions.assertEquals(prunedExpected.size, prunedActual.size, message) Assertions.assertTrue(prunedExpected.containsAll(prunedActual), message) Assertions.assertTrue(prunedActual.containsAll(prunedExpected), message) @@ -418,7 +417,6 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { return messages .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } .map { obj: AirbyteMessage -> obj.record } - .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt index 9bee126061c8..d8f1f2c30ebe 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt @@ -62,7 +62,7 @@ class Enums { ies: List, oe: Class ): List { - return ies.map { convertTo(it, oe) }.toList() + return ies.map { convertTo(it, oe) } } } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt index 8553edb6a9ce..10495a84ed00 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt @@ -167,9 +167,9 @@ object JsonPaths { * specifically that said, we do expect that there will be no duplicates in the returned list. */ fun getPaths(json: JsonNode?, jsonPath: String): List { - return getInternal(GET_PATHS_CONFIGURATION, json, jsonPath) - .map { obj: JsonNode -> obj.asText() } - .toList() + return getInternal(GET_PATHS_CONFIGURATION, json, jsonPath).map { obj: JsonNode -> + obj.asText() + } } /** diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt index 79bd68f348d5..8e978239fb6a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt @@ -150,7 +150,7 @@ object JsonSchemas { traverseJsonSchema(jsonSchema) { node: JsonNode?, path: List -> mapper.apply(node, path).ifPresent { e: T -> collector.add(e) } } - return collector.toList() // make list unmodifiable + return collector // make list unmodifiable } /** @@ -300,9 +300,10 @@ object JsonSchemas { fun getType(jsonSchema: JsonNode): List { if (jsonSchema.has(JSON_SCHEMA_TYPE_KEY)) { return if (jsonSchema[JSON_SCHEMA_TYPE_KEY].isArray) { - MoreIterators.toList(jsonSchema[JSON_SCHEMA_TYPE_KEY].iterator()) - .map { obj: JsonNode -> obj.asText() } - .toList() + MoreIterators.toList(jsonSchema[JSON_SCHEMA_TYPE_KEY].iterator()).map { + obj: JsonNode -> + obj.asText() + } } else { java.util.List.of(jsonSchema[JSON_SCHEMA_TYPE_KEY].asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt index 695ceae76b23..9ab5247b4e0b 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt @@ -93,15 +93,13 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR } fun getValidationMessageArgs(schemaJson: JsonNode, objectJson: JsonNode): List> { - return validateInternal(schemaJson, objectJson) - .map { obj: ValidationMessage -> obj.arguments } - .toList() + return validateInternal(schemaJson, objectJson).map { obj: ValidationMessage -> + obj.arguments + } } fun getValidationMessagePaths(schemaJson: JsonNode, objectJson: JsonNode): List { - return validateInternal(schemaJson, objectJson) - .map { obj: ValidationMessage -> obj.path } - .toList() + return validateInternal(schemaJson, objectJson).map { obj: ValidationMessage -> obj.path } } @Throws(JsonValidationException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt index e2b95daef178..2914cae0aa16 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt @@ -26,7 +26,7 @@ internal class CompletableFuturesTest { val allOfResult = CompletableFutures.allOf(futures).toCompletableFuture() val result = allOfResult.join() - val success = result.filter { obj: Either -> obj.isRight() }.toList() + val success = result.filter { obj: Either -> obj.isRight() } Assertions.assertEquals( success, Arrays.asList( @@ -41,7 +41,7 @@ internal class CompletableFuturesTest { result .filter { obj: Either -> obj.isLeft() } .map { either: Either -> either.left!!.cause!!.message } - .toList() + Assertions.assertEquals(failureMessages, mutableListOf("Fail 5", "Fail 6")) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt index ca7f6c592613..6955f4dd0dfd 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt @@ -15,21 +15,15 @@ internal class JsonPathsTest { fun testGetValues() { Assertions.assertEquals( listOf(0, 1, 2), - JsonPaths.getValues(JSON_NODE, LIST_ALL_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, LIST_ALL_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( listOf(1), - JsonPaths.getValues(JSON_NODE, LIST_ONE_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, LIST_ONE_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( listOf(10), - JsonPaths.getValues(JSON_NODE, NESTED_FIELD_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, NESTED_FIELD_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( JSON_NODE["two"], diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt index c4b7057e7f50..546600acfc64 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt @@ -109,9 +109,9 @@ internal class YamlsTest { Yamls.deserializeArray(input).use { iterator -> Assertions.assertEquals( classes, - MoreStreams.toStream(iterator) - .map { e: JsonNode -> Jsons.`object`(e, ToClass::class.java) } - .toList() + MoreStreams.toStream(iterator).map { e: JsonNode -> + Jsons.`object`(e, ToClass::class.java) + } ) } } catch (e: Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt index 257dabcf235f..a6f4c199819c 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt @@ -41,7 +41,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(Jsons.serialize(record1)) val expectedStream = Stream.of(record1) - Assertions.assertEquals(expectedStream.toList(), messageStream.toList()) + Assertions.assertEquals(expectedStream, messageStream.toList()) Mockito.verifyNoInteractions(logger) } @@ -51,7 +51,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).info(ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -63,7 +63,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(Jsons.serialize(logMessage)) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).warn("warning") Mockito.verifyNoMoreInteractions(logger) } @@ -76,7 +76,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -89,7 +89,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -112,7 +112,7 @@ internal class DefaultAirbyteStreamFactoryTest { ) .create(bufferedReader) - Assertions.assertThrows(RuntimeException::class.java) { messageStream.toList() } + Assertions.assertThrows(RuntimeException::class.java) { messageStream } } @Test @@ -125,7 +125,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(inputString) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt index fbefb4d4b0fa..d26d784eb5f8 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt @@ -26,15 +26,13 @@ object CatalogClientConverters { fun toAirbyteProtocol(catalog: AirbyteCatalog): io.airbyte.protocol.models.AirbyteCatalog { val protoCatalog = io.airbyte.protocol.models.AirbyteCatalog() val airbyteStream = - catalog.streams - .map { stream: AirbyteStreamAndConfiguration -> - try { - return@map toConfiguredProtocol(stream.stream, stream.config) - } catch (e: JsonValidationException) { - return@map null - } + catalog.streams.map { stream: AirbyteStreamAndConfiguration -> + try { + return@map toConfiguredProtocol(stream.stream, stream.config) + } catch (e: JsonValidationException) { + return@map null } - .toList() + } protoCatalog.withStreams(airbyteStream) return protoCatalog @@ -140,7 +138,6 @@ object CatalogClientConverters { .stream(s) .config(generateDefaultConfiguration(s)) } - .toList() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt index 1e9d4f401d97..2c00077e0d2f 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt @@ -158,7 +158,7 @@ object FailureHelper { } val compareByTimestamp = Comparator.comparing { obj: FailureReason -> obj.timestamp } val compareByTraceAndTimestamp = compareByIsTrace.thenComparing(compareByTimestamp) - return failures.sortedWith(compareByTraceAndTimestamp).toList() + return failures.sortedWith(compareByTraceAndTimestamp) } enum class ConnectorCommand(private val value: String) { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt index f9f066a5d234..95d8d98fc503 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt @@ -124,9 +124,9 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUpl protected fun getTypes(record: GenericData.Record): Map> { val fieldList = - record.schema.fields - .filter { field: Schema.Field -> !field.name().startsWith("_airbyte") } - .toList() + record.schema.fields.filter { field: Schema.Field -> + !field.name().startsWith("_airbyte") + } return if (fieldList.size == 1) { fieldList.associate { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 267710949141..07b3d83db9b7 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -320,7 +320,7 @@ open class S3StorageOperations( obj.key, ) } - .toList() + cleanUpObjects(bucket, keysToDelete) logger.info { "Storage bucket $objectPath has been cleaned-up (${keysToDelete.size} objects matching $regexFormat were deleted)..." @@ -371,7 +371,7 @@ open class S3StorageOperations( obj.key, ) } - .toList() + cleanUpObjects(bucket, keysToDelete) logger.info { "Storage bucket $objectPath has been cleaned-up (${keysToDelete.size} objects were deleted)..." @@ -391,7 +391,7 @@ open class S3StorageOperations( if (keysToDelete.isNotEmpty()) { logger.info { "Deleting objects ${keysToDelete.map { obj: DeleteObjectsRequest.KeyVersion -> obj.key } - .toList().joinToString(separator = ", ")}" + .joinToString(separator = ", ")}" } s3Client.deleteObjects(DeleteObjectsRequest(bucket).withKeys(keysToDelete)) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt index f526f95fb8d4..8b5750019d7e 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.s3.avro import io.airbyte.cdk.integrations.destination.StandardNameTransformer -import java.util.Arrays import java.util.Locale /** @@ -30,13 +29,12 @@ class AvroNameTransformer : StandardNameTransformer() { override fun getNamespace(namespace: String): String { val tokens = namespace.split("\\.".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray() - return Arrays.stream(tokens) + return tokens .map { name: String -> this.getIdentifier( name, ) } - .toList() .joinToString(separator = ".") } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt index 70ebb36ff07f..f355ea6c4bb6 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt @@ -71,26 +71,24 @@ enum class JsonSchemaType { // Match by Type + airbyteType if (jsonSchemaAirbyteType != null) { matchSchemaType = - Arrays.stream(entries.toTypedArray()) + entries + .toTypedArray() .filter { type: JsonSchemaType -> jsonSchemaType == type.jsonSchemaType } .filter { type: JsonSchemaType -> jsonSchemaAirbyteType == type.jsonSchemaAirbyteType } - .toList() } // Match by Type are no results already if (matchSchemaType == null || matchSchemaType.isEmpty()) { matchSchemaType = - Arrays.stream(entries.toTypedArray()) - .filter { format: JsonSchemaType -> - jsonSchemaType == format.jsonSchemaType && - format.jsonSchemaAirbyteType == null - } - .toList() + entries.toTypedArray().filter { format: JsonSchemaType -> + jsonSchemaType == format.jsonSchemaType && + format.jsonSchemaAirbyteType == null + } } - require(!matchSchemaType!!.isEmpty()) { + require(!matchSchemaType.isEmpty()) { String.format( "Unexpected jsonSchemaType - %s and jsonSchemaAirbyteType - %s", jsonSchemaType, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt index 254146e413ce..ea7642afe533 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt @@ -358,7 +358,6 @@ class JsonToAvroSchemaConverter { ) } .distinct() - .toList() return mergeRecordSchemas(fieldName, fieldNamespace, schemas, appendExtraProps) } @@ -490,7 +489,7 @@ class JsonToAvroSchemaConverter { .filter { s: Schema -> s != NULL_SCHEMA } } .distinct() - .toList() + val subfieldNamespace: String = if (fieldNamespace == null) fieldName else ("$fieldNamespace.$fieldName") // recursively merge schemas of a subfield because they may include multiple record @@ -611,7 +610,7 @@ class JsonToAvroSchemaConverter { type, )) } - return Schema.createUnion(unionTypes.filter { removeTimestampType.test(it) }.toList()) + return Schema.createUnion(unionTypes.filter { removeTimestampType.test(it) }) } companion object { @@ -628,9 +627,9 @@ class JsonToAvroSchemaConverter { @Suppress("DEPRECATION") fun getNonNullTypes(fieldName: String?, fieldDefinition: JsonNode): List { - return getTypes(fieldName, fieldDefinition) - .filter { type: JsonSchemaType -> type != JsonSchemaType.NULL } - .toList() + return getTypes(fieldName, fieldDefinition).filter { type: JsonSchemaType -> + type != JsonSchemaType.NULL + } } /** When no type or $ref are specified, it will default to string. */ @@ -645,13 +644,11 @@ class JsonToAvroSchemaConverter { val airbyteType: String? = fieldDefinition.get(AIRBYTE_TYPE)?.asText() if (typeProperty != null && typeProperty.isArray) { - return MoreIterators.toList(typeProperty.elements()) - .map { s: JsonNode -> - JsonSchemaType.fromJsonSchemaType( - s.asText(), - ) - } - .toList() + return MoreIterators.toList(typeProperty.elements()).map { s: JsonNode -> + JsonSchemaType.fromJsonSchemaType( + s.asText(), + ) + } } if (hasTextValue(typeProperty)) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt index 5a9957463abe..cbcd4c96e468 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt @@ -19,7 +19,6 @@ class RootLevelFlatteningSheetGenerator(jsonSchema: JsonNode) : jsonSchema["properties"].fieldNames(), ) .sorted() - .toList() override fun getHeaderRow(): List { val headers: MutableList = diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt index c13f44a96a87..2cd872a16912 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt @@ -42,7 +42,7 @@ class ParquetSerializedBufferTest { "column2" to "string value", "another field" to true, "nested_column" to mapOf("array_column" to listOf(1, 2, 3)), - "string_array_column" to Stream.of("test_string", null).toList(), + "string_array_column" to Stream.of("test_string", null), "datetime_with_timezone" to "2022-05-12T15:35:44.192950Z", ), ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt index c6fb4112902e..94a8a5960b45 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt @@ -98,11 +98,9 @@ protected constructor(fileUploadFormat: FileUploadFormat) : @Throws(IOException::class) private fun readMessagesFromFile(messagesFilename: String): List { - return MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { record -> Jsons.deserialize(record, AirbyteMessage::class.java) } - .toList() + return MoreResources.readResource(messagesFilename).trim().lines().map { record -> + Jsons.deserialize(record, AirbyteMessage::class.java) + } } @Throws(Exception::class) @@ -113,9 +111,9 @@ protected constructor(fileUploadFormat: FileUploadFormat) : protected fun getTypes(record: GenericData.Record): Map> { val fieldList = - record.schema.fields - .filter { field: Schema.Field -> !field.name().startsWith("_airbyte") } - .toList() + record.schema.fields.filter { field: Schema.Field -> + !field.name().startsWith("_airbyte") + } return if (fieldList.size == 1) { fieldList.associate { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index f2a6bb1f863b..99d6eaf6d5f4 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -91,12 +91,12 @@ protected constructor(protected val outputFormat: FileUploadFormat) : Destinatio .objectSummaries .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } .sortedWith(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) - .toList() + LOGGER.info( "All objects: {}", - objectSummaries - .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } - .toList(), + objectSummaries.map { o: S3ObjectSummary -> + String.format("%s/%s", o.bucketName, o.key) + }, ) return objectSummaries } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt index 70eb45d0474a..e0d7412c4c21 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt @@ -107,11 +107,10 @@ interface AirbyteType { // Recurse into a schema that forces a specific one of each option val options = - typeOptions - .map { typeOption: String -> - fromJsonSchema(getTrimmedJsonSchema(schema, typeOption)) - } - .toList() + typeOptions.map { typeOption: String -> + fromJsonSchema(getTrimmedJsonSchema(schema, typeOption)) + } + return Union(options) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index f5e2e74e0037..2847c3ad666d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -117,9 +117,7 @@ constructor( "Only top-level primary keys are supported" } val primaryKey = - stream.primaryKey - .map { key: List -> sqlGenerator.buildColumnId(key[0]) } - .toList() + stream.primaryKey.map { key: List -> sqlGenerator.buildColumnId(key[0]) } require(stream.cursorField.size <= 1) { "Only top-level cursors are supported" } val cursor: Optional = diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt index 110703b03fff..762ff471c5eb 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt @@ -155,7 +155,7 @@ class DefaultTyperDeduper( val prepareTablesFutureResult = CompletableFutures.allOf( - destinationInitialStatuses.map { this.prepareTablesFuture(it) }.toList() + destinationInitialStatuses.map { this.prepareTablesFuture(it) } ) .toCompletableFuture() .join() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt index 0e09e029a9ac..4fa0d2d62a45 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt @@ -36,7 +36,7 @@ object FutureUtils { .map { obj: CompletableFuture> -> obj.join() } .filter { obj: Optional -> obj.isPresent } .map { obj: Optional -> obj.get() } - .toList() + logAllAndThrowFirst(initialMessage, exceptions) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt index 642c9ac77df0..17c890f67179 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.base.destination.typing_deduping import java.util.function.Consumer -import java.util.stream.Stream /** * Represents a list of SQL transactions, where each transaction consists of one or more SQL @@ -25,27 +24,25 @@ data class Sql(val transactions: List>) { * @return A list of SQL strings, each of which represents a transaction. */ fun asSqlStrings(begin: String?, commit: String?): List { - return transactions - .map { transaction: List -> - // If there's only one statement, we don't need to wrap it in a transaction. - if (transaction.size == 1) { - return@map transaction[0] - } - val builder = StringBuilder() - builder.append(begin) - builder.append(";\n") - transaction.forEach( - Consumer { statement: String -> - builder.append(statement) - // No semicolon - statements already end with a semicolon - builder.append("\n") - } - ) - builder.append(commit) - builder.append(";\n") - builder.toString() + return transactions.map { transaction: List -> + // If there's only one statement, we don't need to wrap it in a transaction. + if (transaction.size == 1) { + return@map transaction[0] } - .toList() + val builder = StringBuilder() + builder.append(begin) + builder.append(";\n") + transaction.forEach( + Consumer { statement: String -> + builder.append(statement) + // No semicolon - statements already end with a semicolon + builder.append("\n") + } + ) + builder.append(commit) + builder.append(";\n") + builder.toString() + } } init { @@ -74,12 +71,12 @@ data class Sql(val transactions: List>) { /** Execute each statement as its own transaction. */ @JvmStatic fun separately(statements: List): Sql { - return create(statements.map { listOf(it) }.toList()) + return create(statements.map { listOf(it) }) } @JvmStatic fun separately(vararg statements: String): Sql { - return separately(Stream.of(*statements).toList()) + return separately(*statements) } /** @@ -118,10 +115,8 @@ data class Sql(val transactions: List>) { } statement } - .toList() } .filter { transaction: List -> !transaction.isEmpty() } - .toList() ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt index 4167aa8d25ac..f43b4347b708 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt @@ -134,7 +134,7 @@ class TyperDeduperUtil { } getResultsOrLogAndThrowFirst( "The following exceptions were thrown attempting to run migrations:\n", - CompletableFutures.allOf(futures.toList()).toCompletableFuture().join() + CompletableFutures.allOf(futures).toCompletableFuture().join() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 563daa146ecb..d9bc54ae396f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -118,6 +118,7 @@ internal class CatalogParserTest { val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema))) val parsedCatalog = parser.parseCatalog(catalog) + // Fishy fishy! val columnsList = parsedCatalog.streams[0].columns!!.keys.toList() assertAll( @@ -156,6 +157,7 @@ internal class CatalogParserTest { val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema))) val parsedCatalog = parser.parseCatalog(catalog) + // Fishy fishy! val columnsList = parsedCatalog.streams[0].columns!!.keys.toList() assertAll( diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 9e4aff3fc523..1e2171436719 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -888,7 +888,7 @@ abstract class BaseTypingDedupingTest { } private fun repeatList(n: Int, list: List): List { - return Collections.nCopies(n, list).flatMap { obj: List -> obj }.toList() + return Collections.nCopies(n, list).flatMap { obj: List -> obj } } @Throws(Exception::class) @@ -1080,7 +1080,6 @@ abstract class BaseTypingDedupingTest { .filter { line: String -> !line.isEmpty() } .filter { line: String -> !line.startsWith("//") } .map { jsonString: String -> Jsons.deserializeExact(jsonString) } - .toList() } @Throws(IOException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt index a398faa1a1bb..cbaf9ab51be1 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt @@ -89,10 +89,8 @@ constructor( fun diffRawTableRecords(expectedRecords: List, actualRecords: List) { val diff = diffRecords( - expectedRecords - .map { record: JsonNode -> this.copyWithLiftedData(record) } - .toList(), - actualRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }.toList(), + expectedRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, + actualRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, rawRecordIdentityComparator, rawRecordSortComparator, rawRecordIdentityExtractor, @@ -276,7 +274,7 @@ constructor( "Row had incorrect data: " + recordIdExtractor.apply(expectedRecord) + "\n" // Iterate through each column in the expected record and compare it to the actual record's // value. - for (column in Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { + for (column in Streams.stream(expectedRecord.fieldNames()).sorted()) { // For all other columns, we can just compare their values directly. val expectedValue = expectedRecord[column] val actualValue = actualRecord[column] @@ -316,7 +314,7 @@ constructor( columnNames: Map ): LinkedHashMap { val extraFields = LinkedHashMap() - for (column in Streams.stream(actualRecord.fieldNames()).sorted().toList()) { + for (column in Streams.stream(actualRecord.fieldNames()).sorted()) { // loaded_at and raw_id are generated dynamically, so we just ignore them. val isLoadedAt = getMetadataColumnName(columnNames, "_airbyte_loaded_at") == column val isRawId = getMetadataColumnName(columnNames, "_airbyte_raw_id") == column