diff --git a/SubstrateSdk/Classes/Network/JSONRPCEngine.swift b/SubstrateSdk/Classes/Network/JSONRPCEngine.swift index de0e5c7..59af964 100644 --- a/SubstrateSdk/Classes/Network/JSONRPCEngine.swift +++ b/SubstrateSdk/Classes/Network/JSONRPCEngine.swift @@ -12,6 +12,13 @@ public protocol JSONRPCResponseHandling { func handle(error: Error) } +public typealias JSONRPCBatchId = String + +public struct JSONRPCBatchRequestItem { + public let requestId: UInt16 + public let data: Data +} + public struct JSONRPCRequest: Equatable { public let requestId: UInt16 public let data: Data @@ -41,6 +48,39 @@ struct JSONRPCResponseHandler: JSONRPCResponseHandling { } } +struct JSONRPCBatchHandler: JSONRPCResponseHandling { + let itemsCount: Int + + public let completionClosure: ([Result]) -> Void + + public func handle(data: Data) { + do { + let decoder = JSONDecoder() + let responses = try decoder.decode([JSONRPCData].self, from: data) + let results: [Result] = responses.map { rpcData in + if let value = rpcData.result { + return .success(value) + } else if let error = rpcData.error { + return .failure(error) + } else { + return .failure(JSONRPCEngineError.emptyResult) + } + } + + completionClosure(results) + + } catch { + let errorList: [Result] = (0..] = (0..: JSONRPCSubscribing { public let requestData: Data public let requestOptions: JSONRPCOptions public var remoteId: String? + public let unsubscribeMethod: String private lazy var jsonDecoder = JSONDecoder() @@ -74,12 +116,14 @@ public final class JSONRPCSubscription: JSONRPCSubscribing { requestId: UInt16, requestData: Data, requestOptions: JSONRPCOptions, + unsubscribeMethod: String, updateClosure: @escaping (T) -> Void, failureClosure: @escaping (Error, Bool) -> Void ) { self.requestId = requestId self.requestData = requestData self.requestOptions = requestOptions + self.unsubscribeMethod = unsubscribeMethod self.updateClosure = updateClosure self.failureClosure = failureClosure } @@ -105,15 +149,37 @@ public protocol JSONRPCEngine: AnyObject { func subscribe( _ method: String, params: P?, + unsubscribeMethod: String, updateClosure: @escaping (T) -> Void, failureClosure: @escaping (Error, Bool) -> Void ) throws -> UInt16 func cancelForIdentifier(_ identifier: UInt16) + + func addBatchCallMethod( + _ method: String, + params: P?, + batchId: JSONRPCBatchId + ) throws + + func submitBatch( + for batchId: JSONRPCBatchId, + options: JSONRPCOptions, + completion closure: (([Result]) -> Void)? + ) throws -> UInt16 + + func clearBatch(for batchId: JSONRPCBatchId) } public extension JSONRPCEngine { + func submitBatch( + for batchId: JSONRPCBatchId, + completion closure: (([Result]) -> Void)? + ) throws -> UInt16 { + try submitBatch(for: batchId, options: JSONRPCOptions(), completion: closure) + } + func callMethod( _ method: String, params: P?, @@ -126,4 +192,19 @@ public extension JSONRPCEngine { completion: closure ) } + + func subscribe( + _ method: String, + params: P?, + updateClosure: @escaping (T) -> Void, + failureClosure: @escaping (Error, Bool) -> Void + ) throws -> UInt16 { + try subscribe( + method, + params: params, + unsubscribeMethod: RPCMethod.storageUnsubscribe, + updateClosure: updateClosure, + failureClosure: failureClosure + ) + } } diff --git a/SubstrateSdk/Classes/Network/WebSocketEngine+Protocol.swift b/SubstrateSdk/Classes/Network/WebSocketEngine+Protocol.swift index af68e80..a98cbfb 100644 --- a/SubstrateSdk/Classes/Network/WebSocketEngine+Protocol.swift +++ b/SubstrateSdk/Classes/Network/WebSocketEngine+Protocol.swift @@ -1,6 +1,60 @@ import Foundation extension WebSocketEngine: JSONRPCEngine { + public func addBatchCallMethod( + _ method: String, + params: P?, + batchId: JSONRPCBatchId + ) throws { + mutex.lock() + + defer { + mutex.unlock() + } + + let batchItem = try prepareBatchRequestItem(method: method, params: params) + storeBatchItem(batchItem, for: batchId) + } + + public func submitBatch( + for batchId: JSONRPCBatchId, + options: JSONRPCOptions, + completion closure: (([Result]) -> Void)? + ) throws -> UInt16 { + mutex.lock() + + defer { + mutex.unlock() + } + + guard let batchItems = partialBatches[batchId], let requestId = batchItems.first?.requestId else { + throw JSONRPCEngineError.emptyResult + } + + clearPartialBatchStorage(for: batchId) + + let request = try prepareBatchRequest( + requestId: requestId, + from: batchItems, + options: options, + completion: closure + ) + + updateConnectionForRequest(request) + + return requestId + } + + public func clearBatch(for batchId: JSONRPCBatchId) { + mutex.lock() + + defer { + mutex.unlock() + } + + clearPartialBatchStorage(for: batchId) + } + public func callMethod( _ method: String, params: P?, @@ -28,6 +82,7 @@ extension WebSocketEngine: JSONRPCEngine { public func subscribe( _ method: String, params: P?, + unsubscribeMethod: String, updateClosure: @escaping (T) -> Void, failureClosure: @escaping (Error, Bool) -> Void ) throws -> UInt16 { @@ -50,6 +105,7 @@ extension WebSocketEngine: JSONRPCEngine { requestId: request.requestId, requestData: request.data, requestOptions: request.options, + unsubscribeMethod: unsubscribeMethod, updateClosure: updateClosure, failureClosure: failureClosure ) diff --git a/SubstrateSdk/Classes/Network/WebSocketEngine.swift b/SubstrateSdk/Classes/Network/WebSocketEngine.swift index da14a12..a3a0a7d 100644 --- a/SubstrateSdk/Classes/Network/WebSocketEngine.swift +++ b/SubstrateSdk/Classes/Network/WebSocketEngine.swift @@ -78,6 +78,7 @@ public final class WebSocketEngine { private(set) var pendingRequests: [JSONRPCRequest] = [] private(set) var inProgressRequests: [UInt16: JSONRPCRequest] = [:] + private(set) var partialBatches: [String: [JSONRPCBatchRequestItem]] = [:] private(set) var subscriptions: [UInt16: JSONRPCSubscribing] = [:] private(set) var pendingSubscriptionResponses: [String: [Data]] = [:] private(set) var selectedURLIndex: Int @@ -267,6 +268,17 @@ extension WebSocketEngine { } } + func clearPartialBatchStorage(for batchId: JSONRPCBatchId) { + partialBatches[batchId] = nil + } + + func storeBatchItem(_ item: JSONRPCBatchRequestItem, for batchId: JSONRPCBatchId) { + var items = partialBatches[batchId] ?? [] + items.append(item) + + partialBatches[batchId] = items + } + func send(request: JSONRPCRequest) { inProgressRequests[request.requestId] = request @@ -337,22 +349,46 @@ extension WebSocketEngine { func process(data: Data) { do { - let response = try jsonDecoder.decode(JSONRPCBasicData.self, from: data) - - if let identifier = response.identifier { - if let error = response.error { - completeRequestForRemoteId( - identifier, - error: error - ) + if let response = try? jsonDecoder.decode(JSONRPCBasicData.self, from: data) { + // handle single request or subscription + if let identifier = response.identifier { + if let error = response.error { + completeRequestForRemoteId( + identifier, + error: error + ) + } else { + completeRequestForRemoteId( + identifier, + data: data + ) + } } else { - completeRequestForRemoteId( - identifier, - data: data - ) + try processSubscriptionUpdate(data) } } else { - try processSubscriptionUpdate(data) + // handle batch response + + let batchResponses = try jsonDecoder.decode([JSONRPCBasicData].self, from: data) + + let optBatchResponse = batchResponses.first { response in + if let identifier = response.identifier, inProgressRequests[identifier] != nil { + return true + } else { + return false + } + } + + guard let identifier = optBatchResponse?.identifier else { + if let stringData = String(data: data, encoding: .utf8) { + logger?.error("(\(chainName):\(selectedURL)) Can't parse batch: \(stringData)") + } else { + logger?.error("(\(chainName):\(selectedURL)) Can't parse batch") + } + return + } + + completeRequestForRemoteId(identifier, data: data) } } catch { if let stringData = String(data: data, encoding: .utf8) { @@ -367,17 +403,11 @@ extension WebSocketEngine { subscriptions[subscription.requestId] = subscription } - func prepareRequest( + func prepareRequestData( method: String, - params: P?, - options: JSONRPCOptions, - completion closure: ((Result) -> Void)? - ) - throws -> JSONRPCRequest { - let data: Data - - let requestId = generateRequestId() - + requestId: UInt16, + params: P? + ) throws -> Data { if let params = params { let info = JSONRPCInfo( identifier: requestId, @@ -386,7 +416,7 @@ extension WebSocketEngine { params: params ) - data = try jsonEncoder.encode(info) + return try jsonEncoder.encode(info) } else { let info = JSONRPCInfo( identifier: requestId, @@ -395,9 +425,57 @@ extension WebSocketEngine { params: [String]() ) - data = try jsonEncoder.encode(info) + return try jsonEncoder.encode(info) + } + } + + func prepareBatchRequestItem( + method: String, + params: P? + ) throws -> JSONRPCBatchRequestItem { + let requestId = generateRequestId() + + let data = try prepareRequestData(method: method, requestId: requestId, params: params) + + return JSONRPCBatchRequestItem(requestId: requestId, data: data) + } + + func prepareBatchRequest( + requestId: UInt16, + from batchItems: [JSONRPCBatchRequestItem], + options: JSONRPCOptions, + completion closure: (([Result]) -> Void)? + ) throws -> JSONRPCRequest { + let jsonList = try batchItems.map { try jsonDecoder.decode(JSON.self, from: $0.data) } + + let data = try jsonEncoder.encode(JSON.arrayValue(jsonList)) + + let handler: JSONRPCBatchHandler? + + if let closure = closure { + handler = JSONRPCBatchHandler(itemsCount: batchItems.count, completionClosure: closure) + } else { + handler = nil } + return JSONRPCRequest( + requestId: requestId, + data: data, + options: options, + responseHandler: handler + ) + } + + func prepareRequest( + method: String, + params: P?, + options: JSONRPCOptions, + completion closure: ((Result) -> Void)? + ) throws -> JSONRPCRequest { + let requestId = generateRequestId() + + let data: Data = try prepareRequestData(method: method, requestId: requestId, params: params) + let handler: JSONRPCResponseHandling? if let completionClosure = closure { @@ -447,18 +525,18 @@ extension WebSocketEngine { // check whether there is subscription for this id and send unsubscribe request if let subscription = subscriptions[identifier], let remoteId = subscription.remoteId { - unsubscribe(for: remoteId) + unsubscribe(for: remoteId, method: subscription.unsubscribeMethod) } subscriptions[identifier] = nil } - func unsubscribe(for remoteId: String) { + func unsubscribe(for remoteId: String, method: String) { pendingSubscriptionResponses[remoteId] = nil do { let request = try prepareRequest( - method: RPCMethod.storageUnsubscribe, + method: method, params: [remoteId], options: JSONRPCOptions() ) { [weak self] (result: (Result)) in