Skip to content

Commit

Permalink
remove nullable from generics
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 25, 2024
1 parent e92ae48 commit b78c13e
Show file tree
Hide file tree
Showing 89 changed files with 269 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ abstract class AzureBlobStorageStreamCopier(
@Suppress("DEPRECATION")
@get:VisibleForTesting
val tmpTableName: String = nameTransformer.getTmpTableName(streamName)
protected val activeStagingWriterFileNames: MutableSet<String?> = HashSet()
private val csvPrinters = HashMap<String?, CSVPrinter>()
private val blobClients = HashMap<String?, AppendBlobClient>()
protected val activeStagingWriterFileNames: MutableSet<String> = HashSet()
private val csvPrinters = HashMap<String, CSVPrinter>()
private val blobClients = HashMap<String, AppendBlobClient>()
override var currentFile: String? = null

@Throws(Exception::class)
Expand Down Expand Up @@ -210,7 +210,7 @@ abstract class AzureBlobStorageStreamCopier(
@Throws(Exception::class)
override fun closeNonCurrentStagingFileWriters() {
LOGGER.info("Begin closing non current file writers")
val removedKeys: MutableSet<String?> = HashSet()
val removedKeys: MutableSet<String> = HashSet()
for (key in activeStagingWriterFileNames) {
if (key != currentFile) {
csvPrinters[key]!!.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ constructor(
sourceOperations: JdbcCompatibleSourceOperations<*>? = JdbcUtils.defaultSourceOperations
) : JdbcDatabase(sourceOperations) {
@Throws(SQLException::class)
override fun execute(query: CheckedConsumer<Connection, SQLException?>) {
override fun execute(query: CheckedConsumer<Connection, SQLException>) {
dataSource.connection.use { connection -> query.accept(connection) }
}

@Throws(SQLException::class)
override fun <T> bufferedResultSetQuery(
query: CheckedFunction<Connection, ResultSet, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
query: CheckedFunction<Connection, ResultSet, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): List<T> {
dataSource.connection.use { connection ->
toUnsafeStream<T>(query.apply(connection), recordTransform).use { results ->
Expand All @@ -45,8 +45,8 @@ constructor(
@MustBeClosed
@Throws(SQLException::class)
override fun <T> unsafeResultSetQuery(
query: CheckedFunction<Connection, ResultSet, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
query: CheckedFunction<Connection, ResultSet, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T> {
val connection = dataSource.connection
return JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
Expand Down Expand Up @@ -114,8 +114,8 @@ constructor(
@MustBeClosed
@Throws(SQLException::class)
override fun <T> unsafeQuery(
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T> {
val connection = dataSource.connection
return JdbcDatabase.Companion.toUnsafeStream<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
* @throws SQLException SQL related exceptions.
*/
@Throws(SQLException::class)
abstract fun execute(query: CheckedConsumer<Connection, SQLException?>)
abstract fun execute(query: CheckedConsumer<Connection, SQLException>)

@Throws(SQLException::class)
override fun execute(sql: String?) {
execute { connection: Connection -> connection.createStatement().execute(sql) }
}

@Throws(SQLException::class)
fun executeWithinTransaction(queries: List<String?>) {
fun executeWithinTransaction(queries: List<String>) {
execute { connection: Connection ->
connection.autoCommit = false
for (s in queries) {
Expand All @@ -63,8 +63,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
*/
@Throws(SQLException::class)
abstract fun <T> bufferedResultSetQuery(
query: CheckedFunction<Connection, ResultSet, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
query: CheckedFunction<Connection, ResultSet, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): List<T>

/**
Expand All @@ -85,8 +85,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
@MustBeClosed
@Throws(SQLException::class)
abstract fun <T> unsafeResultSetQuery(
query: CheckedFunction<Connection, ResultSet, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
query: CheckedFunction<Connection, ResultSet, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T>

/**
Expand All @@ -95,8 +95,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
*/
@Throws(SQLException::class)
fun queryStrings(
query: CheckedFunction<Connection, ResultSet, SQLException?>,
recordTransform: CheckedFunction<ResultSet, String, SQLException?>
query: CheckedFunction<Connection, ResultSet, SQLException>,
recordTransform: CheckedFunction<ResultSet, String, SQLException>
): List<String> {
unsafeResultSetQuery(query, recordTransform).use { stream ->
return stream.toList()
Expand All @@ -122,8 +122,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
@MustBeClosed
@Throws(SQLException::class)
abstract fun <T> unsafeQuery(
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T>

/**
Expand All @@ -132,8 +132,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
*/
@Throws(SQLException::class)
fun queryJsons(
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
recordTransform: CheckedFunction<ResultSet, JsonNode, SQLException?>
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
recordTransform: CheckedFunction<ResultSet, JsonNode, SQLException>
): List<JsonNode> {
unsafeQuery(statementCreator, recordTransform).use { stream ->
return stream.toList()
Expand Down Expand Up @@ -224,7 +224,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
@MustBeClosed
fun <T> toUnsafeStream(
resultSet: ResultSet,
mapper: CheckedFunction<ResultSet, T, SQLException?>
mapper: CheckedFunction<ResultSet, T, SQLException>
): Stream<T> {
return StreamSupport.stream(
object : AbstractSpliterator<T>(Long.MAX_VALUE, ORDERED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class StreamingJdbcDatabase(
@MustBeClosed
@Throws(SQLException::class)
override fun <T> unsafeQuery(
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T> {
try {
val connection = dataSource.connection
Expand Down Expand Up @@ -79,7 +79,7 @@ class StreamingJdbcDatabase(
*/
protected fun <T> toUnsafeStream(
resultSet: ResultSet,
mapper: CheckedFunction<ResultSet, T, SQLException?>,
mapper: CheckedFunction<ResultSet, T, SQLException>,
streamingConfig: JdbcStreamingQueryConfig
): Stream<T> {
return StreamSupport.stream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
* 1. Contain the original exception message as the external message, and a mangled message
* as the internal message.
*/
@VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet<String?> = HashSet()
@VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet<String> = HashSet()

init {
addCommonStringsToDeinterpolate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ internal constructor(
private fun readConcurrent(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
stateOptional: Optional<JsonNode?>
stateOptional: Optional<JsonNode>
) {
val streams = source!!.readStreams(config, catalog, stateOptional.orElse(null))

Expand Down Expand Up @@ -305,7 +305,7 @@ internal constructor(
private fun readSerial(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
stateOptional: Optional<JsonNode?>
stateOptional: Optional<JsonNode>
) {
try {
source!!.read(config, catalog, stateOptional.orElse(null)).use { messageIterator ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,13 @@ class BufferDequeue(
val output: MutableList<StreamAwareQueue.MessageWithMeta> = LinkedList()
while (queue!!.size() > 0) {
val memoryItem:
MemoryBoundedLinkedBlockingQueue.MemoryItem<
StreamAwareQueue.MessageWithMeta?>? =
MemoryBoundedLinkedBlockingQueue.MemoryItem<StreamAwareQueue.MessageWithMeta> =
queue.peek().orElseThrow()

// otherwise pull records until we hit the memory limit.
val newSize: Long = (memoryItem?.size ?: 0) + bytesRead.get()
val newSize: Long = (memoryItem.size) + bytesRead.get()
if (newSize <= optimalBytesToRead) {
memoryItem?.size?.let { bytesRead.addAndGet(it) }
memoryItem.size.let { bytesRead.addAndGet(it) }
queue.poll()?.item?.let { output.add(it) }
} else {
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private val logger = KotlinLogging.logger {}
* @param <E> type in the queue </E>
*/
class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
private val hiddenQueue = HiddenQueue<E?>(maxMemoryUsage)
private val hiddenQueue = HiddenQueue<E>(maxMemoryUsage)

val currentMemoryUsage: Long
get() = hiddenQueue.currentMemoryUsage.get()
Expand All @@ -48,24 +48,24 @@ class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
return hiddenQueue.offer(e, itemSizeInBytes)
}

fun peek(): MemoryItem<E?>? {
fun peek(): MemoryItem<E>? {
return hiddenQueue.peek()
}

@Throws(InterruptedException::class)
fun take(): MemoryItem<E?> {
fun take(): MemoryItem<E> {
return hiddenQueue.take()
}

fun poll(): MemoryItem<E?>? {
fun poll(): MemoryItem<E>? {
return hiddenQueue.poll()
}

@Throws(InterruptedException::class)
fun poll(
timeout: Long,
unit: TimeUnit,
): MemoryItem<E?>? {
): MemoryItem<E>? {
return hiddenQueue.poll(timeout, unit)
}

Expand All @@ -78,7 +78,7 @@ class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
*
* @param <E> </E>
*/
private class HiddenQueue<E>(maxMemoryUsage: Long) : LinkedBlockingQueue<MemoryItem<E>?>() {
private class HiddenQueue<E>(maxMemoryUsage: Long) : LinkedBlockingQueue<MemoryItem<E>>() {
val currentMemoryUsage: AtomicLong = AtomicLong(0)
val maxMemoryUsage: AtomicLong = AtomicLong(maxMemoryUsage)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StreamAwareQueue(maxMemoryUsage: Long) {
return Optional.ofNullable(timeOfLastMessage.get())
}

fun peek(): Optional<MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>> {
fun peek(): Optional<MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>> {
return Optional.ofNullable(memoryAwareQueue.peek())
}

Expand All @@ -59,19 +59,19 @@ class StreamAwareQueue(maxMemoryUsage: Long) {
}

@Throws(InterruptedException::class)
fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?> {
fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta> {
return memoryAwareQueue.take()
}

fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>? {
fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>? {
return memoryAwareQueue.poll()
}

@Throws(InterruptedException::class)
fun poll(
timeout: Long,
unit: TimeUnit,
): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>? {
): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>? {
return memoryAwareQueue.poll(timeout, unit)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
var bytesFlushed: Long = 0L
logger.info { "Flushing states" }
synchronized(lock) {
for (entry: Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>?> in
for (entry: Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>> in
descToStateIdQ.entries) {
// Remove all states with 0 counters.
// Per-stream synchronized is required to make sure the state (at the head of the
Expand Down Expand Up @@ -196,7 +196,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
bytesFlushed += oldestState.second

// cleanup
entry.value!!.poll()
entry.value.poll()
stateIdToState.remove(oldestStateId)
stateIdToCounter.remove(oldestStateId)
stateIdToCounterForPopulatingDestinationStats.remove(oldestStateId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface SqlOperations {
* @throws Exception exception
*/
@Throws(Exception::class)
fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?)
fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String)

/**
* Denotes whether the schema exists in destination database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ interface StagingOperations : SqlOperations {
schemaName: String?,
stageName: String?,
stagingPath: String?
): String?
): String

/**
* Load the data stored in the stage area into a temporary table in the destination
Expand All @@ -80,7 +80,7 @@ interface StagingOperations : SqlOperations {
database: JdbcDatabase?,
stageName: String?,
stagingPath: String?,
stagedFiles: List<String?>?,
stagedFiles: List<String>?,
tableName: String?,
schemaName: String?
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object ApmTraceUtils {
* @param tags A map of tags to be added to the currently active span.
*/
@JvmOverloads
fun addTagsToTrace(tags: Map<String?, Any>, tagPrefix: String? = TAG_PREFIX) {
fun addTagsToTrace(tags: Map<String, Any>, tagPrefix: String? = TAG_PREFIX) {
addTagsToTrace(GlobalTracer.get().activeSpan(), tags, tagPrefix)
}

Expand All @@ -45,10 +45,10 @@ object ApmTraceUtils {
* @param tags A map of tags to be added to the currently active span.
* @param tagPrefix The prefix to be added to each custom tag name.
*/
fun addTagsToTrace(span: Span?, tags: Map<String?, Any>, tagPrefix: String?) {
fun addTagsToTrace(span: Span?, tags: Map<String, Any>, tagPrefix: String?) {
if (span != null) {
tags.entries.forEach(
Consumer { entry: Map.Entry<String?, Any> ->
Consumer { entry: Map.Entry<String, Any> ->
span.setTag(formatTag(entry.key, tagPrefix), entry.value.toString())
}
)
Expand Down Expand Up @@ -83,12 +83,12 @@ object ApmTraceUtils {
*
* @param tags A map of tags to be added to the root span.
*/
fun addTagsToRootSpan(tags: Map<String?, Any>) {
fun addTagsToRootSpan(tags: Map<String, Any>) {
val activeSpan = GlobalTracer.get().activeSpan()
if (activeSpan is MutableSpan) {
val localRootSpan = (activeSpan as MutableSpan).localRootSpan
tags.entries.forEach(
Consumer { entry: Map.Entry<String?, Any> ->
Consumer { entry: Map.Entry<String, Any> ->
localRootSpan.setTag(formatTag(entry.key, TAG_PREFIX), entry.value.toString())
}
)
Expand Down
Loading

0 comments on commit b78c13e

Please sign in to comment.