Skip to content

Commit

Permalink
refactor exception and status assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Jan 13, 2020
1 parent 5e35e7c commit 7e3ae1f
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
Expand Down Expand Up @@ -210,12 +209,9 @@ public fun <ReqT, RespT> clientCallClientStreaming(

val rpcScope = newRpcScope(callOptions, method)
val response = CompletableDeferred<RespT>(parent = rpcScope.coroutineContext[Job])
val requestChannel = rpcScope.actor<ReqT>(
capacity = Channel.RENDEZVOUS,
context = Dispatchers.Unconfined
) {
val requestChannel = rpcScope.actor<ReqT>(capacity = Channel.RENDEZVOUS) {
val responseObserver = ClientStreamingResponseObserver(
this, this@actor.channel, response
this@actor.channel, response
)

val call = channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ internal class ServerStreamingResponseObserver<ReqT, RespT>: StatefulClientRespo
}

internal class ClientStreamingResponseObserver<ReqT, RespT>(
private val rpcScope: CoroutineScope,
private val requestChannel: Channel<ReqT>,
private val response: CompletableDeferred<RespT>
) : StatefulClientResponseObserver<ReqT, RespT>() {
Expand Down Expand Up @@ -128,8 +127,13 @@ internal class ClientStreamingResponseObserver<ReqT, RespT>(

override fun onError(t: Throwable) {
isAborted.set(true)
response.completeExceptionally(t)
requestChannel.close(t)
if(t is CancellationException){
requestChannel.cancel(t)
response.cancel(t)
}else{
requestChannel.close(t)
response.completeExceptionally(t)
}
readyObserver.cancel(t)
}

Expand Down Expand Up @@ -180,7 +184,7 @@ internal class BidiStreamingResponseObserver<ReqT, RespT>(
}
}

responseChannel = flow<RespT> {
responseChannel = flow {
var error: Throwable? = null
try {
inboundChannel.consumeEach { message ->
Expand Down Expand Up @@ -232,8 +236,12 @@ internal class BidiStreamingResponseObserver<ReqT, RespT>(

override fun onError(t: Throwable) {
isAborted.set(true)
inboundChannel.close(t)
requestChannel.close(t)
if(t is CancellationException){
inboundChannel.cancel(t)
}else{
inboundChannel.close(t)
}
readyObserver.cancel(t)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,50 @@ abstract class RpcCallTest<ReqT, RespT>(
fun setupCall() {
callState = RpcStateInterceptor()
CALL_TRACE_ENABLED = true
// mockkObject(Testing)
//
// every { Testing.asyncClientStreamingCallK<ReqT, RespT>(any(), any()) } answers answer@ {
// val call = firstArg<ClientCall<ReqT, RespT>>()
// val responseObserver = secondArg<ClientResponseObserver<ReqT, RespT>>()
//
// val reqObserver = ClientCalls.asyncClientStreamingCall(call, object: ClientResponseObserver<ReqT, RespT>{
// override fun onNext(value: RespT) {
// responseObserver.onNext(value)
// }
//
// override fun onError(t: Throwable) {
// println("Client: Response observer onError(${t.toDebugString()})")
// responseObserver.onError(t)
// }
//
// override fun onCompleted() {
// println("Client: Response observer onComplete()")
// responseObserver.onCompleted()
// }
//
// override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
// responseObserver.beforeStart(requestStream)
// }
//
// } as StreamObserver<RespT>)
//
// return@answer object : StreamObserver<ReqT> {
// override fun onNext(value: ReqT) {
// reqObserver.onNext(value)
// }
//
// override fun onError(t: Throwable) {
// println("Client: Request observer onError(${t.toDebugString()})")
// reqObserver.onError(t)
// }
//
// override fun onCompleted() {
// println("Client: Request observer onComplete()")
// reqObserver.onCompleted()
// }
//
// }
// }
}

fun registerService(service: BindableService){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package com.github.marcoferrer.krotoplus.coroutines.client
import com.github.marcoferrer.krotoplus.coroutines.RpcCallTest
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithCancellation
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus2
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.utils.matchStatus
import com.github.marcoferrer.krotoplus.coroutines.utils.matchThrowable
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
Expand Down Expand Up @@ -163,7 +163,7 @@ class ClientCallBidiStreamingTests :
.setName(it.toString())
.build())
}
assertFailsWithStatus2(Status.INVALID_ARGUMENT) {
assertFailsWithStatus(Status.INVALID_ARGUMENT) {
send(HelloRequest.newBuilder()
.setName("fails")
.build()
Expand All @@ -175,7 +175,7 @@ class ClientCallBidiStreamingTests :
repeat(3) {
assertEquals("Req:#$it/Resp:#$it", responseChannel.receive().message)
}
assertFailsWithStatus2(Status.INVALID_ARGUMENT) {
assertFailsWithStatus(Status.INVALID_ARGUMENT) {
responseChannel.receive().message
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ class ClientCallBidiStreamingTests :

launch(Dispatchers.Default) {
launch(start = CoroutineStart.UNDISPATCHED) {
assertFailsWithStatus2(Status.CANCELLED) {
assertFailsWithStatus(Status.CANCELLED) {
repeat(3) {
requestChannel.send(
HelloRequest.newBuilder()
Expand All @@ -317,7 +317,7 @@ class ClientCallBidiStreamingTests :
launch {
error("cancel")
}
assertFailsWithStatus2(Status.CANCELLED) {
assertFailsWithStatus(Status.CANCELLED) {
callChannel.responseChannel.receive().message
}
}
Expand Down Expand Up @@ -354,7 +354,7 @@ class ClientCallBidiStreamingTests :
requestChannel.close(expectedException)
}

assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
assertFailsWithStatus(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
responseChannel.consumeAsFlow()
.map { it.message }
.collect { result.add(it) }
Expand Down Expand Up @@ -392,12 +392,13 @@ class ClientCallBidiStreamingTests :
result.add(responseChannel.receive().message)
requestChannel.close(expectedException)

assertFailsWithStatus2(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
assertFailsWithStatus(Status.CANCELLED, "CANCELLED: $expectedCancelMessage") {
responseChannel.consumeAsFlow()
.collect { result.add(it.message) }
}
}

callState.client.cancelled.assertBlocking{ "Client must be cancelled" }

assertEquals(1, result.size)
result.forEachIndexed { index, message ->
Expand All @@ -419,7 +420,7 @@ class ClientCallBidiStreamingTests :

runTest {
launch {
assertFailsWithStatus2(Status.CANCELLED) {
assertFailsWithStatus(Status.CANCELLED) {
repeat(10) {
requestChannel.send(
HelloRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.RpcCallTest
import com.github.marcoferrer.krotoplus.coroutines.utils.assertExEquals
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithCancellation
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus2
import com.github.marcoferrer.krotoplus.coroutines.utils.matchThrowable
import com.github.marcoferrer.krotoplus.coroutines.utils.toDebugString
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
Expand All @@ -31,10 +30,11 @@ import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.stub.StreamObserver
import io.mockk.coVerify
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
Expand Down Expand Up @@ -176,24 +176,28 @@ class ClientCallClientStreamingTests :
requestsSent++
requestChannel.send(
HelloRequest.newBuilder()
.setName(it.toString())
.setName(0.toString())
.build()
)
}
assertFailsWithStatus(Status.INVALID_ARGUMENT) {
requestChannel.send(
HelloRequest.newBuilder()
.setName("request")
.build()
)
}
}
assertFailsWithStatus(Status.INVALID_ARGUMENT) {
response.await().message
}
}
}

assertFailsWithStatus(Status.INVALID_ARGUMENT) {
runBlocking { response.await().message }
}
assertFailsWithStatus(Status.INVALID_ARGUMENT) {
runBlocking {
requestChannel.send(
HelloRequest.newBuilder()
.setName("request")
.build()
)
}
}


assertEquals(2,requestsSent)
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
}
Expand All @@ -213,11 +217,11 @@ class ClientCallClientStreamingTests :
launch(Dispatchers.Default) {
val job = launch {
launch(start = CoroutineStart.UNDISPATCHED){
assertFailsWithStatus(Status.CANCELLED) {
assertFailsWithCancellation {
response.await().message
}
}
assertFailsWithStatus(Status.CANCELLED) {
assertFailsWithCancellation {
repeat(3) {
delay(5)
requestChannel.send(
Expand All @@ -236,6 +240,8 @@ class ClientCallClientStreamingTests :
}
}

callState.client.cancelled.assertBlocking { "Client must be cancelled" }

verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
}
Expand All @@ -247,37 +253,42 @@ class ClientCallClientStreamingTests :
setupServerHandlerNoop()

lateinit var requestChannel: SendChannel<HelloRequest>
assertFails<CancellationException> {
lateinit var response: Deferred<HelloReply>

assertFailsWithCancellation {
runBlocking {
launch(start = CoroutineStart.UNDISPATCHED) {
val callChannel = stub
.withCoroutineContext()
.clientCallClientStreaming(methodDescriptor)

requestChannel = callChannel.requestChannel
requestChannel = spyk(callChannel.requestChannel)
response = callChannel.response

val job = launch {
callChannel.response.await().message
}
assertFailsWithStatus(Status.CANCELLED) {
assertFailsWithCancellation {
repeat(3) {
requestChannel.send(
HelloRequest.newBuilder()
.setName(it.toString())
.build()
)
delay(5)
callState.client.cancelled.await()
}
}
assertFailsWithStatus(Status.CANCELLED) {
job.join()
}
}
callState.client.onReady.await()
cancel()
}
}

verify { rpcSpy.call.cancel(any(), any()) }
callState.client.cancelled.assertBlocking { "Client must be cancelled" }

assertFailsWithCancellation {
runBlocking { response.await().message }
}

coVerify(exactly = 1) { requestChannel.send(any()) }
verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }

}
Expand All @@ -299,7 +310,7 @@ class ClientCallClientStreamingTests :

launch(Dispatchers.Default) {
launch(start = CoroutineStart.UNDISPATCHED) {
assertFailsWithStatus(Status.CANCELLED) {
assertFailsWithCancellation {
repeat(3) {
requestChannel.send(
HelloRequest.newBuilder()
Expand All @@ -313,14 +324,14 @@ class ClientCallClientStreamingTests :
launch {
error("cancel")
}
assertFailsWithStatus(Status.CANCELLED) {
assertFailsWithCancellation {
callChannel.response.await().message
}
}
}
}

verify { rpcSpy.call.cancel(any(), any()) }
verify(exactly = 1) { rpcSpy.call.cancel(any(), any()) }
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }

}
Expand Down Expand Up @@ -348,7 +359,7 @@ class ClientCallClientStreamingTests :
requestChannel.close(expectedException)
}

assertFailsWithStatus2(Status.CANCELLED,"CANCELLED: $expectedCancelMessage"){
assertFailsWithStatus(Status.CANCELLED,"CANCELLED: $expectedCancelMessage"){
response.await()
}
}
Expand Down
Loading

0 comments on commit 7e3ae1f

Please sign in to comment.