From 095cfdb3387be70aa328caa948b6753ad5221b36 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 22 Aug 2023 20:50:34 +0400 Subject: [PATCH] feat: refactor file storage to fix thread safety issues --- .../android/migration/StorageKeyMigration.kt | 29 +- .../android/utilities/AndroidStorage.kt | 13 +- .../migration/StorageKeyMigrationTest.kt | 44 -- .../main/java/com/amplitude/core/Storage.kt | 2 +- .../core/utilities/EventsFileManager.kt | 249 +++++----- .../core/utilities/FileResponseHandler.kt | 3 - .../amplitude/core/utilities/FileStorage.kt | 14 +- .../core/utilities/InMemoryStorage.kt | 2 +- .../com/amplitude/core/utilities/JSONUtil.kt | 12 +- .../core/utilities/EventsFileManagerTest.kt | 456 ++++++++++++++++++ 10 files changed, 618 insertions(+), 206 deletions(-) create mode 100644 core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt diff --git a/android/src/main/java/com/amplitude/android/migration/StorageKeyMigration.kt b/android/src/main/java/com/amplitude/android/migration/StorageKeyMigration.kt index 52f7217a..980027c8 100644 --- a/android/src/main/java/com/amplitude/android/migration/StorageKeyMigration.kt +++ b/android/src/main/java/com/amplitude/android/migration/StorageKeyMigration.kt @@ -4,7 +4,6 @@ import com.amplitude.android.utilities.AndroidStorage import com.amplitude.common.Logger import com.amplitude.core.Storage import java.io.File -import java.util.UUID class StorageKeyMigration( private val source: AndroidStorage, @@ -29,22 +28,22 @@ class StorageKeyMigration( for (sourceEventFilePath in sourceEventFiles) { val sourceEventFile = File(sourceEventFilePath) - var destinationEventFile = - File(sourceEventFilePath.replace(source.storageKey, destination.storageKey)) + val destinationFilePath = sourceEventFilePath.replace( + "/${source.storageKey}/", + "/${destination.storageKey}/" + ).replace( + source.eventsFile.id, + destination.eventsFile.id, + ) + val destinationEventFile = File(destinationFilePath) if 
(destinationEventFile.exists()) { - var fileExtension = destinationEventFile.extension - if (fileExtension != "") { - fileExtension = ".$fileExtension" + logger.error("destination ${destinationEventFile.absoluteFile} already exists") + } else { + try { + sourceEventFile.renameTo(destinationEventFile) + } catch (e: Exception) { + logger.error("can't rename $sourceEventFile to $destinationEventFile: ${e.message}") } - destinationEventFile = File( - destinationEventFile.parent, - "${destinationEventFile.nameWithoutExtension}-${UUID.randomUUID()}$fileExtension" - ) - } - try { - sourceEventFile.renameTo(destinationEventFile) - } catch (e: Exception) { - logger.error("can't rename $sourceEventFile to $destinationEventFile: ${e.message}") } } } catch (e: Exception) { diff --git a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt index b3e24ede..b53d4a4f 100644 --- a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt +++ b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt @@ -13,7 +13,6 @@ import com.amplitude.core.platform.EventPipeline import com.amplitude.core.utilities.EventsFileManager import com.amplitude.core.utilities.EventsFileStorage import com.amplitude.core.utilities.FileResponseHandler -import com.amplitude.core.utilities.JSONUtil import com.amplitude.core.utilities.ResponseHandler import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -34,12 +33,12 @@ class AndroidStorage( internal val sharedPreferences: SharedPreferences = context.getSharedPreferences("${getPrefix()}-$storageKey", Context.MODE_PRIVATE) private val storageDirectory: File = context.getDir(getDir(), Context.MODE_PRIVATE) - private val eventsFile = - EventsFileManager(storageDirectory, storageKey, AndroidKVS(sharedPreferences)) + internal val eventsFile = + EventsFileManager(storageDirectory, storageKey, logger) private val 
eventCallbacksMap = mutableMapOf() override suspend fun writeEvent(event: BaseEvent) { - eventsFile.storeEvent(JSONUtil.eventToString(event)) + eventsFile.storeEvent(event) event.callback?.let { callback -> event.insertId?.let { eventCallbacksMap.put(it, callback) @@ -67,11 +66,7 @@ class AndroidStorage( return eventsFile.read() } - override fun releaseFile(filePath: String) { - eventsFile.release(filePath) - } - - override suspend fun getEventsString(content: Any): String { + override fun getEventsString(content: Any): String { return eventsFile.getEventString(content as String) } diff --git a/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt b/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt index 8b1e069f..947e97b6 100644 --- a/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt +++ b/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt @@ -145,50 +145,6 @@ class StorageKeyMigrationTest { Assertions.assertEquals(0, destinationEventFiles.size) } - @Test - fun `event files with duplicated names should be migrated`() { - val context = ApplicationProvider.getApplicationContext() - val logger = ConsoleLogger() - - val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - - runBlocking { - source.writeEvent(createEvent(1)) - source.rollover() - source.writeEvent(createEvent(22)) - source.rollover() - } - - val sourceEventFiles = source.readEventsContent() as List - Assertions.assertEquals(2, sourceEventFiles.size) - - val sourceFileSizes = sourceEventFiles.map { File(it).length() } - - runBlocking { - destination.writeEvent(createEvent(333)) - destination.rollover() - } - - var destinationEventFiles = destination.readEventsContent() as List - Assertions.assertEquals(1, destinationEventFiles.size) - - val destinationFileSizes = 
destinationEventFiles.map { File(it).length() } - - val migration = StorageKeyMigration(source, destination, logger) - runBlocking { - migration.execute() - } - - destinationEventFiles = destination.readEventsContent() as List - Assertions.assertEquals("-0", destinationEventFiles[0].substring(destinationEventFiles[0].length - 2)) - Assertions.assertTrue(destinationEventFiles[1].contains("-0-")) - Assertions.assertEquals("-1", destinationEventFiles[2].substring(destinationEventFiles[0].length - 2)) - Assertions.assertEquals(destinationFileSizes[0], File(destinationEventFiles[0]).length()) - Assertions.assertEquals(sourceFileSizes[0], File(destinationEventFiles[1]).length()) - Assertions.assertEquals(sourceFileSizes[1], File(destinationEventFiles[2]).length()) - } - private fun createEvent(eventIndex: Int): BaseEvent { val event = BaseEvent() event.eventType = "event-$eventIndex" diff --git a/core/src/main/java/com/amplitude/core/Storage.kt b/core/src/main/java/com/amplitude/core/Storage.kt index a18e94be..7f8993f5 100644 --- a/core/src/main/java/com/amplitude/core/Storage.kt +++ b/core/src/main/java/com/amplitude/core/Storage.kt @@ -30,7 +30,7 @@ interface Storage { fun readEventsContent(): List - suspend fun getEventsString(content: Any): String + fun getEventsString(content: Any): String fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher): ResponseHandler } diff --git a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt index d0828e61..38353882 100644 --- a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt @@ -1,84 +1,81 @@ package com.amplitude.core.utilities -import com.amplitude.id.utilities.KeyValueStore +import com.amplitude.common.Logger +import com.amplitude.core.events.BaseEvent import 
com.amplitude.id.utilities.createDirectory import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.json.JSONArray -import java.io.BufferedReader +import org.json.JSONException +import org.json.JSONObject import java.io.File -import java.io.FileOutputStream -import java.util.Collections -import java.util.concurrent.ConcurrentHashMap +// Supports 2 event file versions: +// * version 1: +// file path: BASEDIR/STORAGEKEY-INDEX, for example: BASEDIR/$default_instance-0, BASEDIR/$default_instance-1.tmp +// internal format: single-line JSON event array +// * version 2 (current): +// file path: BASEDIR/STORAGEKEY/TIMESTAMP-STORAGEID, for example: BASEDIR/$default_instance/1692717940123-abcdefg_hjklmno, BASEDIR/$default_instance/1692718000555-abcdefg_hjklmno.tmp +// internal format: each line is a JSON event class EventsFileManager( - private val directory: File, + directory: File, private val storageKey: String, - private val kvs: KeyValueStore + private val logger: Logger, + private val getCurrentTimestamp: () -> Long = System::currentTimeMillis ) { + private val directory = directory.resolve(storageKey) + val id = "${ID}_${generateRandomString(INSTANCE_ID_LENGTH)}" + private val idRegex = "-$id($|\\.|-)".toRegex() + private var lastUsedTimestamp: Long = -1 + init { - createDirectory(directory) + createDirectory(this.directory) + attachPreviousV1Files() + attachPreviousV2Files() } - private val fileIndexKey = "amplitude.events.file.index.$storageKey" - companion object { const val MAX_FILE_SIZE = 975_000 // 975KB + private const val ID_LENGTH = 7 + private const val INSTANCE_ID_LENGTH = 7 + const val TIMESTAMP_SIZE = 13 // Size of unix timestamp in milliseconds. + + // Static part of storage Id - changed on each app run. 
+ val ID = generateRandomString(ID_LENGTH) + + private fun generateRandomString(length: Int): String { + val allowedChars = ('a'..'z') + ('0'..'9') + return (1..length) + .map { allowedChars.random() } + .joinToString("") + } } - val writeMutex = Mutex() - val readMutex = Mutex() - val filePathSet: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) - val curFile: MutableMap = ConcurrentHashMap() + private val writeMutex = Mutex() + private var currentFile: File? = null /** - * closes existing file, if at capacity - * opens a new file, if current file is full or uncreated * stores the event */ - suspend fun storeEvent(event: String) = writeMutex.withLock { - var file = currentFile() - if (!file.exists()) { - // create it - file.createNewFile() - } - - // check if file is at capacity - while (file.length() > MAX_FILE_SIZE) { + suspend fun storeEvent(event: BaseEvent) = writeMutex.withLock { + val file = currentFile() + file.appendText("${JSONUtil.eventToString(event)}\n") + if (file.length() > MAX_FILE_SIZE) { finish(file) - // update index - file = currentFile() - if (!file.exists()) { - // create it - file.createNewFile() - } + currentFile = null } - - var contents = "" - if (file.length() == 0L) { - start(file) - } else if (file.length() > 1) { - contents += "," - } - contents += event - writeToFile(contents.toByteArray(), file) - } - - private fun incrementFileIndex(): Boolean { - val index = kvs.getLong(fileIndexKey, 0) - return kvs.putLong(fileIndexKey, index + 1) } /** * Returns a comma-separated list of file paths that are not yet uploaded */ fun read(): List { - // we need to filter out .temp file, since it's operating on the writing thread + // we need to filter out .tmp file, since it's operating on the writing thread val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && !name.endsWith(".tmp") + idRegex.containsMatchIn(name) && !name.endsWith(".tmp") } ?: emptyArray() return fileList.sortedBy { it -> - 
getSortKeyForFile(it) + it.name }.map { it.absolutePath } @@ -88,24 +85,17 @@ class EventsFileManager( * deletes the file at filePath */ fun remove(filePath: String): Boolean { - filePathSet.remove(filePath) return File(filePath).delete() } - private fun start(file: File) { - // start batch object and events array - val contents = """[""" - writeToFile(contents.toByteArray(), file) - } - /** - * closes current file, and increase the index - * so next write go to a new file + * closes current file */ suspend fun rollover() = writeMutex.withLock { - val file = currentFile() - if (file.exists() && file.length() > 0) { + val file = this.currentFile + if (file != null) { finish(file) + currentFile = null } } @@ -119,85 +109,110 @@ class EventsFileManager( return } val fileName = originalFile.name - val firstHalfFile = File(directory, "$fileName-1.tmp") - val secondHalfFile = File(directory, "$fileName-2.tmp") + val firstHalfFile = File(originalFile.parent, "$fileName-1.tmp") + val secondHalfFile = File(originalFile.parent, "$fileName-2.tmp") val splitStrings = events.split() - writeToFile(splitStrings.first, firstHalfFile) - writeToFile(splitStrings.second, secondHalfFile) + writeEventsToFile(splitStrings.first, firstHalfFile) + writeEventsToFile(splitStrings.second, secondHalfFile) + this.finish(firstHalfFile) + this.finish(secondHalfFile) this.remove(filePath) } - suspend fun getEventString(filePath: String): String = readMutex.withLock { - // Block one time of file reads if another task has read the content of this file - if (filePathSet.contains(filePath)) { - filePathSet.remove(filePath) - return "" - } - filePathSet.add(filePath) - File(filePath).bufferedReader().use { - return it.readText() + fun getEventString(filePath: String): String { + val content = File(filePath).readText() + val isV1Content = content.startsWith("[") || content.endsWith("]") || content.endsWith(",") + if (isV1Content) { + val normalizedContent = "[${content.trimStart('[').trimEnd(']', 
',')}]" + return try { + JSONArray(normalizedContent) + normalizedContent + } catch (e: JSONException) { + logger.error("can't parse json events $normalizedContent: ${e.localizedMessage}") + "" + } + } else { + val events = JSONArray() + val lines = content.split('\n') + lines.forEach { + if (it != "") { + try { + val event = JSONObject(it) + events.put(event) + } catch (e: JSONException) { + logger.error("can't parse json event $it: ${e.localizedMessage}") + // skip invalid event + } + } + } + return if (events.length() > 0) events.toString() else "" } } - fun release(filePath: String) { - filePathSet.remove(filePath) - } - - private fun finish(file: File?) { - if (file == null || !file.exists() || file.length() == 0L) { - // if tmp file doesn't exist or empty then we don't need to do anything - return + private fun finish(file: File) { + if (file.exists()) { + file.renameTo(File(file.parent, file.nameWithoutExtension)) } - // close events array and batch object - val contents = """]""" - writeToFile(contents.toByteArray(), file) - file.renameTo(File(directory, file.nameWithoutExtension)) - incrementFileIndex() - reset() } // return the current tmp file private fun currentFile(): File { - val file = curFile[storageKey] ?: run { - // check leftover tmp file - val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && name.endsWith(".tmp") - } ?: emptyArray() - - fileList.getOrNull(0) + var file = currentFile + if (file != null) { + return file } - val index = kvs.getLong(fileIndexKey, 0) - curFile[storageKey] = file ?: File(directory, "$storageKey-$index.tmp") - return curFile[storageKey]!! - } - private fun getSortKeyForFile(file: File): String { - val name = file.nameWithoutExtension.replace("$storageKey-", "") - val dashIndex = name.indexOf('-') - if (dashIndex >= 0) { - return name.substring(0, dashIndex).padStart(10, '0') + name.substring(dashIndex) + var timestamp = getCurrentTimestamp() + // Timestamps should be unique. 
+ if (timestamp <= lastUsedTimestamp) { + timestamp = lastUsedTimestamp + 1 } - return name + lastUsedTimestamp = timestamp + file = File(directory, "${timestamp.toString().padStart(TIMESTAMP_SIZE, '0')}-$id.tmp") + this.currentFile = file + return file } - // write to underlying file - private fun writeToFile(content: ByteArray, file: File) { - FileOutputStream(file, true).use { - it.write(content) - it.flush() - } + private fun writeEventsToFile(events: List, file: File) { + val content = events.joinToString("\n", postfix = "\n") { it.toString() } + file.writeText(content) } - private fun writeToFile(content: String, file: File) { - file.createNewFile() - FileOutputStream(file).use { - it.write(content.toByteArray()) - it.flush() + // Rename (change id part) and move version 1 files from parent directory (from previous app runs). + private fun attachPreviousV1Files() { + val fileList = directory.parentFile.listFiles { _, name -> + name.startsWith("$storageKey-") + } ?: emptyArray() + fileList.forEach { + val name = it.name.removePrefix("$storageKey-").removeSuffix(".tmp") + val nameParts = name.split("-").toMutableList() + nameParts[0] = nameParts[0].padStart(TIMESTAMP_SIZE, '0') + nameParts.add(1, id) + val newName = nameParts.joinToString("-") + try { + it.renameTo(File(directory, newName)) + } catch (e: Exception) { + logger.error("can't rename ${it.absolutePath} to $newName: ${e.localizedMessage}") + } } - file.renameTo(File(directory, file.nameWithoutExtension)) } - private fun reset() { - curFile.remove(storageKey) + // Rename (change id part) version 2 files (from previous app runs). 
+ private fun attachPreviousV2Files() { + val previousFiles = directory.listFiles { _, name -> + !name.contains("-$ID-") + } ?: emptyArray() + previousFiles.forEach { + val nameParts = it.name.removeSuffix(".tmp").split("-").toMutableList() + if (nameParts.size > 1) { + nameParts[1] = id + } + val newName = nameParts.joinToString("-") + try { + it.renameTo(File(directory, newName)) + } catch (e: Exception) { + logger.error("can't rename ${it.absolutePath} to $newName: ${e.localizedMessage}") + } + } } } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt index ebbbd3ee..5a460a7b 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt @@ -119,14 +119,12 @@ class FileResponseHandler( override fun handleTooManyRequestsResponse(tooManyRequestsResponse: TooManyRequestsResponse, events: Any, eventsString: String) { logger?.debug("Handle response, status: ${tooManyRequestsResponse.status}, error: ${tooManyRequestsResponse.error}") // trigger exponential backoff - storage.releaseFile(events as String) triggerBackOff(true) } override fun handleTimeoutResponse(timeoutResponse: TimeoutResponse, events: Any, eventsString: String) { logger?.debug("Handle response, status: ${timeoutResponse.status}") // trigger exponential backoff - storage.releaseFile(events as String) triggerBackOff(true) } @@ -134,7 +132,6 @@ class FileResponseHandler( logger?.debug("Handle response, status: ${failedResponse.status}, error: ${failedResponse.error}") // wait for next time to try again // trigger exponential backoff - storage.releaseFile(events as String) triggerBackOff(true) } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt index 3b978ad6..a15bce3a 100644 --- 
a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt @@ -28,7 +28,7 @@ class FileStorage( private val storageDirectoryEvents = File(storageDirectory, "events") private val propertiesFile = PropertiesFile(storageDirectory, storageKey, getPrefix(), null) - private val eventsFile = EventsFileManager(storageDirectoryEvents, storageKey, propertiesFile) + private val eventsFile = EventsFileManager(storageDirectoryEvents, storageKey, logger) private val eventCallbacksMap = mutableMapOf() init { @@ -36,7 +36,7 @@ class FileStorage( } override suspend fun writeEvent(event: BaseEvent) { - eventsFile.storeEvent(JSONUtil.eventToString(event)) + eventsFile.storeEvent(event) event.callback?.let { callback -> event.insertId?. let { eventCallbacksMap.put(it, callback) @@ -65,11 +65,7 @@ class FileStorage( return eventsFile.read() } - override fun releaseFile(filePath: String) { - eventsFile.release(filePath) - } - - override suspend fun getEventsString(content: Any): String { + override fun getEventsString(content: Any): String { // content is filePath String return eventsFile.getEventString(content as String) } @@ -132,9 +128,7 @@ interface EventsFileStorage { fun readEventsContent(): List - fun releaseFile(filePath: String) - - suspend fun getEventsString(content: Any): String + fun getEventsString(content: Any): String suspend fun rollover() } diff --git a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt index ca2ed2ea..de2d4970 100644 --- a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt @@ -47,7 +47,7 @@ class InMemoryStorage : Storage { return listOf(eventsToSend) } - override suspend fun getEventsString(content: Any): String { + override fun getEventsString(content: Any): String { // content is list of BaseEvent return 
JSONUtil.eventsToString(content as List) } diff --git a/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt b/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt index 87c2791a..800f45f2 100644 --- a/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt +++ b/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt @@ -215,18 +215,18 @@ fun JSONArray.toEvents(): List { return events } -internal fun JSONArray.split(): Pair { +internal fun JSONArray.split(): Pair, List> { val mid = this.length() / 2 - val firstHalf = JSONArray() - val secondHalf = JSONArray() + val firstHalf = mutableListOf() + val secondHalf = mutableListOf() (0 until this.length()).forEach { index, -> if (index < mid) { - firstHalf.put(this.getJSONObject(index)) + firstHalf.add(this.getJSONObject(index)) } else { - secondHalf.put(this.getJSONObject(index)) + secondHalf.add(this.getJSONObject(index)) } } - return Pair(firstHalf.toString(), secondHalf.toString()) + return Pair(firstHalf, secondHalf) } internal fun JSONObject.addValue(key: String, value: Any?) 
{ diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt new file mode 100644 index 00000000..6c1b767b --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt @@ -0,0 +1,456 @@ +package com.amplitude.core.utilities + +import com.amplitude.common.jvm.ConsoleLogger +import com.amplitude.core.events.BaseEvent +import kotlinx.coroutines.runBlocking +import org.json.JSONArray +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir +import java.io.File +import kotlin.concurrent.thread + +class EventsFileManagerTest { + @TempDir + lateinit var root: File + + @Test + fun `events should be stored and read`() { + val storageKey = "\$default" + var timestamp: Long = 0 + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) { timestamp } + runBlocking { + timestamp = 100 + manager.storeEvent(createEvent(1)) + manager.rollover() + timestamp = 200 + manager.storeEvent(createEvent(2)) + manager.storeEvent(createEvent(3)) + manager.rollover() + timestamp = 300 + manager.storeEvent(createEvent(4)) + } + val filePaths = manager.read() + Assertions.assertEquals(2, filePaths.size) + filePaths.withIndex().forEach { + val file = File(it.value) + Assertions.assertEquals(root.resolve(storageKey).absolutePath, file.parentFile.absolutePath) + Assertions.assertEquals("0000000000${(it.index + 1) * 100}-${manager.id}", file.name) + } + + val eventsString1 = manager.getEventString(filePaths[0]) + val events1 = JSONArray(eventsString1) + Assertions.assertEquals(1, events1.length()) + Assertions.assertEquals("event-1", events1.getJSONObject(0).getString("event_type")) + + val eventsString2 = manager.getEventString(filePaths[1]) + val events2 = JSONArray(eventsString2) + Assertions.assertEquals(2, events2.length()) + Assertions.assertEquals("event-2", 
events2.getJSONObject(0).getString("event_type")) + Assertions.assertEquals("event-3", events2.getJSONObject(1).getString("event_type")) + } + + @Test + fun `rollover should finish current non-empty temp file`() { + val storageKey = "\$default" + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) + runBlocking { + manager.storeEvent(createEvent(1)) + } + var filePaths = manager.read() + Assertions.assertEquals(0, filePaths.size) + + runBlocking { + manager.rollover() + } + + filePaths = manager.read() + Assertions.assertEquals(1, filePaths.size) + } + + @Test + fun `rollover should ignore current empty temp file`() { + val storageKey = "\$default" + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) + var filePaths = manager.read() + Assertions.assertEquals(0, filePaths.size) + + runBlocking { + manager.rollover() + } + + filePaths = manager.read() + Assertions.assertEquals(0, filePaths.size) + } + + @Test + fun `remove should delete a file`() { + val storageKey = "\$default" + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) + runBlocking { + manager.storeEvent(createEvent(1)) + manager.rollover() + } + + var filePaths = manager.read() + Assertions.assertEquals(1, filePaths.size) + + manager.remove(filePaths[0]) + + filePaths = manager.read() + Assertions.assertEquals(0, filePaths.size) + } + + @Test + fun `previous event files (version 2) should be attached to current manager`() { + val storageKey = "\$default" + val storageDir = root.resolve(storageKey) + storageDir.mkdir() + val previousFile1 = File(storageDir, "1000000000001-abcd_xyz") + previousFile1.appendText("{\"event_type\": \"event-1\"}\n") + previousFile1.appendText("{\"event_type\": \"event-2\"}\n") + val previousFile2 = File(storageDir, "1000000000002-abcd_xyz.tmp") + previousFile2.appendText("{\"event_type\": \"event-3\"}\n") + + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) + + val filePaths = manager.read() + 
Assertions.assertEquals(2, filePaths.size) + filePaths.forEach { + val file = File(it) + Assertions.assertEquals(root.resolve(storageKey).absolutePath, file.parentFile.absolutePath) + } + Assertions.assertEquals("1000000000001-${manager.id}", File(filePaths[0]).name) + Assertions.assertEquals("1000000000002-${manager.id}", File(filePaths[1]).name) + + val eventsString1 = manager.getEventString(filePaths[0]) + val events1 = JSONArray(eventsString1) + Assertions.assertEquals(2, events1.length()) + Assertions.assertEquals("event-1", events1.getJSONObject(0).getString("event_type")) + Assertions.assertEquals("event-2", events1.getJSONObject(1).getString("event_type")) + + val eventsString2 = manager.getEventString(filePaths[1]) + val events2 = JSONArray(eventsString2) + Assertions.assertEquals(1, events2.length()) + Assertions.assertEquals("event-3", events2.getJSONObject(0).getString("event_type")) + } + + @Test + fun `previous event files (version 1) should be attached to current manager`() { + val storageKey = "\$default" + val previousFile1 = File(root, "$storageKey-1") + previousFile1.writeText("[{\"event_type\": \"event-1\"},{\"event_type\": \"event-2\"}]") + val previousFile2 = File(root, "$storageKey-3.tmp") + previousFile2.writeText("[{\"event_type\": \"event-3\"},") + + val manager = EventsFileManager(root, storageKey, ConsoleLogger()) + + val filePaths = manager.read() + Assertions.assertEquals(2, filePaths.size) + filePaths.forEach { + val file = File(it) + Assertions.assertEquals(root.resolve(storageKey).absolutePath, file.parentFile.absolutePath) + } + Assertions.assertEquals("0000000000001-${manager.id}", File(filePaths[0]).name) + Assertions.assertEquals("0000000000003-${manager.id}", File(filePaths[1]).name) + + val eventsString1 = manager.getEventString(filePaths[0]) + val events1 = JSONArray(eventsString1) + Assertions.assertEquals(2, events1.length()) + Assertions.assertEquals("event-1", events1.getJSONObject(0).getString("event_type")) + 
Assertions.assertEquals("event-2", events1.getJSONObject(1).getString("event_type"))
+
+        val eventsString2 = manager.getEventString(filePaths[1])
+        val events2 = JSONArray(eventsString2)
+        Assertions.assertEquals(1, events2.length())
+        Assertions.assertEquals("event-3", events2.getJSONObject(0).getString("event_type"))
+    }
+
+    /**
+     * Stores `MAX_FILE_SIZE / 1000` events, each carrying a 1000-character property, while the
+     * injected clock advances by 100 per event, and expects the events to end up in two files.
+     */
+    @Test
+    fun `event files should not exceed max size`() {
+        val storageKey = "\$default"
+        var clock = 0L
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger()) { clock }
+        val eventCount = EventsFileManager.MAX_FILE_SIZE / 1000
+        runBlocking {
+            (1..eventCount).forEach { index ->
+                clock = (100 * index).toLong()
+                manager.storeEvent(createEvent(index, 1000))
+            }
+            manager.rollover()
+        }
+
+        val filePaths = manager.read()
+        Assertions.assertEquals(2, filePaths.size)
+        // Every file must live directly inside the per-storage-key directory.
+        val expectedDirectory = root.resolve(storageKey).absolutePath
+        filePaths.forEach { path ->
+            Assertions.assertEquals(expectedDirectory, File(path).parentFile.absolutePath)
+        }
+        Assertions.assertEquals("0000000000100-${manager.id}", File(filePaths[0]).name)
+        Assertions.assertEquals("0000000087300-${manager.id}", File(filePaths[1]).name)
+
+        // No event may be lost: both files together hold everything that was stored.
+        val firstBatch = JSONArray(manager.getEventString(filePaths[0]))
+        val secondBatch = JSONArray(manager.getEventString(filePaths[1]))
+        Assertions.assertEquals(eventCount, firstBatch.length() + secondBatch.length())
+    }
+
+    /**
+     * Stores 11 events into a single file, splits that file, and expects two files suffixed
+     * `-1` and `-2` holding 5 and 6 events respectively.
+     */
+    @Test
+    fun `event files should be split`() {
+        val storageKey = "\$default"
+        val fixedTimestamp = 100L
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger()) { fixedTimestamp }
+        val eventCount = 11
+        val events = List(eventCount) { createEvent(it + 1) }
+        runBlocking {
+            for (event in events) {
+                manager.storeEvent(event)
+            }
+            manager.rollover()
+        }
+
+        val pathsBeforeSplit = manager.read()
+        Assertions.assertEquals(1, pathsBeforeSplit.size)
+        val originalFile = File(pathsBeforeSplit[0])
+        Assertions.assertEquals("0000000000100-${manager.id}", originalFile.name)
+        Assertions.assertTrue(originalFile.exists())
+
+        val jsonEvents = JSONArray(events.map { event -> JSONUtil.eventToJsonObject(event) })
+        manager.splitFile(pathsBeforeSplit[0], jsonEvents)
+
+        val pathsAfterSplit = manager.read()
+        Assertions.assertEquals(2, pathsAfterSplit.size)
+        Assertions.assertEquals("0000000000100-${manager.id}-1", File(pathsAfterSplit[0]).name)
+        Assertions.assertEquals("0000000000100-${manager.id}-2", File(pathsAfterSplit[1]).name)
+
+        val firstHalfSize = eventCount / 2
+        val firstHalf = JSONArray(manager.getEventString(pathsAfterSplit[0]))
+        Assertions.assertEquals(firstHalfSize, firstHalf.length())
+        val secondHalf = JSONArray(manager.getEventString(pathsAfterSplit[1]))
+        Assertions.assertEquals(eventCount - firstHalfSize, secondHalf.length())
+    }
+
+    /**
+     * Writes three array-style files with stray brackets or trailing commas and expects
+     * every event in them to still be readable.
+     */
+    @Test
+    fun `recoverable corrupted event files (version 1) should be read`() {
+        val storageKey = "\$default"
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger())
+        val eventsDir = root.resolve(storageKey)
+        eventsDir.resolve("0000000000100-${manager.id}").writeText("[[{\"event_type\": \"event-1\"}]")
+        eventsDir.resolve("0000000000200-${manager.id}").writeText("[{\"event_type\": \"event-2\"},{\"event_type\": \"event-3\"}]]")
+        eventsDir.resolve("0000000000300-${manager.id}").writeText("[[{\"event_type\": \"event-4\"},,")
+
+        val filePaths = manager.read()
+        Assertions.assertEquals(3, filePaths.size)
+        filePaths.forEachIndexed { index, path ->
+            Assertions.assertEquals("0000000000${(index + 1) * 100}-${manager.id}", File(path).name)
+        }
+
+        val firstFileEvents = JSONArray(manager.getEventString(filePaths[0]))
+        Assertions.assertEquals(1, firstFileEvents.length())
+        Assertions.assertEquals("event-1", firstFileEvents.getJSONObject(0).getString("event_type"))
+
+        val secondFileEvents = JSONArray(manager.getEventString(filePaths[1]))
+        Assertions.assertEquals(2, secondFileEvents.length())
+        Assertions.assertEquals("event-2", secondFileEvents.getJSONObject(0).getString("event_type"))
+        Assertions.assertEquals("event-3", secondFileEvents.getJSONObject(1).getString("event_type"))
+
+        val thirdFileEvents = JSONArray(manager.getEventString(filePaths[2]))
+        Assertions.assertEquals(1, thirdFileEvents.length())
+        Assertions.assertEquals("event-4", thirdFileEvents.getJSONObject(0).getString("event_type"))
+    }
+
+    /**
+     * Writes three array-style files whose JSON objects are truncated and expects each file
+     * to be listed but to yield an empty event string.
+     */
+    @Test
+    fun `unrecoverable corrupted event files (version 1) should be skipped`() {
+        val storageKey = "\$default"
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger())
+        val eventsDir = root.resolve(storageKey)
+        eventsDir.resolve("0000000000100-${manager.id}").writeText("[[{\"event_type\": \"event-1\"")
+        eventsDir.resolve("0000000000200-${manager.id}").writeText("[{\"event_type\": \"event-2\"},{\"event_type\": \"event-3\"]]")
+        eventsDir.resolve("0000000000300-${manager.id}").writeText("[[{\"event_type\": \"event-4\",")
+
+        val filePaths = manager.read()
+        Assertions.assertEquals(3, filePaths.size)
+        filePaths.forEachIndexed { index, path ->
+            Assertions.assertEquals("0000000000${(index + 1) * 100}-${manager.id}", File(path).name)
+        }
+
+        // The size assertion above guarantees exactly three paths, checked in order.
+        filePaths.forEach { path ->
+            Assertions.assertTrue(manager.getEventString(path).isEmpty())
+        }
+    }
+
+    /**
+     * Writes three line-delimited files containing broken lines and expects only the
+     * well-formed lines to be returned as events.
+     */
+    @Test
+    fun `corrupted events (version 2) should be skipped`() {
+        val storageKey = "\$default"
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger())
+        val eventsDir = root.resolve(storageKey)
+        eventsDir.resolve("0000000000100-${manager.id}").writeText("{\"event_type\": \"event-1\"")
+        eventsDir.resolve("0000000000200-${manager.id}").writeText("{\"event_type\": \"event-2\"}\n{\"event_type\": \"event-3}")
+        eventsDir.resolve("0000000000300-${manager.id}").writeText("{\"event_type\": \"event-4\"\n{\"event_type\": \"event-5\"}")
+
+        val filePaths = manager.read()
+        Assertions.assertEquals(3, filePaths.size)
+        filePaths.forEachIndexed { index, path ->
+            Assertions.assertEquals("0000000000${(index + 1) * 100}-${manager.id}", File(path).name)
+        }
+
+        Assertions.assertTrue(manager.getEventString(filePaths[0]).isEmpty())
+
+        val secondFileEvents = JSONArray(manager.getEventString(filePaths[1]))
+        Assertions.assertEquals(1, secondFileEvents.length())
+        Assertions.assertEquals("event-2", secondFileEvents.getJSONObject(0).getString("event_type"))
+
+        val thirdFileEvents = JSONArray(manager.getEventString(filePaths[2]))
+        Assertions.assertEquals(1, thirdFileEvents.length())
+        Assertions.assertEquals("event-5", thirdFileEvents.getJSONObject(0).getString("event_type"))
+    }
+
+    /**
+     * Two threads store 25,000 events each through one shared manager; afterwards every
+     * event of both threads must be readable.
+     */
+    @Test
+    fun `concurrent writes to the same manager should not corrupt events`() {
+        val storageKey = "\$default"
+        val manager = EventsFileManager(root, storageKey, ConsoleLogger())
+        val eventCount = 50_000
+        val eventCount1 = eventCount / 2
+        val eventCount2 = eventCount - eventCount / 2
+
+        val writer1 = startWriter(manager, eventCount1, "thread1-")
+        val writer2 = startWriter(manager, eventCount2, "thread2-")
+        writer1.join()
+        writer2.join()
+
+        val (fromThread1, fromThread2) = countEventsByThread(manager)
+        Assertions.assertEquals(eventCount1, fromThread1)
+        Assertions.assertEquals(eventCount2, fromThread2)
+    }
+
+    /**
+     * Two managers created for the same directory and storage key write concurrently;
+     * each manager must afterwards read back exactly its own events and none of the other's.
+     */
+    @Test
+    fun `concurrent writes to the same directory should not corrupt events`() {
+        val storageKey = "\$default"
+        val eventCount1 = 50_000
+        val manager1 = EventsFileManager(root, storageKey, ConsoleLogger())
+        val eventCount2 = 30_000
+        val manager2 = EventsFileManager(root, storageKey, ConsoleLogger())
+
+        val writer1 = startWriter(manager1, eventCount1, "thread1-")
+        val writer2 = startWriter(manager2, eventCount2, "thread2-")
+        writer1.join()
+        writer2.join()
+
+        val (ownInManager1, foreignInManager1) = countEventsByThread(manager1)
+        Assertions.assertEquals(eventCount1, ownInManager1)
+        Assertions.assertEquals(0, foreignInManager1)
+
+        val (foreignInManager2, ownInManager2) = countEventsByThread(manager2)
+        Assertions.assertEquals(0, foreignInManager2)
+        Assertions.assertEquals(eventCount2, ownInManager2)
+    }
+
+    /**
+     * Starts a thread that stores [eventCount] events (indices 1..eventCount, event types
+     * prefixed with [prefix]) into [manager], one blocking call per event, then rolls over.
+     */
+    private fun startWriter(manager: EventsFileManager, eventCount: Int, prefix: String): Thread =
+        thread {
+            for (index in 1..eventCount) {
+                runBlocking {
+                    manager.storeEvent(createEvent(index, prefix = prefix))
+                }
+            }
+            runBlocking {
+                manager.rollover()
+            }
+        }
+
+    /**
+     * Reads every file reported by [manager] and counts events whose type starts with
+     * `thread1-` (first component) or `thread2-` (second component).
+     */
+    private fun countEventsByThread(manager: EventsFileManager): Pair<Int, Int> {
+        var fromThread1 = 0
+        var fromThread2 = 0
+        for (path in manager.read()) {
+            val events = JSONArray(manager.getEventString(path))
+            for (i in 0 until events.length()) {
+                val eventType = events.getJSONObject(i).getString("event_type")
+                when {
+                    eventType.startsWith("thread1-") -> fromThread1++
+                    eventType.startsWith("thread2-") -> fromThread2++
+                }
+            }
+        }
+        return fromThread1 to fromThread2
+    }
+
+    /**
+     * Builds an event typed `"<prefix>event-<eventIndex>"`; when [propertySize] is given,
+     * attaches a single `property-1` value consisting of that many `a` characters.
+     */
+    private fun createEvent(eventIndex: Int, propertySize: Int? = null, prefix: String = ""): BaseEvent =
+        BaseEvent().also { event ->
+            event.eventType = "${prefix}event-$eventIndex"
+            propertySize?.let { size ->
+                event.eventProperties = mutableMapOf("property-1" to "a".repeat(size))
+            }
+        }
+}