Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor file storage to fix thread safety issues #144

Closed
wants to merge 1 commit into from

Conversation

falconandy
Copy link
Contributor

@falconandy falconandy commented Aug 22, 2023

Summary

Refactored file storage to fix thread safety issues.

  1. Added unique part to file names (timestamp, file manager identifier) to eliminate possible collisions between different file manager instances.
  2. Changed internal file format to use json-event-per-line structure instead of single-line-with-json-event-array.
  3. Added a few unit tests to cover file manager logic.

Checklist

  • Does your PR title have the correct title format?
  • Does your PR have a breaking change?: No

@falconandy falconandy marked this pull request as ready for review August 22, 2023 17:06
Base automatically changed from AMP-82679-storage-should-use-instance-name to main August 22, 2023 23:19
@falconandy falconandy force-pushed the AMP-80046-thread-safe-file-storage branch from 51229e4 to 095cfdb Compare August 23, 2023 17:19
val readMutex = Mutex()
val filePathSet: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
val curFile: MutableMap<String, File> = ConcurrentHashMap<String, File>()
private val writeMutex = Mutex()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @falconandy.

I was hoping that we could resolve the issue by update the mutexs shared a single instance for a given instanceName rather than being isolated to each storage instance.

I would like to avoid changing the storage format or file structure if possible.

Some ideas

  1. getMutex(directory, storageKey): Mutex a provider to create a mutex, or return existing mutex for this directory/storageKey. This could allow multiple file managers to access the same files without concurrency issues.
  2. getEventsFileManager(): EventFileManager a provider for the EventsFileManager instance used in the FileStorage, that could ensure a single instance of EventsFileManager for a given directory, storageKey. This may be less code, but also less ideal because logger and other constructor args would likely need to be a single instance as well.

wdyt?

Copy link
Contributor Author

@falconandy falconandy Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justin-fiedler

Current format: single line with json array. If an app crashes/terminates for some reason, the line can be unfinished (an event object can be written partially) - in this case we can't repair the json array easily, all events will be lost. With new format (one line per event) we can skip only broken lines - most lines/events will be correct.

I would prefer timestamps instead of indexes. Indexes are stored in storage - it complicates code.

let client1 = Amplitude(Configuration())
let client2 = Amplitude(Configuration())

Should client1.flush() flush client2's events? What if flush options are different?

Right now code doesn't support "sharing" properly, there are not-shared local state. A couple of examples:
https://github.com/amplitude/Amplitude-Kotlin/blob/main/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt#L25
https://github.com/amplitude/Amplitude-Kotlin/blob/main/core/src/main/java/com/amplitude/core/platform/intercept/IdentifyInterceptor.kt#L29

Also look at https://github.com/amplitude/Amplitude-Kotlin/blob/main/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt#L30-L32 - how long read "locks" are implemented to prevent read the same files.

It looks like we should ensure no Amplitude duplicates with the same instance name. Maybe Amplitude.getInstance(instanceName) is a better way instead of Amplitude(...). Alternatively we can throw/log a error in ``Amplitude(...)` if new instance with existing instance name is created.

wdyt?

@justin-fiedler
Copy link
Contributor

Resolved in #181

@justin-fiedler justin-fiedler deleted the AMP-80046-thread-safe-file-storage branch March 27, 2024 16:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants