Skip to content

Commit

Permalink
fix: Add maxQueuedEventCount parameter to trim events in storage if o…
Browse files Browse the repository at this point in the history
…ver a limit (#222)
  • Loading branch information
crleona authored Sep 5, 2024
1 parent 63e76d9 commit 0134383
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 1 deletion.
30 changes: 30 additions & 0 deletions Sources/Amplitude/Amplitude.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public class Amplitude {
configuration.optOutChanged = { [weak self] optOut in
self?.timeline.onOptOutChanged(optOut)
}

trackingQueue.async { [self] in
self.trimQueuedEvents()
}
}

convenience init(apiKey: String, configuration: Configuration) {
Expand Down Expand Up @@ -450,4 +454,30 @@ public class Amplitude {
internal func isSandboxEnabled() -> Bool {
return SandboxHelper().isSandboxEnabled()
}

func trimQueuedEvents() {
logger?.debug(message: "Trimming queued events..")
guard configuration.maxQueuedEventCount > 0,
let eventBlocks: [URL] = storage.read(key: .EVENTS),
!eventBlocks.isEmpty else {
return
}

var eventCount = 0
// Blocks are returned in sorted order, oldest -> newest. Reverse to count newest blocks first.
// Only whole blocks are deleted, meaning up to maxQueuedEventCount + flushQueueSize - 1
// events may be left on device.
for eventBlock in eventBlocks.reversed() {
if eventCount < configuration.maxQueuedEventCount {
if let eventString = storage.getEventsString(eventBlock: eventBlock),
let eventArray = BaseEvent.fromArrayString(jsonString: eventString) {
eventCount += eventArray.count
}
} else {
logger?.debug(message: "Trimming \(eventBlock)")
storage.remove(eventBlock: eventBlock)
}
}
logger?.debug(message: "Completed trimming events, kept \(eventCount) most recent events")
}
}
3 changes: 3 additions & 0 deletions Sources/Amplitude/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class Configuration {
public internal(set) var autocapture: AutocaptureOptions
public var offline: Bool?
internal let diagonostics: Diagnostics
public var maxQueuedEventCount = -1
var optOutChanged: ((Bool) -> Void)?

@available(*, deprecated, message: "Please use the `autocapture` parameter instead.")
Expand Down Expand Up @@ -135,6 +136,7 @@ public class Configuration {
// `trackingSessionEvents` has been replaced by `defaultTracking.sessions`
autocapture: AutocaptureOptions = .sessions,
identifyBatchIntervalMillis: Int = Constants.Configuration.IDENTIFY_BATCH_INTERVAL_MILLIS,
maxQueuedEventCount: Int = -1,
migrateLegacyData: Bool = true,
offline: Bool? = false
) {
Expand Down Expand Up @@ -167,6 +169,7 @@ public class Configuration {
self.minTimeBetweenSessionsMillis = minTimeBetweenSessionsMillis
self.autocapture = autocapture
self.identifyBatchIntervalMillis = identifyBatchIntervalMillis
self.maxQueuedEventCount = maxQueuedEventCount
self.migrateLegacyData = migrateLegacyData
// Logging is OFF by default
self.loggerProvider.logLevel = logLevel.rawValue
Expand Down
71 changes: 71 additions & 0 deletions Tests/AmplitudeTests/AmplitudeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,77 @@ final class AmplitudeTests: XCTestCase {
wait(for: [deallocExpectation], timeout: 10.0)
}

func testTrimQueuedEvents() {
class TrimTestStorage: Storage {

private var events: [URL: Int] = [:]

func addEventFile(url: URL, eventCount: Int) {
events[url] = eventCount
}

func write(key: StorageKey, value: Any?) throws {}

func read<T>(key: StorageKey) -> T? {
switch key {
case .EVENTS:
return events.keys.sorted(by: {$0.absoluteString < $1.absoluteString}) as? T
default:
return nil
}
}

func getEventsString(eventBlock: URL) -> String? {
guard let eventCount = events[eventBlock] else {
return nil
}

let events = (0..<eventCount).map { BaseEvent(eventType: "Event \($0)") }

guard let jsonData = try? JSONEncoder().encode(events) else {
return nil
}

return String(data: jsonData, encoding: .utf8)
}

func remove(eventBlock: URL) {
events[eventBlock] = nil
}

func splitBlock(eventBlock: URL, events: [BaseEvent]) {}

func rollover() {}

func reset() {}

func getResponseHandler(configuration: Configuration,
eventPipeline: EventPipeline,
eventBlock: URL,
eventsString: String) -> ResponseHandler {
abort()
}
}

let storage = TrimTestStorage()
storage.addEventFile(url: URL(string: "file://test/0")!, eventCount: 10)
storage.addEventFile(url: URL(string: "file://test/1")!, eventCount: 10)
storage.addEventFile(url: URL(string: "file://test/2")!, eventCount: 10)
storage.addEventFile(url: URL(string: "file://test/3")!, eventCount: 10)

let amplitude = Amplitude(configuration: Configuration(apiKey: "test-api-key",
storageProvider: storage,
maxQueuedEventCount: 15))
amplitude.waitForTrackingQueue()

let allEventBlocks: [URL] = storage.read(key: .EVENTS) ?? []

XCTAssert(!allEventBlocks.contains(URL(string: "file://test/0")!))
XCTAssert(!allEventBlocks.contains(URL(string: "file://test/1")!))
XCTAssert(allEventBlocks.contains(URL(string: "file://test/2")!))
XCTAssert(allEventBlocks.contains(URL(string: "file://test/3")!))
}

func getDictionary(_ props: [String: Any?]) -> NSDictionary {
return NSDictionary(dictionary: props as [AnyHashable: Any])
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/AmplitudeTests/Utilities/EventPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ final class EventPipelineTests: XCTestCase {
pipeline?.storage?.rollover()

let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent2")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent2)
pipeline.storage?.rollover()

let httpResponseExpectation1 = expectation(description: "httpresponse1")
Expand Down

0 comments on commit 0134383

Please sign in to comment.