Skip to content

Commit

Permalink
wait for call to close before asserting
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Jan 12, 2020
1 parent 979174d commit 9bd2cde
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class ClientCallBidiStreamingTests :
responseChannel.consumeAsFlow().map { it.message }.toList()
}

callState.blockUntilClosed()

assertEquals(3,result.size)
result.forEachIndexed { index, message ->
assertEquals("Req:#$index/Resp:#$index",message)
Expand Down Expand Up @@ -180,6 +182,8 @@ class ClientCallBidiStreamingTests :
}
}

callState.blockUntilClosed()

assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
}
Expand Down Expand Up @@ -222,6 +226,8 @@ class ClientCallBidiStreamingTests :
}
}

callState.blockUntilCancellation()

verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
Expand Down Expand Up @@ -272,6 +278,8 @@ class ClientCallBidiStreamingTests :
}
}

callState.blockUntilCancellation()

verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
Expand Down Expand Up @@ -317,6 +325,8 @@ class ClientCallBidiStreamingTests :
}
}

callState.blockUntilCancellation()

verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
Expand Down Expand Up @@ -345,18 +355,15 @@ class ClientCallBidiStreamingTests :
requestChannel.close(expectedException)
}

//TODO: Cleanup
// cause = Status.CANCELLED
// .withDescription("Cancelled by client with StreamObserver.onError()")
// .withCause(expectedException)
// .asRuntimeException()
assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
responseChannel.consumeAsFlow()
.map { it.message }
.collect { result.add(it) }
}
}

callState.blockUntilCancellation()

assert(result.isNotEmpty())
result.forEachIndexed { index, message ->
assertEquals("Req:#$index/Resp:#$index",message)
Expand Down Expand Up @@ -386,11 +393,6 @@ class ClientCallBidiStreamingTests :
result.add(responseChannel.receive().message)
requestChannel.close(expectedException)

//TODO clean up
// cause = Status.CANCELLED
// .withDescription("Cancelled by client with StreamObserver.onError()")
// .withCause(expectedException)
// .asRuntimeException()
assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
responseChannel.consumeAsFlow()
.collect { result.add(it.message) }
Expand Down Expand Up @@ -437,6 +439,7 @@ class ClientCallBidiStreamingTests :
responseChannel.cancel()
}

callState.blockUntilCancellation()

verify(exactly = 1) { rpcSpy.call.cancel(MESSAGE_CLIENT_CANCELLED_CALL,matchStatus(Status.CANCELLED)) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin
})
lateinit var reqChanSpy: SendChannel<HelloRequest>
runTest(5000L) {
val stub = GreeterCoroutineGrpc.newStub(nonDirectGrpcServerRule.channel)
val stub = GreeterCoroutineGrpc.newStub(grpcServerRule.channel)
.withInterceptors(callState)
.withCoroutineContext()

Expand All @@ -133,9 +133,6 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin

runBlocking { serverJob.join() }

// 1 - Server requests and receives
// 2 - Message is loaded into outbound buffer
// 3 - Suspending invocation awaiting next onReady
coVerify(exactly = 3) { reqChanSpy.send(any()) }
assert(reqChanSpy.isClosedForSend) { "Request channel should be closed after response channel is closed" }
}
Expand All @@ -162,7 +159,7 @@ class BidiStreamingTests : RpcCallTest<HelloRequest, HelloReply>(GreeterCoroutin

val numMessages = 500000
val receivedCount = AtomicInteger()
runTest(timeout = 60_000) {
runTest(timeout = 60_000 * 2) {
val req = HelloRequest.newBuilder()
.setName("test").build()

Expand Down

0 comments on commit 9bd2cde

Please sign in to comment.