diff --git a/LDSwiftEventSource.xcodeproj/project.pbxproj b/LDSwiftEventSource.xcodeproj/project.pbxproj index 7403a95..bb29829 100644 --- a/LDSwiftEventSource.xcodeproj/project.pbxproj +++ b/LDSwiftEventSource.xcodeproj/project.pbxproj @@ -253,7 +253,7 @@ ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; - shellScript = "echo `pwd`\nif which mint >/dev/null; then\n /usr/bin/xcrun --sdk macosx mint run realm/SwiftLint\nelse\n echo \"warning: mint not installed, available from https://github.com/yonaskolb/Mint\"\nfi\n"; + shellScript = "# Adds support for Apple Silicon brew directory\nexport PATH=\"$PATH:/opt/homebrew/bin\"\n\nif which mint >/dev/null; then\n /usr/bin/xcrun --sdk macosx mint run realm/SwiftLint\nelse\n echo \"warning: mint not installed, available from https://github.com/yonaskolb/Mint\"\nfi\n"; showEnvVarsInLog = 0; }; /* End PBXShellScriptBuildPhase section */ diff --git a/Source/EventParser.swift b/Source/EventParser.swift index 29756b6..6fa9da6 100644 --- a/Source/EventParser.swift +++ b/Source/EventParser.swift @@ -1,7 +1,5 @@ import Foundation -typealias ConnectionHandler = (setReconnectionTime: (TimeInterval) -> Void, setLastEventId: (String) -> Void) - class EventParser { private struct Constants { static let dataLabel: Substring = "data" @@ -11,15 +9,17 @@ class EventParser { } private let handler: EventHandler - private let connectionHandler: ConnectionHandler private var data: String = "" - private var lastEventId: String? private var eventType: String = "" + private var lastEventIdBuffer: String? + private var lastEventId: String? + private var currentRetry: TimeInterval - init(handler: EventHandler, connectionHandler: ConnectionHandler) { + init(handler: EventHandler, initialEventId: String?, initialRetry: TimeInterval) { self.handler = handler - self.connectionHandler = connectionHandler + self.lastEventId = initialEventId + self.currentRetry = initialRetry } func parse(line: String) { @@ -35,9 +35,13 @@ class EventParser { } } - func reset() { + func getLastEventId() -> String? { lastEventId } + + func reset() -> TimeInterval { data = "" eventType = "" + lastEventIdBuffer = nil + return currentRetry } private func dropLeadingSpace(str: Substring) -> Substring { @@ -56,13 +60,13 @@ class EventParser { // See https://github.com/whatwg/html/issues/689 for reasoning on not setting lastEventId if the value // contains a null code point. if !value.contains("\u{0000}") { - lastEventId = String(value) + lastEventIdBuffer = String(value) } case Constants.eventLabel: eventType = String(value) case Constants.retryLabel: if value.allSatisfy(("0"..."9").contains), let reconnectionTime = Int64(value) { - connectionHandler.setReconnectionTime(Double(reconnectionTime) * 0.001) + currentRetry = Double(reconnectionTime) * 0.001 } default: break @@ -70,9 +74,8 @@ class EventParser { } private func dispatchEvent() { - if let lastEventId = lastEventId { - connectionHandler.setLastEventId(lastEventId) - } + lastEventId = lastEventIdBuffer ?? lastEventId + lastEventIdBuffer = nil guard !data.isEmpty else { eventType = "" diff --git a/Source/LDSwiftEventSource.swift b/Source/LDSwiftEventSource.swift index 7574ecb..e87b095 100644 --- a/Source/LDSwiftEventSource.swift +++ b/Source/LDSwiftEventSource.swift @@ -108,6 +108,30 @@ public class EventSource { } } +class ReconnectionTimer { + private let maxDelay: TimeInterval + private let resetInterval: TimeInterval + + var backoffCount: Int = 0 + var connectedTime: Date? + + init(maxDelay: TimeInterval, resetInterval: TimeInterval) { + self.maxDelay = maxDelay + self.resetInterval = resetInterval + } + + func reconnectDelay(baseDelay: TimeInterval) -> TimeInterval { + backoffCount += 1 + if let connectedTime = connectedTime, Date().timeIntervalSince(connectedTime) >= resetInterval { + backoffCount = 0 + } + self.connectedTime = nil + let maxSleep = min(maxDelay, baseDelay * pow(2.0, Double(backoffCount))) + return maxSleep / 2 + Double.random(in: 0...(maxSleep / 2)) + } +} + +// MARK: EventSourceDelegate class EventSourceDelegate: NSObject, URLSessionDataDelegate { private let delegateQueue: DispatchQueue = DispatchQueue(label: "ESDelegateQueue") private let logger = Logs() @@ -120,22 +144,19 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { } } - private var lastEventId: String? - private var reconnectTime: TimeInterval - private var connectedTime: Date? - - private var reconnectionAttempts: Int = 0 - private var errorHandlerAction: ConnectionErrorAction = .proceed private let utf8LineParser: UTF8LineParser = UTF8LineParser() - // swiftlint:disable:next implicitly_unwrapped_optional - private var eventParser: EventParser! + private let eventParser: EventParser + private let reconnectionTimer: ReconnectionTimer private var urlSession: URLSession? private var sessionTask: URLSessionDataTask? init(config: EventSource.Config) { self.config = config - self.lastEventId = config.lastEventId - self.reconnectTime = config.reconnectTime + self.eventParser = EventParser(handler: config.handler, + initialEventId: config.lastEventId, + initialRetry: config.reconnectTime) + self.reconnectionTimer = ReconnectionTimer(maxDelay: config.maxReconnectTime, + resetInterval: config.backoffResetThreshold) } func start() { @@ -153,19 +174,24 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { } func stop() { - let previousState = readyState - readyState = .shutdown - sessionTask?.cancel() - if previousState == .open { - config.handler.onClosed() + delegateQueue.async { + let previousState = self.readyState + self.readyState = .shutdown + self.sessionTask?.cancel() + if previousState == .open { + self.config.handler.onClosed() + } + self.urlSession?.invalidateAndCancel() + self.urlSession = nil } - urlSession?.invalidateAndCancel() } - func getLastEventId() -> String? { lastEventId } + func getLastEventId() -> String? { eventParser.getLastEventId() } func createSession() -> URLSession { - URLSession(configuration: config.urlSessionConfiguration, delegate: self, delegateQueue: nil) + let opQueue = OperationQueue() + opQueue.underlyingQueue = self.delegateQueue + return URLSession(configuration: config.urlSessionConfiguration, delegate: self, delegateQueue: opQueue) } func createRequest() -> URLRequest { @@ -174,7 +200,7 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { timeoutInterval: self.config.idleTimeout) urlRequest.httpMethod = self.config.method urlRequest.httpBody = self.config.body - urlRequest.setValue(self.lastEventId, forHTTPHeaderField: "Last-Event-Id") + urlRequest.setValue(eventParser.getLastEventId(), forHTTPHeaderField: "Last-Event-Id") urlRequest.allHTTPHeaderFields = self.config.headerTransform( urlRequest.allHTTPHeaderFields?.merging(self.config.headers) { $1 } ?? self.config.headers ) @@ -183,11 +209,6 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { private func connect() { logger.log(.info, "Starting EventSource client") - let connectionHandler: ConnectionHandler = ( - setReconnectionTime: { [weak self] reconnectionTime in self?.reconnectTime = reconnectionTime }, - setLastEventId: { [weak self] eventId in self?.lastEventId = eventId } - ) - self.eventParser = EventParser(handler: self.config.handler, connectionHandler: connectionHandler) let task = urlSession?.dataTask(with: createRequest()) task?.resume() sessionTask = task @@ -201,44 +222,6 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { return action } - private func afterComplete() { - guard readyState != .shutdown - else { return } - - var nextState: ReadyState = .closed - let currentState: ReadyState = readyState - if errorHandlerAction == .shutdown { - logger.log(.info, "Connection has been explicitly shut down by error handler") - nextState = .shutdown - } - readyState = nextState - - if currentState == .open { - config.handler.onClosed() - } - - if nextState != .shutdown { - reconnect() - } - } - - private func reconnect() { - reconnectionAttempts += 1 - - if let connectedTime = connectedTime, Date().timeIntervalSince(connectedTime) >= config.backoffResetThreshold { - reconnectionAttempts = 0 - } - self.connectedTime = nil - - let maxSleep = min(config.maxReconnectTime, reconnectTime * pow(2.0, Double(reconnectionAttempts))) - let sleep = maxSleep / 2 + Double.random(in: 0...(maxSleep / 2)) - - logger.log(.info, "Waiting %.3f seconds before reconnecting...", sleep) - delegateQueue.asyncAfter(deadline: .now() + sleep) { [weak self] in - self?.connect() - } - } - // MARK: URLSession Delegates // Tells the delegate that the task finished transferring data. @@ -246,22 +229,37 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { task: URLSessionTask, didCompleteWithError error: Error?) { utf8LineParser.closeAndReset() - eventParser.reset() + let currentRetry = eventParser.reset() + + guard readyState != .shutdown + else { return } if let error = error { - // Ignore cancelled error - if (error as NSError).code == NSURLErrorCancelled { - } else if readyState != .shutdown && errorHandlerAction != .shutdown { + if (error as NSError).code != NSURLErrorCancelled { logger.log(.info, "Connection error: %@", error.localizedDescription) - errorHandlerAction = dispatchError(error: error) - } else { - errorHandlerAction = .shutdown + if dispatchError(error: error) == .shutdown { + logger.log(.info, "Connection has been explicitly shut down by error handler") + if readyState == .open { + config.handler.onClosed() + } + readyState = .shutdown + return + } } } else { logger.log(.info, "Connection unexpectedly closed.") } - afterComplete() + if readyState == .open { + config.handler.onClosed() + } + + readyState = .closed + let sleep = reconnectionTimer.reconnectDelay(baseDelay: currentRetry) + logger.log(.info, "Waiting %.3f seconds before reconnecting...", sleep) + delegateQueue.asyncAfter(deadline: .now() + sleep) { [weak self] in + self?.connect() + } } // Tells the delegate that the data task received the initial reply (headers) from the server. @@ -280,13 +278,16 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate { // swiftlint:disable:next force_cast let httpResponse = response as! HTTPURLResponse if (200..<300).contains(httpResponse.statusCode) { - connectedTime = Date() + reconnectionTimer.connectedTime = Date() readyState = .open config.handler.onOpened() completionHandler(.allow) } else { logger.log(.info, "Unsuccessful response: %d", httpResponse.statusCode) - errorHandlerAction = dispatchError(error: UnsuccessfulResponseError(responseCode: httpResponse.statusCode)) + if dispatchError(error: UnsuccessfulResponseError(responseCode: httpResponse.statusCode)) == .shutdown { + logger.log(.info, "Connection has been explicitly shut down by error handler") + readyState = .shutdown + } completionHandler(.cancel) } } diff --git a/Tests/EventParserTests.swift b/Tests/EventParserTests.swift index 7db13f7..86656a5 100644 --- a/Tests/EventParserTests.swift +++ b/Tests/EventParserTests.swift @@ -2,94 +2,87 @@ import XCTest @testable import LDSwiftEventSource final class EventParserTests: XCTestCase { - var receivedReconnectionTime: TimeInterval? - var receivedLastEventId: String? - lazy var connectionHandler: ConnectionHandler = { (setReconnectionTime: { self.receivedReconnectionTime = $0 }, - setLastEventId: { self.receivedLastEventId = $0 }) }() var handler: MockHandler! var parser: EventParser! override func setUp() { super.setUp() - resetMocks() - parser = EventParser(handler: handler, connectionHandler: connectionHandler) + handler = MockHandler() + parser = EventParser(handler: handler, initialEventId: nil, initialRetry: 1.0) } override func tearDown() { super.tearDown() XCTAssertNil(handler.events.maybeEvent()) - // Validate that `reset` completely resets the parser - receivedReconnectionTime = nil - parser.reset() - parser.parse(line: "data: hello") - parser.parse(line: "") - guard case let .message(eventType, event) = handler.events.maybeEvent() - else { - XCTFail("Unexpectedly received comment event") - return - } - XCTAssertEqual(eventType, "message") - XCTAssertEqual(event.data, "hello") - XCTAssertNil(handler.events.maybeEvent()) - XCTAssertNil(receivedReconnectionTime) - } - - func resetMocks() { - receivedReconnectionTime = nil - receivedLastEventId = nil - handler = MockHandler() } - func expectNoConnectionHandlerCalls() { - XCTAssertNil(receivedReconnectionTime) - XCTAssertNil(receivedLastEventId) + // MARK: Retry time tests + func testUnsetRetryReturnsConfigured() { + parser = EventParser(handler: handler, initialEventId: nil, initialRetry: 5.0) + XCTAssertEqual(parser.reset(), 5.0) } - // MARK: Retry time tests func testSetsRetryTimeToSevenSeconds() { parser.parse(line: "retry: 7000") - XCTAssertEqual(receivedReconnectionTime, 7.0) - XCTAssertNil(receivedLastEventId) + XCTAssertEqual(parser.reset(), 7.0) + XCTAssertNil(parser.getLastEventId()) } func testRetryWithNoSpace() { parser.parse(line: "retry:7000") - XCTAssertEqual(receivedReconnectionTime, 7.0) - XCTAssertNil(receivedLastEventId) + XCTAssertEqual(parser.reset(), 7.0) + XCTAssertNil(parser.getLastEventId()) } func testDoesNotSetRetryTimeUnlessEntireValueIsNumeric() { parser.parse(line: "retry: 7000L") - expectNoConnectionHandlerCalls() + XCTAssertEqual(parser.reset(), 1.0) } func testSafeToUseEmptyRetryTime() { parser.parse(line: "retry") - expectNoConnectionHandlerCalls() + XCTAssertEqual(parser.reset(), 1.0) } func testSafeToAttemptToSetRetryToOutOfBoundsValue() { parser.parse(line: "retry: 10000000000000000000000000") - expectNoConnectionHandlerCalls() + XCTAssertEqual(parser.reset(), 1.0) + } + + func testResetDoesNotResetRetry() { + parser.parse(line: "retry: 7000") + XCTAssertEqual(parser.reset(), 7.0) + XCTAssertEqual(parser.reset(), 7.0) + } + + func testRetryNotChangedDuringOtherMessages() { + parser.parse(line: "retry: 7000") + parser.parse(line: "") + parser.parse(line: ":123") + parser.parse(line: "event: 123") + parser.parse(line: "data: 123") + parser.parse(line: "id: 123") + parser.parse(line: "none: 123") + parser.parse(line: "") + XCTAssertEqual(parser.reset(), 7.0) + _ = handler.events.maybeEvent() + _ = handler.events.maybeEvent() } // MARK: Comment tests func testEmptyComment() { parser.parse(line: ":") XCTAssertEqual(handler.events.maybeEvent(), .comment("")) - expectNoConnectionHandlerCalls() } func testCommentBody() { parser.parse(line: ": comment") XCTAssertEqual(handler.events.maybeEvent(), .comment(" comment")) - expectNoConnectionHandlerCalls() } func testCommentCanContainColon() { parser.parse(line: ":comment:line") XCTAssertEqual(handler.events.maybeEvent(), .comment("comment:line")) - expectNoConnectionHandlerCalls() } // MARK: Message data tests @@ -103,14 +96,12 @@ final class EventParserTests: XCTestCase { XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "", lastEventId: nil))) XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "", lastEventId: nil))) XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "", lastEventId: nil))) - expectNoConnectionHandlerCalls() } func testDoesNotRemoveTrailingSpaceWhenColonNotPresent() { parser.parse(line: "data ") parser.parse(line: "") XCTAssertNil(handler.events.maybeEvent()) - expectNoConnectionHandlerCalls() } func testEmptyFirstDataAppendsNewline() { @@ -118,14 +109,12 @@ final class EventParserTests: XCTestCase { parser.parse(line: "data:") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "\n", lastEventId: nil))) - expectNoConnectionHandlerCalls() } func testDispatchesSingleLineMessage() { parser.parse(line: "data: hello") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: nil))) - expectNoConnectionHandlerCalls() } func testEmptyDataWithBufferedDataAppendsNewline() { @@ -140,7 +129,6 @@ final class EventParserTests: XCTestCase { parser.parse(line: "") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: nil))) - expectNoConnectionHandlerCalls() } func testRemovesOnlyFirstSpace() { @@ -220,14 +208,18 @@ final class EventParserTests: XCTestCase { } // MARK: Last event ID tests + func testLastEventIdNotReturnedUntilDispatch() { + XCTAssertNil(parser.getLastEventId()) + parser.parse(line: "id: 1") + XCTAssertNil(handler.events.maybeEvent()) + XCTAssertNil(parser.getLastEventId()) + } + func testRecordsLastEventIdWithoutData() { parser.parse(line: "id: 1") - // Should not have set until we dispatch with an empty line - expectNoConnectionHandlerCalls() parser.parse(line: "") XCTAssertNil(handler.events.maybeEvent()) - XCTAssertEqual(receivedLastEventId, "1") - XCTAssertNil(receivedReconnectionTime) + XCTAssertEqual(parser.getLastEventId(), "1") } func testEventIdIncludedInMessageEvent() { @@ -235,8 +227,6 @@ final class EventParserTests: XCTestCase { parser.parse(line: "id: 1") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: "1"))) - XCTAssertEqual(receivedLastEventId, "1") - XCTAssertNil(receivedReconnectionTime) } func testReusesEventIdIfNotSet() { @@ -247,21 +237,17 @@ final class EventParserTests: XCTestCase { parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: "reused"))) XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "world", lastEventId: "reused"))) - XCTAssertEqual(receivedLastEventId, "reused") - XCTAssertNil(receivedReconnectionTime) + XCTAssertEqual(parser.getLastEventId(), "reused") } func testEventIdSetTwiceInEvent() { parser.parse(line: "id: abc") - // We want to only dispatch the ID when the event is completed - XCTAssertNil(receivedLastEventId) parser.parse(line: "id: def") parser.parse(line: "data") - XCTAssertNil(receivedLastEventId) + XCTAssertNil(parser.getLastEventId()) parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "", lastEventId: "def"))) - XCTAssertEqual(receivedLastEventId, "def") - XCTAssertNil(receivedReconnectionTime) + XCTAssertEqual(parser.getLastEventId(), "def") } func testEventIdContainingNullIgnored() { @@ -270,18 +256,26 @@ final class EventParserTests: XCTestCase { parser.parse(line: "data") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "", lastEventId: "reused"))) - XCTAssertEqual(receivedLastEventId, "reused") - XCTAssertNil(receivedReconnectionTime) + XCTAssertEqual(parser.getLastEventId(), "reused") + } + + func testResetDoesResetLastEventIdBuffer() { + parser.parse(line: "id: 1") + _ = parser.reset() + parser.parse(line: "data: hello") + parser.parse(line: "") + XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: nil))) + XCTAssertNil(parser.getLastEventId()) } func testResetDoesNotResetLastEventId() { parser.parse(line: "id: 1") - parser.reset() + parser.parse(line: "") + _ = parser.reset() parser.parse(line: "data: hello") parser.parse(line: "") XCTAssertEqual(handler.events.maybeEvent(), .message("message", MessageEvent(data: "hello", lastEventId: "1"))) - XCTAssertEqual(receivedLastEventId, "1") - XCTAssertNil(receivedReconnectionTime) + XCTAssertEqual(parser.getLastEventId(), "1") } // MARK: Mixed and other tests @@ -290,13 +284,11 @@ final class EventParserTests: XCTestCase { parser.parse(line: "") parser.parse(line: "") XCTAssertNil(handler.events.maybeEvent()) - expectNoConnectionHandlerCalls() } func testNothingDoneForInvalidFieldName() { parser.parse(line: "invalid: bar") XCTAssertNil(handler.events.maybeEvent()) - expectNoConnectionHandlerCalls() } func testInvalidFieldNameIgnoredInEvent() {