Skip to content

Commit

Permalink
Work on data handlers to work with coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
this-Aditya committed Nov 3, 2024
1 parent 981ae83 commit 5f092fb
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.radarbase.android.data
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.radarbase.kotlin.coroutines.launchJoin
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.io.File
Expand All @@ -15,15 +16,15 @@ class DataCacheGroup<K: Any, V: Any>(
val topicName: String = activeDataCache.topic.name

@Throws(IOException::class)
fun deleteEmptyCaches() {
suspend fun deleteEmptyCaches() {
val cacheIterator = deprecatedCaches.iterator()
while (cacheIterator.hasNext()) {
val storedCache = cacheIterator.next()
if (storedCache.numberOfRecords > 0) {
if (storedCache.numberOfRecords.value > 0) {
continue
}
cacheIterator.remove()
storedCache.close()
storedCache.stop()
val tapeFile = storedCache.file
if (!tapeFile.delete()) {
logger.warn("Cannot remove old DataCache file " + tapeFile + " for topic " + storedCache.readTopic.name)
Expand Down Expand Up @@ -52,4 +53,8 @@ class DataCacheGroup<K: Any, V: Any>(
activeDataCache.stop()
}
}

companion object {
private val logger: Logger = LoggerFactory.getLogger(DataCacheGroup::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ interface DataHandler<K: Any, V: Any>: ServerStatusListener {

override val recordsSent: MutableSharedFlow<TopicSendReceipt>

fun <W: V> registerCache(topic: AvroTopic<K, W>, handler: SafeHandler? = null): DataCache<K, W>
suspend fun <W: V> registerCache(topic: AvroTopic<K, W>, handler: SafeHandler? = null): DataCache<K, W>

fun handler(build: DataHandlerConfiguration.() -> Unit)
fun getCache(topic: String): DataCache<*, *>
fun flushCaches(successCallback: () -> Unit, errorCallback: () -> Unit)
suspend fun flushCaches(successCallback: () -> Unit, errorCallback: () -> Unit)
val numberOfRecords: SharedFlow<TableDataHandler.CacheSize>
}
Loading

0 comments on commit 5f092fb

Please sign in to comment.