Skip to content

Commit

Permalink
fix: add repeat interval on EventPipeline to prevent high frequency r…
Browse files Browse the repository at this point in the history
…equests on continuing failing (#250)

* fix: add repeat interval on EventPipeline to prevent high-frequency requests on continuing failing
  • Loading branch information
sojingle authored Dec 17, 2024
1 parent 2f16421 commit 8af9acf
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 34 deletions.
13 changes: 13 additions & 0 deletions Sources/Amplitude/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ public protocol ResponseHandler {
func handleTooManyRequestsResponse(data: [String: Any])
func handleTimeoutResponse(data: [String: Any])
func handleFailedResponse(data: [String: Any])

// Added on v1.11.2.
// A replacement for handle(result: Result<Int, Error>) -> Void
// Return true if some attempts to recover are implemented
func handle(result: Result<Int, Error>) -> Bool
}

extension ResponseHandler {
Expand All @@ -170,3 +175,11 @@ extension ResponseHandler {
return indices
}
}

// Provide compatibility for new `handle` function added on v1.11.2.
extension ResponseHandler {
public func handle(result: Result<Int, any Error>) -> Bool {
let _: Void = handle(result: result)
return false
}
}
45 changes: 37 additions & 8 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class EventPipeline {
let storage: Storage?
let logger: (any Logger)?
let configuration: Configuration
var maxRetryInterval: TimeInterval = 60
var maxRetryCount: Int = 6

@Atomic internal var eventCount: Int = 0
internal var flushTimer: QueueTimer?
Expand Down Expand Up @@ -66,7 +68,7 @@ public class EventPipeline {
}
}

private func sendNextEventFile() {
private func sendNextEventFile(failures: Int = 0) {
autoreleasepool {
guard currentUpload == nil else {
logger?.log(message: "Existing upload in progress, skipping...")
Expand Down Expand Up @@ -100,14 +102,41 @@ public class EventPipeline {
eventBlock: nextEventFile,
eventsString: eventsString
)
responseHandler.handle(result: result)
// Don't send the next event file if we're being deallocated
self.uploadsQueue.async { [weak self] in
guard let self = self else {
return
let handled: Bool = responseHandler.handle(result: result)
var failures = failures

switch result {
case .success:
failures = 0
case .failure:
if !handled {
failures += 1
}
}

if failures > self.maxRetryCount {
self.uploadsQueue.async {
self.currentUpload = nil
}
self.configuration.offline = true
self.logger?.log(message: "Request failed more than \(self.maxRetryCount) times, marking offline")
} else {
// Don't send the next event file if we're being deallocated
let nextFileBlock: () -> Void = { [weak self] in
guard let self = self else {
return
}
self.currentUpload = nil
self.sendNextEventFile(failures: failures)
}

if failures == 0 || handled {
self.uploadsQueue.async(execute: nextFileBlock)
} else {
let sendingInterval = min(self.maxRetryInterval, pow(2, Double(failures - 1)))
self.uploadsQueue.asyncAfter(deadline: .now() + sendingInterval, execute: nextFileBlock)
self.logger?.debug(message: "Request failed \(failures) times, send next event file in \(sendingInterval) seconds")
}
self.currentUpload = nil
self.sendNextEventFile()
}
}
}
Expand Down
76 changes: 56 additions & 20 deletions Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@ class PersistentStorageResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
triggerEventsCallback(events: events, code: code, message: "Successfully send event")
storage.remove(eventBlock: eventBlock)
return true
}

func handleBadRequestResponse(data: [String: Any]) {
func handleBadRequestResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}

let error = data["error"] as? String ?? ""
Expand All @@ -55,7 +56,7 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}

var dropIndexes = Set<Int>()
Expand Down Expand Up @@ -90,13 +91,14 @@ class PersistentStorageResponseHandler: ResponseHandler {
}

storage.remove(eventBlock: eventBlock)
return true
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
if events.count == 1 {
let error = data["error"] as? String ?? ""
Expand All @@ -106,28 +108,32 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}
storage.splitBlock(eventBlock: eventBlock, events: events)
return true
}

func handleTooManyRequestsResponse(data: [String: Any]) {
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool {
// wait for next time to pick it up
return false
}

func handleTimeoutResponse(data: [String: Any]) {
func handleTimeoutResponse(data: [String: Any]) -> Bool {
// Wait for next time to pick it up
return false
}

func handleFailedResponse(data: [String: Any]) {
func handleFailedResponse(data: [String: Any]) -> Bool {
// wait for next time to try again
return false
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
switch result {
case .success(let code):
// We don't care about the data when success
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
case .failure(let error):
switch error {
case HttpClient.Exception.httpError(let code, let data):
Expand All @@ -137,20 +143,20 @@ class PersistentStorageResponseHandler: ResponseHandler {
}
switch code {
case HttpClient.HttpStatus.BAD_REQUEST.rawValue:
handleBadRequestResponse(data: json)
return handleBadRequestResponse(data: json)
case HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue:
handlePayloadTooLargeResponse(data: json)
return handlePayloadTooLargeResponse(data: json)
case HttpClient.HttpStatus.TIMEOUT.rawValue:
handleTimeoutResponse(data: json)
return handleTimeoutResponse(data: json)
case HttpClient.HttpStatus.TOO_MANY_REQUESTS.rawValue:
handleTooManyRequestsResponse(data: json)
return handleTooManyRequestsResponse(data: json)
case HttpClient.HttpStatus.FAILED.rawValue:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
default:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
}
default:
break
return false
}
}
}
Expand Down Expand Up @@ -184,3 +190,33 @@ extension PersistentStorageResponseHandler {
}
}
}

extension PersistentStorageResponseHandler {
func handle(result: Result<Int, any Error>) {
let _: Bool = handle(result: result)
}

func handleSuccessResponse(code: Int) {
let _: Bool = handleSuccessResponse(code: code)
}

func handleBadRequestResponse(data: [String: Any]) {
let _: Bool = handleBadRequestResponse(data: data)
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
let _: Bool = handlePayloadTooLargeResponse(data: data)
}

func handleTooManyRequestsResponse(data: [String: Any]) {
let _: Bool = handleTooManyRequestsResponse(data: data)
}

func handleTimeoutResponse(data: [String: Any]) {
let _: Bool = handleTimeoutResponse(data: data)
}

func handleFailedResponse(data: [String: Any]) {
let _: Bool = handleFailedResponse(data: data)
}
}
17 changes: 13 additions & 4 deletions Tests/AmplitudeTests/Supports/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,26 @@ class FakeResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
switch result {
case .success(let code):
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
default:
break
return false
}
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
storage.remove(eventBlock: eventBlock)
return true
}

func handle(result: Result<Int, any Error>) {
let _: Bool = handle(result: result)
}

func handleSuccessResponse(code: Int) {
let _: Bool = handleSuccessResponse(code: code)
}

func handleBadRequestResponse(data: [String: Any]) {
Expand Down
77 changes: 77 additions & 0 deletions Tests/AmplitudeTests/Utilities/EventPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,81 @@ final class EventPipelineTests: XCTestCase {
XCTAssertEqual(uploadedEvents1?.count, 1)
XCTAssertEqual(uploadedEvents1?[0].eventType, "testEvent-1")
}

// test continues to fail until the event is uploaded
func testContinuousFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 2

let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)

let uploadExpectations = (0..<4).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // instant failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +1s failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +2s failure, go offline
.success(200)
]

pipeline.flush()
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 2)

XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.configuration.offline, false)

wait(for: [uploadExpectations[2]], timeout: 3)

XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.configuration.offline, true)

pipeline.configuration.offline = false
let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[3], flushExpectation], timeout: 1)

XCTAssertEqual(httpClient.uploadCount, 4)
}

func testContinuesHandledFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 1

let testEvent1 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent1)
pipeline.storage?.rollover()

let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent2)
pipeline.storage?.rollover()

let testEvent3 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent3)
pipeline.storage?.rollover()

let uploadExpectations = (0..<3).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.BAD_REQUEST.rawValue, data: nil)),
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue, data: nil)),
.success(200),
]

let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.configuration.offline, false)

wait(for: [uploadExpectations[2], flushExpectation], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.configuration.offline, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase {
eventsString: eventsString
)

handler.handleSuccessResponse(code: 200)
let _: Bool = handler.handleSuccessResponse(code: 200)
XCTAssertEqual(
fakePersistentStorage.haveBeenCalledWith[0],
"remove(eventBlock: \(eventBlock.absoluteURL))"
Expand Down Expand Up @@ -150,7 +150,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase {
eventsString: eventsString
)

handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"])
let _: Bool = handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"])
XCTAssertEqual(
fakePersistentStorage.haveBeenCalledWith[0],
"remove(eventBlock: \(eventBlock.absoluteURL))"
Expand Down

0 comments on commit 8af9acf

Please sign in to comment.