From b0ab1483af5e699fb6904b914c840cfab35ca2f8 Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Thu, 23 May 2024 11:01:45 -0700 Subject: [PATCH] replace all java Collectors.toList with kotlin construct (#37537) more kotlin cleanup --- .../destination/StandardNameTransformer.kt | 3 +-- .../destination/async/DetectStreamToFlush.kt | 2 +- .../concurrent/ConcurrentStreamConsumer.kt | 3 +-- .../io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt | 3 +-- .../base/IntegrationRunnerTest.kt | 5 ++-- .../async/AsyncStreamConsumerTest.kt | 3 +-- .../BufferedStreamConsumerTest.kt | 16 +++++------- .../cdk/db/bigquery/BigQueryDatabase.kt | 3 +-- .../jdbc/JdbcBufferedConsumerFactory.kt | 11 +++----- .../jdbc/typing_deduping/JdbcSqlGenerator.kt | 13 +++++----- .../staging/SerialStagingConsumerFactory.kt | 3 +-- .../destination/DestinationAcceptanceTest.kt | 11 ++++---- .../source/jdbc/AbstractJdbcSource.kt | 2 +- .../source/relationaldb/AbstractDbSource.kt | 6 ++--- .../relationaldb/DbSourceDiscoverUtil.kt | 11 ++++---- .../relationaldb/RelationalDbReadUtil.kt | 4 +-- .../relationaldb/state/StateGeneratorUtils.kt | 9 +++---- .../relationaldb/state/StreamStateManager.kt | 5 +--- .../state/GlobalStateManagerTest.kt | 11 ++++---- .../state/LegacyStateManagerTest.kt | 11 ++++---- .../state/StreamStateManagerTest.kt | 7 +++-- .../integrations/debezium/CdcSourceTest.kt | 6 ++--- .../jdbc/test/JdbcSourceAcceptanceTest.kt | 26 +++++++++---------- .../source/AbstractSourceDatabaseTypeTest.kt | 4 +-- .../source/SourceAcceptanceTest.kt | 11 ++++---- .../io/airbyte/commons/json/JsonPaths.kt | 3 +-- .../io/airbyte/commons/json/JsonSchemas.kt | 5 ++-- .../kotlin/io/airbyte/commons/json/Jsons.kt | 3 +-- .../validation/json/JsonSchemaValidator.kt | 4 +-- .../io/airbyte/commons/json/JsonPathsTest.kt | 7 +++-- .../io/airbyte/commons/yaml/YamlsTest.kt | 3 +-- .../DefaultAirbyteStreamFactoryTest.kt | 16 +++++------- .../workers/helper/CatalogClientConverters.kt | 4 +-- .../gcs/GcsDestinationAcceptanceTest.kt | 5 ++-- .../destination/s3/S3ConsumerFactory.kt | 6 +---- .../s3/avro/JsonToAvroSchemaConverter.kt | 13 +++++----- .../csv/RootLevelFlatteningSheetGenerator.kt | 3 +-- .../s3/S3DestinationAcceptanceTest.kt | 5 ++-- .../typing_deduping/BaseTypingDedupingTest.kt | 3 +-- .../typing_deduping/RecordDiffer.kt | 4 +-- 40 files changed, 116 insertions(+), 157 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt index 88cc094bae49..e403a688f30b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt @@ -8,7 +8,6 @@ import io.airbyte.commons.json.Jsons import io.airbyte.commons.string.Strings import io.airbyte.commons.text.Names import io.airbyte.commons.util.MoreIterators -import java.util.stream.Collectors open class StandardNameTransformer : NamingConventionTransformer { override fun getIdentifier(name: String): String { @@ -77,7 +76,7 @@ open class StandardNameTransformer : NamingConventionTransformer { MoreIterators.toList(root.elements()) .stream() .map { r: JsonNode -> formatJsonPath(r) } - .collect(Collectors.toList()) + .toList() ) } else { return root diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt index 4e65a6d9826d..69d8fc255c19 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt @@ -273,7 +273,7 @@ internal constructor( } .thenComparing { s: StreamDescriptor -> s.namespace + s.name }, ) - .collect(Collectors.toList()) + .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt index 2fec67748569..444b619af41f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt @@ -13,7 +13,6 @@ import java.util.* import java.util.concurrent.* import java.util.concurrent.ThreadPoolExecutor.AbortPolicy import java.util.function.Consumer -import java.util.stream.Collectors import kotlin.math.min import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -85,7 +84,7 @@ class ConcurrentStreamConsumer( .map { runnable: ConcurrentStreamRunnable -> CompletableFuture.runAsync(runnable, executorService) } - .collect(Collectors.toList()) + .toList() /* * Wait for the submitted streams to complete before returning. This uses the join() method to allow diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt index ed15fe586a8a..7f57d453d328 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt @@ -20,7 +20,6 @@ import io.airbyte.commons.string.Strings import io.airbyte.protocol.models.JsonSchemaType import java.math.BigDecimal import java.sql.* -import java.util.stream.Collectors import javax.sql.DataSource import org.bouncycastle.util.encoders.Base64 import org.junit.jupiter.api.Assertions @@ -122,7 +121,7 @@ internal class TestJdbcUtils { JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet -> sourceOperations.rowToJson(queryContext) } - .collect(Collectors.toList()) + .toList() Assertions.assertEquals(RECORDS_AS_JSON, actual) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt index 3a44a4a7ed5c..ab356b85e5a9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt @@ -24,7 +24,6 @@ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.function.Consumer -import java.util.stream.Collectors import org.apache.commons.lang3.ThreadUtils import org.assertj.core.api.AssertionsForClassTypes import org.junit.jupiter.api.Assertions @@ -474,7 +473,7 @@ ${Jsons.serialize(message2)}""".toByteArray( ThreadUtils.getAllThreads() .stream() .filter(IntegrationRunner::filterOrphanedThread) - .collect(Collectors.toList()) + .toList() // all threads should be interrupted Assertions.assertEquals(listOf(), runningThreads) Assertions.assertEquals(1, caughtExceptions.size) @@ -502,7 +501,7 @@ ${Jsons.serialize(message2)}""".toByteArray( ThreadUtils.getAllThreads() .stream() .filter(IntegrationRunner::filterOrphanedThread) - .collect(Collectors.toList()) + .toList() // a thread that refuses to be interrupted should remain Assertions.assertEquals(1, runningThreads.size) Assertions.assertEquals(1, caughtExceptions.size) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index 3d93057fe5a3..3ebe3e1a1004 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer -import java.util.stream.Collectors import java.util.stream.Stream import org.apache.commons.lang3.RandomStringUtils import org.junit.jupiter.api.Assertions.assertEquals @@ -574,7 +573,7 @@ class AsyncStreamConsumerTest { ), ) } - .collect(Collectors.toList()) + .toList() assertEquals(expRecords, actualRecords) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt index 1202c94e1b9d..4eb6dc79d1bf 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt @@ -17,7 +17,6 @@ import java.time.Instant import java.util.* import java.util.concurrent.TimeUnit import java.util.function.Consumer -import java.util.stream.Collectors import java.util.stream.Stream import org.apache.commons.lang3.RandomStringUtils import org.junit.jupiter.api.Assertions @@ -150,7 +149,7 @@ class BufferedStreamConsumerTest { Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2) .stream() .flatMap { obj: List -> obj.stream() } - .collect(Collectors.toList()) + .toList() verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords) Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1) @@ -241,7 +240,7 @@ class BufferedStreamConsumerTest { .stream() .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) } - .collect(Collectors.toList()) + .toList() consumer.start() consumeRecords(consumer, expectedRecordsStream1) @@ -266,7 +265,7 @@ class BufferedStreamConsumerTest { .stream() .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) } - .collect(Collectors.toList()) + .toList() consumer.start() consumeRecords(consumer, expectedRecordsStream1) @@ -310,7 +309,7 @@ class BufferedStreamConsumerTest { STREAM_NAME, SCHEMA_NAME, Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream()) - .collect(Collectors.toList()) + .toList() ) Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1) } @@ -345,7 +344,7 @@ class BufferedStreamConsumerTest { STREAM_NAME, SCHEMA_NAME, Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream()) - .collect(Collectors.toList()) + .toList() ) verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1Batch3) // expects two STATE messages returned since one will be flushed after periodic flushing @@ -589,10 +588,7 @@ class BufferedStreamConsumerTest { Mockito.verify(recordWriter) .accept( AirbyteStreamNameNamespacePair(streamName, namespace), - expectedRecords - .stream() - .map { obj: AirbyteMessage -> obj.record } - .collect(Collectors.toList()) + expectedRecords.stream().map { obj: AirbyteMessage -> obj.record }.toList() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt index 160445420e39..d123206000e5 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt @@ -16,7 +16,6 @@ import java.io.IOException import java.sql.SQLException import java.util.* import java.util.function.Consumer -import java.util.stream.Collectors import java.util.stream.Stream import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.tuple.ImmutablePair @@ -106,7 +105,7 @@ constructor( .setType(StandardSQLTypeName.STRING) .build() } - .collect(Collectors.toList()) + .toList() return query(sql, parameterValueList) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt index 489f017c0435..dcf3dd3d1230 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt @@ -33,7 +33,6 @@ import java.util.* import java.util.concurrent.Executors import java.util.function.Consumer import java.util.function.Function -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -111,15 +110,13 @@ object JdbcBufferedConsumerFactory { return if (parsedCatalog == null) { catalog!! .streams - .stream() - .map(toWriteConfig(namingResolver, config, schemaRequired)) - .collect(Collectors.toList()) + .map { toWriteConfig(namingResolver, config, schemaRequired).apply(it) } + .toList() } else { // we should switch this to kotlin-style list processing, but meh for now parsedCatalog.streams - .stream() - .map(parsedStreamToWriteConfig(namingResolver)) - .collect(Collectors.toList()) + .map { parsedStreamToWriteConfig(namingResolver).apply(it) } + .toList() } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index 5194940c498e..0430588c9bc5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -23,9 +23,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf import io.airbyte.protocol.models.v0.DestinationSyncMode import java.sql.Timestamp import java.time.Instant -import java.util.Locale -import java.util.Optional -import java.util.stream.Collectors +import java.util.* +import kotlin.Any +import kotlin.Boolean +import kotlin.IllegalArgumentException import kotlin.Int import org.jooq.Condition import org.jooq.DSLContext @@ -176,14 +177,14 @@ constructor( .map { metaColumn: Map.Entry?> -> DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) } - .collect(Collectors.toList()) + .toList() val dataFields = columns.entries .stream() .map { column: Map.Entry -> DSL.field(DSL.quotedName(column.key!!.name), toDialectType(column.value)) } - .collect(Collectors.toList()) + .toList() dataFields.addAll(fields) return dataFields } @@ -227,7 +228,7 @@ constructor( .map { metaColumn: Map.Entry?> -> DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) } - .collect(Collectors.toList()) + .toList() // Use originalName with non-sanitized characters when extracting data from _airbyte_data val dataFields = extractRawDataFields(columns, useExpensiveSaferCasting) dataFields.addAll(fields) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt index 37a495e1f7aa..0cba3ecd68e0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt @@ -22,7 +22,6 @@ import java.time.Instant import java.util.UUID import java.util.function.Consumer import java.util.function.Function -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -125,7 +124,7 @@ open class SerialStagingConsumerFactory { return catalog.streams .stream() .map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns)) - .collect(Collectors.toList()) + .toList() } private fun toWriteConfig( diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index d2a20448f3d3..ebec3bfa7954 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -62,7 +62,6 @@ import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.function.Consumer -import java.util.stream.Collectors import java.util.stream.Stream import kotlin.test.assertNotNull import org.junit.jupiter.api.* @@ -486,7 +485,7 @@ abstract class DestinationAcceptanceTest { else message.toString() } ) - .collect(Collectors.toList()) + .toList() val config = getConfig() runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false) @@ -861,7 +860,7 @@ abstract class DestinationAcceptanceTest { } message } - .collect(Collectors.toList()) + .toList() assertSameMessages(expectedMessages, actualMessages, true) } @@ -1031,7 +1030,7 @@ abstract class DestinationAcceptanceTest { it.record.data["NZD"].asText() (it.record.emittedAt == latestMessagesOnly[key]!!.record.emittedAt) } - .collect(Collectors.toList()) + .toList() val defaultSchema = getDefaultSchema(config) retrieveRawRecordsAndAssertSameMessages( @@ -1754,7 +1753,7 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .collect(Collectors.toList()) + .toList() val actualProcessed = actual @@ -1763,7 +1762,7 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .collect(Collectors.toList()) + .toList() _testDataComparator.assertSameData(expectedProcessed, actualProcessed) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 132e27dbfa83..4756f2aa158e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -771,7 +771,7 @@ abstract class AbstractJdbcSource( ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .collect(Collectors.toList()) + .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 0afcd6fcae0f..ca315cfce1a3 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -176,7 +176,7 @@ protected constructor(driverClassName: String) : val iteratorList = Stream.of(incrementalIterators, fullRefreshIterators) .flatMap(Collection>::stream) - .collect(Collectors.toList()) + .toList() return AutoCloseableIterators.appendOnClose( AutoCloseableIterators.concatWithEagerClose( @@ -318,7 +318,7 @@ protected constructor(driverClassName: String) : .filter { table: TableInfo> -> !systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name) } - .collect(Collectors.toList())) + .toList()) } protected fun getFullRefreshIterators( @@ -434,7 +434,7 @@ protected constructor(driverClassName: String) : .stream() .map { obj: CommonField -> obj.name } .filter { o: String -> selectedFieldsInCatalog.contains(o) } - .collect(Collectors.toList()) + .toList() val iterator: AutoCloseableIterator // checks for which sync mode we're using based on the configured airbytestream diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index c8426abad4f3..40c99432deef 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -49,7 +49,7 @@ object DbSourceDiscoverUtil { toField(commonField, airbyteTypeConverter) } .distinct() - .collect(Collectors.toList()) + .toList() val currentJsonSchema = CatalogHelpers.fieldsToJsonSchema(fields) val catalogSchema = stream.jsonSchema val currentSchemaProperties = currentJsonSchema["properties"] @@ -105,7 +105,7 @@ object DbSourceDiscoverUtil { toField(commonField, airbyteTypeConverter) } .distinct() - .collect(Collectors.toList()) + .toList() val fullyQualifiedTableName = getFullyQualifiedTableName(t.nameSpace, t.name) val primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault( @@ -120,11 +120,10 @@ object DbSourceDiscoverUtil { cursorFields = t.cursorFields ) } - .collect(Collectors.toList()) + .toList() val streams = tableInfoFieldList - .stream() .map { tableInfo: TableInfo -> val primaryKeys = tableInfo.primaryKeys @@ -144,7 +143,9 @@ object DbSourceDiscoverUtil { ) .withSourceDefinedPrimaryKey(primaryKeys) } - .collect(Collectors.toList()) + // This is ugly. Some of our tests change the streams on the AirbyteCatalog + // object... + .toMutableList() return AirbyteCatalog().withStreams(streams) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt index fd8c5d79a565..2ea21ddcc7ac 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt @@ -28,7 +28,7 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .collect(Collectors.toList()) + .toList() } @JvmStatic @@ -52,7 +52,7 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .collect(Collectors.toList()) + .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt index ae9cf1380f46..0d249b11c46a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt @@ -16,7 +16,6 @@ import io.airbyte.configoss.helpers.StateMessageHelper import io.airbyte.protocol.models.v0.* import java.util.* import java.util.function.Function -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -102,7 +101,7 @@ object StateGeneratorUtils { generateStreamState(e.key, e.value) } .filter { s: AirbyteStreamState -> isValidStreamDescriptor(s.streamDescriptor) } - .collect(Collectors.toList()) + .toList() } /** @@ -126,7 +125,7 @@ object StateGeneratorUtils { .map { e: Map.Entry -> generateDbStreamState(e.key, e.value) } - .collect(Collectors.toList()) + .toList() ) } @@ -217,7 +216,7 @@ object StateGeneratorUtils { ) .withStreamState(Jsons.jsonNode(s)) } - .collect(Collectors.toList()) + .toList() ) return AirbyteStateMessage() .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL) @@ -250,7 +249,7 @@ object StateGeneratorUtils { .withStreamState(Jsons.jsonNode(s)) ) } - .collect(Collectors.toList()) + .toList() } fun convertStateMessage( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index 6de23aaddf9f..570f8b805cf4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -11,7 +11,6 @@ import io.airbyte.protocol.models.v0.AirbyteStreamState import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import java.util.* import java.util.function.Supplier -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -35,9 +34,7 @@ open class StreamStateManager ) : AbstractStateManager( catalog, - Supplier { - rawAirbyteStateMessages.stream().map { it.stream }.collect(Collectors.toList()) - }, + Supplier { rawAirbyteStateMessages.stream().map { it.stream }.toList() }, StateGeneratorUtils.CURSOR_FUNCTION, StateGeneratorUtils.CURSOR_FIELD_FUNCTION, StateGeneratorUtils.CURSOR_RECORD_COUNT_FUNCTION, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt index c87e2ee6c3a8..2fb425949083 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.* import java.util.* -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test @@ -101,7 +100,7 @@ class GlobalStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val stateManager: StateManager = GlobalStateManager(AirbyteStateMessage().withData(Jsons.jsonNode(dbState)), catalog) @@ -129,7 +128,7 @@ class GlobalStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val expectedGlobalState = @@ -191,7 +190,7 @@ class GlobalStateManagerTest { o.streamDescriptor.name } ) - .collect(Collectors.toList()) + .toList() ) val expected = AirbyteStateMessage() @@ -282,7 +281,7 @@ class GlobalStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val expectedGlobalState = @@ -344,7 +343,7 @@ class GlobalStateManagerTest { o.streamDescriptor.name } ) - .collect(Collectors.toList()) + .toList() ) val expected = AirbyteStateMessage() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManagerTest.kt index b6a585713b95..62ee96722e53 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManagerTest.kt @@ -14,7 +14,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import java.util.* import java.util.List import java.util.Map -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.mockito.Mockito @@ -157,7 +156,7 @@ class LegacyStateManagerTest { obj.streamName } ) - .collect(Collectors.toList()) + .toList() ) .withCdc(false) ) @@ -197,7 +196,7 @@ class LegacyStateManagerTest { obj.streamName } ) - .collect(Collectors.toList()) + .toList() ) .withCdc(false) ) @@ -255,7 +254,7 @@ class LegacyStateManagerTest { obj.streamName } ) - .collect(Collectors.toList()) + .toList() ) .withCdc(false) ) @@ -318,7 +317,7 @@ class LegacyStateManagerTest { obj.streamName } ) - .collect(Collectors.toList()) + .toList() ) .withCdc(true) ) @@ -353,7 +352,7 @@ class LegacyStateManagerTest { obj.streamName } ) - .collect(Collectors.toList()) + .toList() ) .withCdc(true) ) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt index 6fba4dda3a85..26a100a9f020 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.* import java.util.* -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.mockito.Mockito @@ -188,7 +187,7 @@ class StreamStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val expectedFirstEmission = createStreamState( @@ -231,7 +230,7 @@ class StreamStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val expectedSecondEmission = createStreamState( @@ -397,7 +396,7 @@ class StreamStateManagerTest { ) .stream() .sorted(Comparator.comparing { obj: DbStreamState -> obj.streamName }) - .collect(Collectors.toList()) + .toList() ) val expectedFirstEmission = diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index ff1990debba5..9a523b262244 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -315,7 +315,7 @@ abstract class CdcSourceTest> { .stream() .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .collect(Collectors.toList()) + .toList() } protected fun assertExpectedRecords( @@ -890,11 +890,11 @@ abstract class CdcSourceTest> { expectedCatalog.streams .stream() .sorted(Comparator.comparing { obj: AirbyteStream -> obj.name }) - .collect(Collectors.toList()), + .toList(), actualCatalog.streams .stream() .sorted(Comparator.comparing { obj: AirbyteStream -> obj.name }) - .collect(Collectors.toList()), + .toList() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index b7c1cfc616a0..fc805a5b7008 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -23,7 +23,6 @@ import java.math.BigDecimal import java.sql.SQLException import java.util.* import java.util.function.Consumer -import java.util.stream.Collectors import junit.framework.TestCase.assertEquals import org.hamcrest.MatcherAssert import org.hamcrest.Matchers @@ -317,7 +316,7 @@ abstract class JdbcSourceAcceptanceTest> { stream.namespace.startsWith(schemaName) } } - .collect(Collectors.toList()) + .toList() return filteredCatalog } else { return catalog @@ -438,7 +437,7 @@ abstract class JdbcSourceAcceptanceTest> { convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) ) } - .collect(Collectors.toList()) + .toList() return expectedMessages } @@ -529,7 +528,7 @@ abstract class JdbcSourceAcceptanceTest> { convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) ) } - .collect(Collectors.toList()) + .toList() } @Test @@ -602,7 +601,7 @@ abstract class JdbcSourceAcceptanceTest> { convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) ) } - .collect(Collectors.toList()) + .toList() } @Test @@ -946,7 +945,7 @@ abstract class JdbcSourceAcceptanceTest> { convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) ) } - .collect(Collectors.toList()) + .toList() } // when initial and final cursor fields are the same. @@ -1266,9 +1265,8 @@ abstract class JdbcSourceAcceptanceTest> { // Filter to only keep the main stream name as configured stream catalog.withStreams( catalog.streams - .stream() .filter { s: ConfiguredAirbyteStream -> s.stream.name == streamName() } - .collect(Collectors.toList()) + .toMutableList() ) return catalog } @@ -1276,7 +1274,7 @@ abstract class JdbcSourceAcceptanceTest> { protected open fun getCatalog(defaultNamespace: String?): AirbyteCatalog { return AirbyteCatalog() .withStreams( - java.util.List.of( + mutableListOf( CatalogHelpers.createAirbyteStream( TABLE_NAME, defaultNamespace, @@ -1408,7 +1406,7 @@ abstract class JdbcSourceAcceptanceTest> { ) ) } - .collect(Collectors.toList()) + .toList() } protected open fun createState(states: List): List { @@ -1427,7 +1425,7 @@ abstract class JdbcSourceAcceptanceTest> { .withStreamState(Jsons.jsonNode(s)) ) } - .collect(Collectors.toList()) + .toList() } @Throws(SQLException::class) @@ -1657,7 +1655,7 @@ abstract class JdbcSourceAcceptanceTest> { return messages .stream() .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } - .collect(Collectors.toList()) + .toList() } protected fun extractStateMessage(messages: List): List { @@ -1665,7 +1663,7 @@ abstract class JdbcSourceAcceptanceTest> { .stream() .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .collect(Collectors.toList()) + .toList() } protected fun extractStateMessage( @@ -1679,7 +1677,7 @@ abstract class JdbcSourceAcceptanceTest> { r.state.stream.streamDescriptor.name == streamName } .map { obj: AirbyteMessage -> obj.state } - .collect(Collectors.toList()) + .toList() } protected fun createRecord( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index 4d721ea307f2..1af04e601b0f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -309,7 +309,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { ) ) } - .collect(Collectors.toList()) + .toList() ) /** @@ -409,7 +409,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { .stream() .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .collect(Collectors.toList()) + .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt index ce97b7ec20cf..7f9053cfd5cc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt @@ -11,7 +11,6 @@ import io.airbyte.commons.json.Jsons import io.airbyte.configoss.StandardCheckConnectionOutput import io.airbyte.protocol.models.v0.* import java.util.* -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.slf4j.Logger @@ -229,7 +228,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { configuredCatalog.streams .stream() .filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL } - .collect(Collectors.toList()) + .toList() val airbyteMessages = runRead(configuredCatalog, state) val recordMessages = filterRecords(airbyteMessages) @@ -238,7 +237,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { .stream() .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .collect(Collectors.toList()) + .toList() Assertions.assertFalse( recordMessages.isEmpty(), "Expected the first incremental sync to produce records" @@ -394,13 +393,13 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { .stream() .map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) } .map { m: AirbyteRecordMessage -> this.pruneCdcMetadata(m) } - .collect(Collectors.toList()) + .toList() val prunedActual = actual .stream() .map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) } .map { m: AirbyteRecordMessage -> this.pruneCdcMetadata(m) } - .collect(Collectors.toList()) + .toList() Assertions.assertEquals(prunedExpected.size, prunedActual.size, message) Assertions.assertTrue(prunedExpected.containsAll(prunedActual), message) Assertions.assertTrue(prunedActual.containsAll(prunedExpected), message) @@ -441,7 +440,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { .stream() .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } .map { obj: AirbyteMessage -> obj.record } - .collect(Collectors.toList()) + .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt index 1c354a804ba4..a5a4c97d9159 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt @@ -17,7 +17,6 @@ import com.jayway.jsonpath.spi.mapper.MappingProvider import io.airbyte.commons.util.MoreIterators import java.util.* import java.util.function.BiFunction -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -171,7 +170,7 @@ object JsonPaths { return getInternal(GET_PATHS_CONFIGURATION, json, jsonPath) .stream() .map { obj: JsonNode -> obj.asText() } - .collect(Collectors.toList()) + .toList() } /** diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt index e97475ec0f46..7b1076cfa8be 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt @@ -16,7 +16,6 @@ import java.util.* import java.util.function.BiConsumer import java.util.function.BiFunction import java.util.function.Predicate -import java.util.stream.Collectors private val log = KotlinLogging.logger {} @@ -68,7 +67,7 @@ object JsonSchemas { resources .map { p: Path -> p.fileName.toString() } .filter { p: String -> p.endsWith(".yaml") } - .collect(Collectors.toList()) + .toList() } val configRoot = Files.createTempDirectory("schemas") for (filename in filenames) { @@ -304,7 +303,7 @@ object JsonSchemas { MoreIterators.toList(jsonSchema[JSON_SCHEMA_TYPE_KEY].iterator()) .stream() .map { obj: JsonNode -> obj.asText() } - .collect(Collectors.toList()) + .toList() } else { java.util.List.of(jsonSchema[JSON_SCHEMA_TYPE_KEY].asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt index 7c09bf9e2e69..cb7132c4672f 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt @@ -22,7 +22,6 @@ import java.io.File import java.io.IOException import java.util.* import java.util.function.BiConsumer -import java.util.stream.Collectors import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -250,7 +249,7 @@ object Jsons { } fun children(jsonNode: JsonNode): List { - return MoreStreams.toStream(jsonNode.elements()).collect(Collectors.toList()) + return MoreStreams.toStream(jsonNode.elements()).toList() } fun toPrettyString(jsonNode: JsonNode?): String { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt index 185a614e67d1..a7b85b909708 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt @@ -98,14 +98,14 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR return validateInternal(schemaJson, objectJson) .stream() .map { obj: ValidationMessage -> obj.arguments } - .collect(Collectors.toList()) + .toList() } fun getValidationMessagePaths(schemaJson: JsonNode, objectJson: JsonNode): List { return validateInternal(schemaJson, objectJson) .stream() .map { obj: ValidationMessage -> obj.path } - .collect(Collectors.toList()) + .toList() } @Throws(JsonValidationException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt index 289759861974..5c26f9859df1 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt @@ -6,7 +6,6 @@ package io.airbyte.commons.json import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode import com.jayway.jsonpath.PathNotFoundException -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.junit.jupiter.api.function.Executable @@ -19,21 +18,21 @@ internal class JsonPathsTest { JsonPaths.getValues(JSON_NODE, LIST_ALL_QUERY) .stream() .map { obj: JsonNode -> obj.asInt() } - .collect(Collectors.toList()) + .toList() ) Assertions.assertEquals( listOf(1), JsonPaths.getValues(JSON_NODE, LIST_ONE_QUERY) .stream() .map { obj: JsonNode -> obj.asInt() } - .collect(Collectors.toList()) + .toList() ) Assertions.assertEquals( listOf(10), JsonPaths.getValues(JSON_NODE, NESTED_FIELD_QUERY) .stream() .map { obj: JsonNode -> obj.asInt() } - .collect(Collectors.toList()) + .toList() ) Assertions.assertEquals( JSON_NODE["two"], diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt index 0e5fe8be3fed..c4b7057e7f50 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/yaml/YamlsTest.kt @@ -12,7 +12,6 @@ import io.airbyte.commons.stream.MoreStreams import java.io.* import java.nio.charset.StandardCharsets import java.util.* -import java.util.stream.Collectors import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.mockito.Mockito @@ -112,7 +111,7 @@ internal class YamlsTest { classes, MoreStreams.toStream(iterator) .map { e: JsonNode -> Jsons.`object`(e, ToClass::class.java) } - .collect(Collectors.toList()) + .toList() ) } } catch (e: Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt index 96e1983ebbb2..257dabcf235f 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt @@ -14,7 +14,6 @@ import java.io.InputStream import java.io.InputStreamReader import java.nio.charset.StandardCharsets import java.util.* -import java.util.stream.Collectors import java.util.stream.Stream import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach @@ -42,10 +41,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(Jsons.serialize(record1)) val expectedStream = Stream.of(record1) - Assertions.assertEquals( - expectedStream.collect(Collectors.toList()), - messageStream.collect(Collectors.toList()) - ) + Assertions.assertEquals(expectedStream.toList(), messageStream.toList()) Mockito.verifyNoInteractions(logger) } @@ -55,7 +51,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.collect(Collectors.toList())) + Assertions.assertEquals(emptyList(), messageStream.toList()) Mockito.verify(logger).info(ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -67,7 +63,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(Jsons.serialize(logMessage)) - Assertions.assertEquals(emptyList(), messageStream.collect(Collectors.toList())) + Assertions.assertEquals(emptyList(), messageStream.toList()) Mockito.verify(logger).warn("warning") Mockito.verifyNoMoreInteractions(logger) } @@ -80,7 +76,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.collect(Collectors.toList())) + Assertions.assertEquals(emptyList(), messageStream.toList()) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -93,7 +89,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.collect(Collectors.toList())) + Assertions.assertEquals(emptyList(), messageStream.toList()) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -129,7 +125,7 @@ internal class DefaultAirbyteStreamFactoryTest { val messageStream = stringToMessageStream(inputString) - Assertions.assertEquals(emptyList(), messageStream.collect(Collectors.toList())) + Assertions.assertEquals(emptyList(), messageStream.toList()) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt index ea2da8e96773..619f5be63677 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt @@ -36,7 +36,7 @@ object CatalogClientConverters { return@map null } } - .collect(Collectors.toList()) + .toList() protoCatalog.withStreams(airbyteStream) return protoCatalog @@ -144,7 +144,7 @@ object CatalogClientConverters { .stream(s) .config(generateDefaultConfiguration(s)) } - .collect(Collectors.toList()) + .toList() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt index e7ee60e6d018..b286fd92ad9a 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt @@ -24,7 +24,6 @@ import io.airbyte.configoss.StandardCheckConnectionOutput import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import java.nio.file.Path import java.util.* -import java.util.stream.Collectors import org.apache.commons.lang3.RandomStringUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone @@ -119,13 +118,13 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: FileUplo .stream() .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } .sorted(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) - .collect(Collectors.toList()) + .toList() LOGGER.info( "All objects: {}", objectSummaries .stream() .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } - .collect(Collectors.toList()) + .toList() ) return objectSummaries } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index 9d1b1bd8db3d..5d81dc60670d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -164,11 +164,7 @@ class S3ConsumerFactory { config: S3DestinationConfig, catalog: ConfiguredAirbyteCatalog? ): List { - return catalog!! - .streams - .stream() - .map(toWriteConfig(storageOperations, config)) - .collect(Collectors.toList()) + return catalog!!.streams.stream().map(toWriteConfig(storageOperations, config)).toList() } private fun toWriteConfig( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt index 3b938622d762..a04b3a27ac63 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt @@ -361,7 +361,7 @@ class JsonToAvroSchemaConverter { ) } .distinct() - .collect(Collectors.toList()) + .toList() return mergeRecordSchemas(fieldName, fieldNamespace, schemas, appendExtraProps) } @@ -494,7 +494,7 @@ class JsonToAvroSchemaConverter { .filter { s: Schema -> s != NULL_SCHEMA } } .distinct() - .collect(Collectors.toList()) + .toList() val subfieldNamespace: String = if (fieldNamespace == null) fieldName else ("$fieldNamespace.$fieldName") // recursively merge schemas of a subfield because they may include multiple record @@ -538,7 +538,6 @@ class JsonToAvroSchemaConverter { // Filter out null types, which will be added back in the end. val nonNullFieldTypes: MutableList = getNonNullTypes(fieldName, fieldDefinition) - .stream() .flatMap { fieldType: JsonSchemaType -> val singleFieldSchema: Schema = parseSingleType( @@ -550,15 +549,15 @@ class JsonToAvroSchemaConverter { addStringToLogicalTypes, ) if (singleFieldSchema.isUnion) { - return@flatMap singleFieldSchema.types.stream() + return@flatMap singleFieldSchema.types } else { - return@flatMap Stream.of( + return@flatMap listOf( singleFieldSchema, ) } } .distinct() - .collect(Collectors.toList()) + .toMutableList() if (nonNullFieldTypes.isEmpty()) { return Schema.create(Schema.Type.NULL) @@ -667,7 +666,7 @@ class JsonToAvroSchemaConverter { s.asText(), ) } - .collect(Collectors.toList()) + .toList() } if (hasTextValue(typeProperty)) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt index b70d4a7a217d..a77273bd9c42 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt @@ -10,7 +10,6 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.commons.json.Jsons import io.airbyte.commons.util.MoreIterators import java.util.LinkedList -import java.util.stream.Collectors class RootLevelFlatteningSheetGenerator(jsonSchema: JsonNode) : BaseSheetGenerator(), CsvSheetGenerator { @@ -21,7 +20,7 @@ class RootLevelFlatteningSheetGenerator(jsonSchema: JsonNode) : ) .stream() .sorted() - .collect(Collectors.toList()) + .toList() override fun getHeaderRow(): List { val headers: MutableList = diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index f26ab4f1753d..f1fa28a36a8d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -19,7 +19,6 @@ import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons import java.nio.file.Path import java.util.* -import java.util.stream.Collectors import org.apache.commons.lang3.RandomStringUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone @@ -93,13 +92,13 @@ protected constructor(protected val outputFormat: FileUploadFormat) : Destinatio .stream() .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } .sorted(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) - .collect(Collectors.toList()) + .toList() LOGGER.info( "All objects: {}", objectSummaries .stream() .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } - .collect(Collectors.toList()), + .toList(), ) return objectSummaries } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 112837f2fe5e..2cd441e61a2d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -28,7 +28,6 @@ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.function.Consumer import java.util.function.Function -import java.util.stream.Collectors import java.util.stream.Stream import kotlin.test.assertFails import org.apache.commons.lang3.RandomStringUtils @@ -953,7 +952,7 @@ abstract class BaseTypingDedupingTest { return Collections.nCopies(n, list) .stream() .flatMap { obj: List -> obj.stream() } - .collect(Collectors.toList()) + .toList() } @Throws(Exception::class) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt index 3fbf648f80ee..e32b6dcd171c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt @@ -93,11 +93,11 @@ constructor( expectedRecords .stream() .map { record: JsonNode -> this.copyWithLiftedData(record) } - .collect(Collectors.toList()), + .toList(), actualRecords .stream() .map { record: JsonNode -> this.copyWithLiftedData(record) } - .collect(Collectors.toList()), + .toList(), rawRecordIdentityComparator, rawRecordSortComparator, rawRecordIdentityExtractor,