Skip to content

Commit

Permalink
cleanup question marks in CDK code (#37518)
Browse files Browse the repository at this point in the history
just some kotlin cleanup
  • Loading branch information
stephane-airbyte authored May 23, 2024
1 parent 570cc86 commit c304df3
Show file tree
Hide file tree
Showing 102 changed files with 578 additions and 655 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class AzureBlobStorageStreamCopier(
}
}

override fun prepareStagingFile(): String? {
override fun prepareStagingFile(): String {
currentFile = prepareAzureStagingFile()
val currentFile = this.currentFile!!
if (!azureStagingFiles.contains(currentFile)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract class AzureBlobStorageStreamCopierFactory : StreamCopierFactory<AzureBl
nameTransformer: StandardNameTransformer?,
db: JdbcDatabase?,
sqlOperations: SqlOperations?
): StreamCopier? {
): StreamCopier {
try {
val stream = configuredStream!!.stream
val syncMode = configuredStream.destinationSyncMode
Expand Down Expand Up @@ -62,5 +62,5 @@ abstract class AzureBlobStorageStreamCopierFactory : StreamCopierFactory<AzureBl
azureBlobConfig: AzureBlobStorageConfig?,
nameTransformer: StandardNameTransformer?,
sqlOperations: SqlOperations?
): StreamCopier?
): StreamCopier
}
25 changes: 1 addition & 24 deletions airbyte-cdk/java/airbyte-cdk/core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava {
options.compilerArgs += "-Xlint:-deprecation,-try,-rawtypes,-overloads,-this-escape"
}
compileTestJava {
options.compilerArgs += "-Xlint:-try,-divzero,-cast"
}
compileTestFixturesJava {
options.compilerArgs += "-Xlint:-cast,-deprecation"
}
}


compileTestFixturesKotlin {
compilerOptions {
Expand All @@ -38,18 +27,6 @@ compileTestKotlin {
}
}

compileKotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_21
languageVersion = KotlinVersion.KOTLIN_1_9
freeCompilerArgs = ["-Xjvm-default=all"]
}
dependsOn {
tasks.matching { it.name == 'generate' }
}
}


dependencies {

api 'com.datadoghq:dd-trace-api:1.28.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import java.sql.SQLException
import org.jooq.DSLContext

fun interface ContextQueryFunction<T> {
@Throws(SQLException::class) fun query(context: DSLContext?): T
@Throws(SQLException::class) fun query(context: DSLContext): T
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ object DataTypeUtils {

@JvmStatic
fun <T> returnNullIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
return returnNullIfInvalid(valueProducer, Function { _: T? -> true })
return returnNullIfInvalid(valueProducer, Function { _: T -> true })
}

@JvmStatic
fun <T> returnNullIfInvalid(
valueProducer: DataTypeSupplier<T>,
isValidFn: Function<T?, Boolean>
isValidFn: Function<T, Boolean>
): T? {
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
// throw an
Expand All @@ -72,13 +72,13 @@ object DataTypeUtils {

@JvmStatic
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
return throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })
return throwExceptionIfInvalid(valueProducer, Function { _: T -> true })
}

@JvmStatic
fun <T> throwExceptionIfInvalid(
valueProducer: DataTypeSupplier<T>,
isValidFn: Function<T?, Boolean>
isValidFn: Function<T, Boolean>
): T? {
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
// throw an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import org.jooq.DSLContext
import org.jooq.impl.DSL

/** Database object for interacting with a Jooq connection. */
open class Database(private val dslContext: DSLContext?) {
open class Database(protected val dslContext: DSLContext) {
@Throws(SQLException::class)
open fun <T> query(transform: ContextQueryFunction<T>): T? {
return transform.query(dslContext)
}

@Throws(SQLException::class)
open fun <T> transaction(transform: ContextQueryFunction<T>): T? {
return dslContext!!.transactionResult { configuration: Configuration? ->
return dslContext.transactionResult { configuration: Configuration ->
transform.query(DSL.using(configuration))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ abstract class SqlDatabase : AbstractDatabase() {
@Throws(Exception::class) abstract fun execute(sql: String?)

@Throws(Exception::class)
abstract fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode>
abstract fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode>
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
columnName,
DataTypeUtils.returnNullIfInvalid(
{ resultSet.getDouble(index) },
{ d: Double? -> java.lang.Double.isFinite(d!!) },
{ d: Double -> java.lang.Double.isFinite(d) },
),
)
}
Expand All @@ -191,7 +191,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
columnName,
DataTypeUtils.returnNullIfInvalid(
{ resultSet.getFloat(index) },
{ f: Float? -> java.lang.Float.isFinite(f!!) },
{ f: Float -> java.lang.Float.isFinite(f) },
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ constructor(
}
}

override fun <T> executeMetadataQuery(query: Function<DatabaseMetaData?, T>): T {
override fun <T> executeMetadataQuery(query: Function<DatabaseMetaData, T>): T {
try {
dataSource.connection.use { connection ->
val metaData = connection.metaData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
*/
@MustBeClosed
@Throws(SQLException::class)
override fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode> {
override fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode> {
return unsafeQuery(
{ connection: Connection ->
val statement = connection.prepareStatement(sql)
Expand All @@ -192,7 +192,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
* syntactic sugar.
*/
@Throws(SQLException::class)
fun queryJsons(sql: String?, vararg params: String?): List<JsonNode> {
fun queryJsons(sql: String?, vararg params: String): List<JsonNode> {
unsafeQuery(sql, *params).use { stream ->
return stream.toList()
}
Expand All @@ -212,7 +212,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
@get:Throws(SQLException::class) abstract val metaData: DatabaseMetaData

@Throws(SQLException::class)
abstract fun <T> executeMetadataQuery(query: Function<DatabaseMetaData?, T>): T
abstract fun <T> executeMetadataQuery(query: Function<DatabaseMetaData, T>): T

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ object AirbyteTraceMessageUtility {
// Not sure why defaultOutputRecordCollector is under Destination specifically,
// but this matches usage elsewhere in base-java
val outputRecordCollector =
Consumer<AirbyteMessage> { m: AirbyteMessage? ->
Consumer<AirbyteMessage> { m: AirbyteMessage ->
Destination.Companion.defaultOutputRecordCollector(m)
}
outputRecordCollector.accept(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ internal constructor(
val catalog =
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
val stateOptional =
parsed.getStatePath().map { path: Path? -> parseConfig(path) }
parsed.getStatePath().map { path: Path -> parseConfig(path) }
try {
if (featureFlags.concurrentSourceStreamRead()) {
LOGGER.info("Concurrent source stream read enabled.")
Expand Down Expand Up @@ -271,11 +271,11 @@ internal constructor(
messageIterator: AutoCloseableIterator<AirbyteMessage>,
recordCollector: Consumer<AirbyteMessage>
) {
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.debug("Producing messages for stream {}...", s)
}
messageIterator.forEachRemaining(recordCollector)
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.debug("Finished producing messages for stream {}...", s)
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ internal constructor(
)
produceMessages(stream, streamStatusTrackingRecordConsumer)
} catch (e: Exception) {
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.error("Failed to consume from stream {}.", s, e)
}
throw RuntimeException(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ constructor(
portKey: List<String>,
wrapped: CheckedConsumer<JsonNode?, Exception?>
) {
sshWrap<Any?>(config, hostKey, portKey) { configInTunnel: JsonNode? ->
sshWrap<Any?>(config, hostKey, portKey) { configInTunnel: JsonNode ->
wrapped.accept(configInTunnel)
null
}
Expand All @@ -532,7 +532,7 @@ constructor(
endPointKey: String,
wrapped: CheckedConsumer<JsonNode?, Exception?>
) {
sshWrap<Any?>(config, endPointKey) { configInTunnel: JsonNode? ->
sshWrap<Any?>(config, endPointKey) { configInTunnel: JsonNode ->
wrapped.accept(configInTunnel)
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ fun interface CheckAndRemoveRecordWriter {
* of the new file where the record will be sent will be returned.
*/
@Throws(Exception::class)
fun apply(stream: AirbyteStreamNameNamespacePair?, stagingFileName: String?): String?
fun apply(stream: AirbyteStreamNameNamespacePair, stagingFileName: String?): String?
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ interface StreamCopier {
* @return A string that unqiuely identifies the file. E.g. the filename, or a unique suffix
* that is appended to a shared filename prefix
*/
fun prepareStagingFile(): String?
fun prepareStagingFile(): String

/** @return current staging file name */
val currentFile: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ interface StreamCopierFactory<T> {
nameTransformer: StandardNameTransformer?,
db: JdbcDatabase?,
sqlOperations: SqlOperations?
): StreamCopier?
): StreamCopier

companion object {
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import org.apache.logging.log4j.util.Strings
class NormalizationLogParser {
val dbtErrors: MutableList<String> = ArrayList()

fun create(bufferedReader: BufferedReader): Stream<AirbyteMessage?> {
fun create(bufferedReader: BufferedReader): Stream<AirbyteMessage> {
return bufferedReader.lines().flatMap { line: String -> this.toMessages(line) }
}

@VisibleForTesting
fun toMessages(line: String): Stream<AirbyteMessage?> {
fun toMessages(line: String): Stream<AirbyteMessage> {
if (Strings.isEmpty(line)) {
return Stream.of(logMessage(AirbyteLogMessage.Level.INFO, ""))
}
Expand All @@ -51,7 +51,7 @@ class NormalizationLogParser {
*
* This is needed for dbt < 1.0.0, which don't support json-format logs.
*/
private fun nonJsonLineToMessage(line: String): Stream<AirbyteMessage?> {
private fun nonJsonLineToMessage(line: String): Stream<AirbyteMessage> {
// Super hacky thing to try and detect error lines
if (line.contains("[error]")) {
dbtErrors.add(line)
Expand All @@ -64,7 +64,7 @@ class NormalizationLogParser {
* emit it without change), or it's dbt json log, and we need to do some extra work to convert
* it to a log message + aggregate error logs.
*/
private fun jsonToMessage(jsonLine: JsonNode): Stream<AirbyteMessage?> {
private fun jsonToMessage(jsonLine: JsonNode): Stream<AirbyteMessage> {
val message = Jsons.tryObject(jsonLine, AirbyteMessage::class.java)
if (message.isPresent) {
// This line is already an AirbyteMessage; we can just return it directly
Expand Down Expand Up @@ -117,7 +117,7 @@ class NormalizationLogParser {
normalizationLogParser.create(
BufferedReader(InputStreamReader(System.`in`, StandardCharsets.UTF_8))
)
airbyteMessageStream.forEachOrdered { message: AirbyteMessage? ->
airbyteMessageStream.forEachOrdered { message: AirbyteMessage ->
println(Jsons.serialize(message))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class InMemoryRecordBufferingStrategy(
}

val bufferedRecords =
streamBuffer.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair? ->
streamBuffer.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair ->
ArrayList()
}
bufferedRecords.add(message.record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SerializedBufferingStrategy
* computed buffer
*/
private fun getOrCreateBuffer(stream: AirbyteStreamNameNamespacePair): SerializableBuffer {
return allBuffers.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair? ->
return allBuffers.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair ->
LOGGER.info(
"Starting a new buffer for stream {} (current state: {} in {} buffers)",
stream.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ object ConnectorExceptionUtil {
val stacktraces =
throwables
.stream()
.map { throwable: Throwable? -> ExceptionUtils.getStackTrace(throwable) }
.map { throwable: Throwable -> ExceptionUtils.getStackTrace(throwable) }
.collect(Collectors.joining("\n"))
LOGGER.error("$initialMessage$stacktraces\nRethrowing first exception.")
throw throwables.iterator().next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,18 @@ class ConcurrentStreamConsumer(
private fun executeStream(stream: AutoCloseableIterator<AirbyteMessage>) {
try {
stream.use {
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.debug("Consuming from stream {}...", s)
}
StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)
streamConsumer.accept(stream)
StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.debug("Consumption from stream {} complete.", s)
}
}
} catch (e: Exception) {
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
LOGGER.error("Unable to consume from stream {}.", s, e)
}
StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ internal open class CommonFactoryTest {
@BeforeAll
fun dbSetup(): Unit {
container.withDatabaseName(DATABASE_NAME).withUsername("docker").withPassword("docker")
container!!.start()
container.start()
}

@JvmStatic
@AfterAll
fun dbDown(): Unit {
container!!.close()
container.close()
}
}
}
Loading

0 comments on commit c304df3

Please sign in to comment.