Skip to content

Commit

Permalink
add integration test cases for appsync client
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Feb 24, 2024
1 parent 3d48710 commit d641f00
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,19 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

func connect() async throws {
if self.state.value == .connecting || self.state.value == .connected {
switch self.state.value {
case .connecting, .connected:
log.debug("[AppSyncRealTimeClient] client is already connecting or connected")
return
case .disconnecting:
try await waitForState(.disconnected)
case .connectionDropped, .disconnected, .none:
break
}

guard self.state.value != .connecting else {
log.debug("[AppSyncRealTimeClient] actor reentry, state has been changed to connecting")
return
}

self.state.send(.connecting)
Expand All @@ -89,11 +99,11 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

func disconnect() async {
defer { self.state.send(.disconnected) }
log.debug("[AppSyncRealTimeClient] client start disconnecting")
self.state.send(.disconnecting)
self.cancellablesBindToConnection = Set()
await self.webSocketClient.disconnect()
self.state.send(.disconnected)
log.debug("[AppSyncRealTimeClient] client is disconnected")
}

Expand All @@ -104,7 +114,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
Task {
if !self.isConnected {
try await connect()
try await waitForStateConnected()
try await waitForState(.connected)
}

try await startSubscription(id: id, query: query).store(in: &cancellablesBindToConnection)
Expand All @@ -113,11 +123,11 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return filterAppSyncSubscriptionEvent(with: id)
}

private func waitForStateConnected() async throws {
private func waitForState(_ targetState: State) async throws {
var cancellables = Set<AnyCancellable>()

try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Swift.Error>) -> Void in
state.filter { $0 == .connected }
state.filter { $0 == targetState }
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
.timeout(.seconds(10), scheduler: DispatchQueue.global())
.first()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ extension AppSyncRealTimeClientFactory: Resettable {
func reset() async {
await withTaskGroup(of: Void.self) { taskGroup in
self.apiToClientCache.values
.map { $0 as? Resettable }
.compactMap { $0 }
.compactMap { $0 as? Resettable }
.forEach { resettable in
taskGroup.addTask { await resettable.reset()}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
39E0F2AA28A440A700939D9F /* GraphQLWithUserPoolIntegrationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 21698BBD28899B6D004BD994 /* GraphQLWithUserPoolIntegrationTests.swift */; };
39E0F2AD28A441B100939D9F /* TestConfigHelper.swift in Sources */ = {isa = PBXBuildFile; fileRef = 39E0F2AC28A441B100939D9F /* TestConfigHelper.swift */; };
39E0F2AF28A4425C00939D9F /* Todo.swift in Sources */ = {isa = PBXBuildFile; fileRef = 39E0F2AE28A4425C00939D9F /* Todo.swift */; };
606C8B792B895E5A00716094 /* AppSyncRealTimeClientTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 606C8B782B895E5A00716094 /* AppSyncRealTimeClientTests.swift */; };
681B35422A43962D0074F369 /* Team2+Schema.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2126271F289ABFE9003788E3 /* Team2+Schema.swift */; };
681B35432A43962D0074F369 /* EnumTestModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = 21262707289ABFE6003788E3 /* EnumTestModel.swift */; };
681B35442A43962D0074F369 /* ScalarContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 21262720289ABFE9003788E3 /* ScalarContainer.swift */; };
Expand Down Expand Up @@ -676,6 +677,7 @@
39E0F2A128A43FB100939D9F /* AWSAPIPluginGraphQLUserPoolTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = AWSAPIPluginGraphQLUserPoolTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
39E0F2AC28A441B100939D9F /* TestConfigHelper.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestConfigHelper.swift; sourceTree = "<group>"; };
39E0F2AE28A4425C00939D9F /* Todo.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Todo.swift; sourceTree = "<group>"; };
606C8B782B895E5A00716094 /* AppSyncRealTimeClientTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppSyncRealTimeClientTests.swift; sourceTree = "<group>"; };
681B35292A4395730074F369 /* APIWatchApp.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = APIWatchApp.app; sourceTree = BUILT_PRODUCTS_DIR; };
681B35892A43962D0074F369 /* AWSAPIPluginFunctionalTestsWatch.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = AWSAPIPluginFunctionalTestsWatch.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
681B35A12A4396CF0074F369 /* AWSAPIPluginGraphQLLambdaAuthTestsWatch.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = AWSAPIPluginGraphQLLambdaAuthTestsWatch.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
Expand Down Expand Up @@ -899,6 +901,7 @@
children = (
21E581E32A6835910027D13A /* API.swift */,
212626CA289ABC79003788E3 /* Base */,
606C8B782B895E5A00716094 /* AppSyncRealTimeClientTests.swift */,
21698AA82889996A004BD994 /* GraphQLConnectionScenario1Tests.swift */,
21E581E12A6707900027D13A /* GraphQLConnectionScenario1APISwiftTests.swift */,
21698AB62889996A004BD994 /* GraphQLConnectionScenario2Tests.swift */,
Expand Down Expand Up @@ -2180,6 +2183,7 @@
2126273D289ABFEB003788E3 /* Blog6.swift in Sources */,
2126274B289ABFEB003788E3 /* Comment.swift in Sources */,
2126273E289ABFEB003788E3 /* Post6.swift in Sources */,
606C8B792B895E5A00716094 /* AppSyncRealTimeClientTests.swift in Sources */,
2126275F289ABFEB003788E3 /* Post3.swift in Sources */,
21262757289ABFEB003788E3 /* User5+Schema.swift in Sources */,
21262754289ABFEB003788E3 /* Blog6+Schema.swift in Sources */,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//


import XCTest
import Combine
@testable import Amplify
@testable import AWSAPIPlugin
@testable @_spi(AmplifySwift) import AWSPluginsCore

class AppSyncRealTimeClientTests: XCTestCase {
let subscriptionRequest = """
subscription MySubscription {
onCreatePost {
content
createdAt
draft
id
rating
status
title
updatedAt
}
}
"""

var appSyncRealTimeClient: AppSyncRealTimeClient?

override func setUp() async throws {
do {
Amplify.Logging.logLevel = .verbose

let data = try TestConfigHelper.retrieve(
forResource: GraphQLModelBasedTests.amplifyConfiguration
)

let amplifyConfig = try JSONDecoder().decode(JSONValue.self, from: data)
let (endpoint, apiKey) = (amplifyConfig.api?.plugins?.awsAPIPlugin?.asObject?.values
.map { ($0.endpoint?.stringValue, $0.apiKey?.stringValue)}
.first { $0.0 != nil && $0.1 != nil }
.map { ($0.0!, $0.1!) })!


let webSocketClient = WebSocketClient(
url: AppSyncRealTimeClientFactory.appSyncRealTimeEndpoint(URL(string: endpoint)!),
protocols: ["graphql-ws"],
interceptor: APIKeyAuthInterceptor(apiKey: apiKey)
)
appSyncRealTimeClient = AppSyncRealTimeClient(
endpoint: URL(string: endpoint)!,
requestInterceptor: APIKeyAuthInterceptor(apiKey: apiKey),
webSocketClient: webSocketClient
)

} catch {
XCTFail("Failed to setup appSyncRealTimeClient: \(error)")
}
}

override func tearDown() async throws {
await appSyncRealTimeClient?.reset()
appSyncRealTimeClient = nil
}

func testSubscribe_withSubscriptionConnection() async throws {
var cancellables = Set<AnyCancellable>()
let subscribedExpectation = expectation(description: "Subscription established")

try await appSyncRealTimeClient?.connect()
try await appSyncRealTimeClient?.subscribe(
id: UUID().uuidString,
query: Self.appSyncQuery(with: subscriptionRequest)
)
.sink(receiveCompletion: { completion in
print("### completion \(completion)")
}, receiveValue: { event in
if case .subscribed = event {
subscribedExpectation.fulfill()
}
})
.store(in: &cancellables)
await fulfillment(of: [subscribedExpectation], timeout: 5)
}

func testMultThreads_subscribeAndUnsubscribe() async throws {
let concurrentFactor = 100
let expectedSubscription = expectation(description: "Multi threads subscription")
expectedSubscription.expectedFulfillmentCount = concurrentFactor

let expectedUnsubscription = expectation(description: "Multi threads unsubscription")
expectedUnsubscription.expectedFulfillmentCount = concurrentFactor
_ = try await withThrowingTaskGroup(
of: AnyCancellable?.self,
returning: [AnyCancellable?].self
) { taskGroup in
(0..<concurrentFactor).forEach { index in
let id = UUID().uuidString
taskGroup.addTask { [weak self] () -> AnyCancellable? in
guard let self else { return nil }
let subscription: AnyCancellable? = try await self.appSyncRealTimeClient?.subscribe(
id: id,
query: Self.appSyncQuery(with: self.subscriptionRequest)
)
.sink {
if case .subscribed = $0 {
expectedSubscription.fulfill()
} else if case .unsubscribed = $0 {
expectedUnsubscription.fulfill()
}
}
try await self.appSyncRealTimeClient?.unsubscribe(id: id)

return subscription
}

}

return try await taskGroup.reduce([AnyCancellable?]()) { $0 + [$1] }
}

await fulfillment(of: [expectedSubscription, expectedUnsubscription], timeout:1)
}

private static func appSyncQuery(
with query: String,
variables: [String: JSONValue] = [:]
) throws -> String {
let payload: JSONValue = .object([
"query": .string(query),
"variables": (variables.isEmpty ? .null : .object(variables))
])
let data = try JSONEncoder().encode(payload)
return String(data: data, encoding: .utf8)!
}
}

0 comments on commit d641f00

Please sign in to comment.