From d1635272edadadf05291348db97f125625c17440 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Mon, 21 Dec 2020 12:36:09 +1300 Subject: [PATCH] Rename interceptor expect client/service to expect incoming/outgoing --- .../scala/io/cloudstate/tck/ActionTCK.scala | 84 ++++++------ .../io/cloudstate/tck/CrdtEntityTCK.scala | 118 ++++++++--------- .../scala/io/cloudstate/tck/EntityTCK.scala | 104 +++++++-------- .../tck/EventSourcedEntityTCK.scala | 122 +++++++++--------- .../scala/io/cloudstate/tck/EventingTCK.scala | 12 +- .../action/InterceptActionService.scala | 16 +-- .../testkit/crdt/InterceptCrdtService.scala | 10 +- .../InterceptEventSourcedService.scala | 12 +- .../InterceptValueEntityService.scala | 10 +- 9 files changed, 244 insertions(+), 244 deletions(-) diff --git a/tck/src/main/scala/io/cloudstate/tck/ActionTCK.scala b/tck/src/main/scala/io/cloudstate/tck/ActionTCK.scala index d55d8f616..2ad585b3c 100644 --- a/tck/src/main/scala/io/cloudstate/tck/ActionTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/ActionTCK.scala @@ -449,8 +449,8 @@ trait ActionTCK extends TCKSpec { tckModelClient.processUnary(single(replyWith("one"))).futureValue mustBe Response("one") interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(replyWith("one")))) - .expectService(reply(Response("one"))) + .expectIncoming(processUnary(single(replyWith("one")))) + .expectOutgoing(reply(Response("one"))) } "verify streamed-in command processing" in actionTest { @@ -461,10 +461,10 @@ trait ActionTCK extends TCKSpec { response.futureValue mustBe Response("two") interceptor .expectActionStreamedInConnection() - .expectClient(processStreamedIn) - .expectClient(command(single(replyWith("two")))) + .expectIncoming(processStreamedIn) + .expectIncoming(command(single(replyWith("two")))) .expectInComplete() - .expectService(reply(Response("two"))) + .expectOutgoing(reply(Response("two"))) } "verify streamed-out command processing" in actionTest { @@ -481,10 +481,10 @@ trait ActionTCK extends TCKSpec { .expectComplete() interceptor .expectActionStreamedOutConnection() - .expectClient(processStreamedOut(streamedOutRequest)) - .expectService(reply(Response("A"))) - .expectService(reply(Response("B"))) - .expectService(reply(Response("C"))) + .expectIncoming(processStreamedOut(streamedOutRequest)) + .expectOutgoing(reply(Response("A"))) + .expectOutgoing(reply(Response("B"))) + .expectOutgoing(reply(Response("C"))) .expectOutComplete() } @@ -508,13 +508,13 @@ trait ActionTCK extends TCKSpec { .expectComplete() interceptor .expectActionStreamedConnection() - .expectClient(processStreamed) - .expectClient(command(single(replyWith("X")))) - .expectService(reply(Response("X"))) - .expectClient(command(single(replyWith("Y")))) - .expectService(reply(Response("Y"))) - .expectClient(command(single(replyWith("Z")))) - .expectService(reply(Response("Z"))) + .expectIncoming(processStreamed) + .expectIncoming(command(single(replyWith("X")))) + .expectOutgoing(reply(Response("X"))) + .expectIncoming(command(single(replyWith("Y")))) + .expectOutgoing(reply(Response("Y"))) + .expectIncoming(command(single(replyWith("Z")))) + .expectOutgoing(reply(Response("Z"))) .expectComplete() } @@ -522,22 +522,22 @@ trait ActionTCK extends TCKSpec { tckModelClient.processUnary(single(forwardTo("other"))).futureValue mustBe Response() interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(forwardTo("other")))) - .expectService(forwarded("other")) + .expectIncoming(processUnary(single(forwardTo("other")))) + .expectOutgoing(forwarded("other")) interceptor .expectActionUnaryConnection() - .expectClient(serviceTwoCall("other")) - .expectService(reply(Response())) + .expectIncoming(serviceTwoCall("other")) + .expectOutgoing(reply(Response())) tckModelClient.processUnary(single(replyWith(""), sideEffectTo("another"))).futureValue mustBe Response() interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(replyWith(""), sideEffectTo("another")))) - .expectService(reply(Response(), sideEffects("another"))) + .expectIncoming(processUnary(single(replyWith(""), sideEffectTo("another")))) + .expectOutgoing(reply(Response(), sideEffects("another"))) interceptor .expectActionUnaryConnection() - .expectClient(serviceTwoCall("another")) - .expectService(reply(Response())) + .expectIncoming(serviceTwoCall("another")) + .expectOutgoing(reply(Response())) } "verify streamed forwards and side effects" in actionTest { @@ -552,22 +552,22 @@ trait ActionTCK extends TCKSpec { responses.request(1).expectNext(Response()) val connection = interceptor .expectActionStreamedConnection() - .expectClient(processStreamed) - .expectClient(command(single(forwardTo("one")))) - .expectService(forwarded("one")) + .expectIncoming(processStreamed) + .expectIncoming(command(single(forwardTo("one")))) + .expectOutgoing(forwarded("one")) interceptor .expectActionUnaryConnection() - .expectClient(serviceTwoCall("one")) - .expectService(reply(Response())) + .expectIncoming(serviceTwoCall("one")) + .expectOutgoing(reply(Response())) requests.sendNext(single(sideEffectTo("two"))) connection - .expectClient(command(single(sideEffectTo("two")))) - .expectService(noReply(sideEffects("two"))) + .expectIncoming(command(single(sideEffectTo("two")))) + .expectOutgoing(noReply(sideEffects("two"))) interceptor .expectActionUnaryConnection() - .expectClient(serviceTwoCall("two")) - .expectService(reply(Response())) + .expectIncoming(serviceTwoCall("two")) + .expectOutgoing(reply(Response())) requests.sendComplete() responses.expectComplete() @@ -580,8 +580,8 @@ trait ActionTCK extends TCKSpec { failed.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "expected failure" interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(failWith("expected failure")))) - .expectService(failure("expected failure")) + .expectIncoming(processUnary(single(failWith("expected failure")))) + .expectOutgoing(failure("expected failure")) } "verify streamed failures" in actionTest { @@ -593,15 +593,15 @@ trait ActionTCK extends TCKSpec { .ensureSubscription() val connection = interceptor .expectActionStreamedConnection() - .expectClient(processStreamed) + .expectIncoming(processStreamed) requests.sendNext(single(failWith("expected failure"))) val failed = responses.request(1).expectError() failed mustBe a[StatusRuntimeException] failed.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "expected failure" requests.expectCancellation() connection - .expectClient(command(single(failWith("expected failure")))) - .expectService(failure("expected failure")) + .expectIncoming(command(single(failWith("expected failure")))) + .expectOutgoing(failure("expected failure")) } "verify unary HTTP API" in actionTest { @@ -610,16 +610,16 @@ trait ActionTCK extends TCKSpec { .futureValue mustBe """{"message":"foo"}""" interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(replyWith("foo")))) - .expectService(reply(Response("foo"))) + .expectIncoming(processUnary(single(replyWith("foo")))) + .expectOutgoing(reply(Response("foo"))) client.http .requestToError("tck/model/action/unary", """{"groups": [{"steps": [{"fail": {"message": "boom"}}]}]}""") .futureValue mustBe "boom" interceptor .expectActionUnaryConnection() - .expectClient(processUnary(single(failWith("boom")))) - .expectService(failure("boom")) + .expectIncoming(processUnary(single(failWith("boom")))) + .expectOutgoing(failure("boom")) } } } diff --git a/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala b/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala index 307f1a847..2e2e4aa3b 100644 --- a/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/CrdtEntityTCK.scala @@ -1815,24 +1815,24 @@ trait CrdtEntityTCK extends TCKSpec { tckModelClient.process(Request(id)).futureValue mustBe PNCounter.state(0) val connection = interceptor .expectCrdtEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, PNCounter.state(0))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, PNCounter.state(0))) tckModelClient.process(Request(id, PNCounter.changeBy(+1, -2, +3))).futureValue mustBe PNCounter.state(+2) connection - .expectClient(command(2, id, "Process", Request(id, PNCounter.changeBy(+1, -2, +3)))) - .expectService(reply(2, PNCounter.state(+2), PNCounter.update(+2))) + .expectIncoming(command(2, id, "Process", Request(id, PNCounter.changeBy(+1, -2, +3)))) + .expectOutgoing(reply(2, PNCounter.state(+2), PNCounter.update(+2))) tckModelClient.process(Request(id, PNCounter.changeBy(+4, -5, +6))).futureValue mustBe PNCounter.state(+7) connection - .expectClient(command(3, id, "Process", Request(id, PNCounter.changeBy(+4, -5, +6)))) - .expectService(reply(3, PNCounter.state(+7), PNCounter.update(+5))) + .expectIncoming(command(3, id, "Process", Request(id, PNCounter.changeBy(+4, -5, +6)))) + .expectOutgoing(reply(3, PNCounter.state(+7), PNCounter.update(+5))) tckModelClient.process(Request(id, Seq(requestDelete))).futureValue mustBe PNCounter.state(+7) connection - .expectClient(command(4, id, "Process", Request(id, Seq(requestDelete)))) - .expectService(reply(4, PNCounter.state(+7), deleteCrdt)) + .expectIncoming(command(4, id, "Process", Request(id, Seq(requestDelete)))) + .expectOutgoing(reply(4, PNCounter.state(+7), deleteCrdt)) .expectClosed() } @@ -1840,38 +1840,38 @@ trait CrdtEntityTCK extends TCKSpec { tckModelClient.process(Request(id, GSet.add("one"))).futureValue mustBe GSet.state("one") val connection = interceptor .expectCrdtEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, GSet.add("one")))) - .expectService(reply(1, GSet.state("one"), GSet.update("one"))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, GSet.add("one")))) + .expectOutgoing(reply(1, GSet.state("one"), GSet.update("one"))) tckModelClient.process(Request(id, Seq(forwardTo(id)))).futureValue mustBe Response() connection - .expectClient(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) - .expectService(forward(2, ServiceTwo, "Call", Request(id))) + .expectIncoming(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) + .expectOutgoing(forward(2, ServiceTwo, "Call", Request(id))) val connection2 = interceptor .expectCrdtEntityConnection() - .expectClient(init(ServiceTwo, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceTwo, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) tckModelClient.process(Request(id, Seq(sideEffectTo(id)))).futureValue mustBe GSet.state("one") connection - .expectClient(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) - .expectService(reply(3, GSet.state("one"), sideEffects(id))) + .expectIncoming(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) + .expectOutgoing(reply(3, GSet.state("one"), sideEffects(id))) connection2 - .expectClient(command(2, id, "Call", Request(id))) - .expectService(reply(2, Response())) + .expectIncoming(command(2, id, "Call", Request(id))) + .expectOutgoing(reply(2, Response())) tckModelClient.process(Request(id, Seq(requestDelete))).futureValue mustBe GSet.state("one") connection - .expectClient(command(4, id, "Process", Request(id, Seq(requestDelete)))) - .expectService(reply(4, GSet.state("one"), deleteCrdt)) + .expectIncoming(command(4, id, "Process", Request(id, Seq(requestDelete)))) + .expectOutgoing(reply(4, GSet.state("one"), deleteCrdt)) .expectClosed() modelTwoClient.call(Request(id, Seq(requestDelete))).futureValue mustBe Response() connection2 - .expectClient(command(3, id, "Call", Request(id, Seq(requestDelete)))) - .expectService(reply(3, Response(), deleteCrdt)) + .expectIncoming(command(3, id, "Call", Request(id, Seq(requestDelete)))) + .expectOutgoing(reply(3, Response(), deleteCrdt)) .expectClosed() } @@ -1879,21 +1879,21 @@ trait CrdtEntityTCK extends TCKSpec { tckModelClient.process(Request(id, GCounter.incrementBy(42))).futureValue mustBe GCounter.state(42) val connection = interceptor .expectCrdtEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, GCounter.incrementBy(42)))) - .expectService(reply(1, GCounter.state(42), GCounter.update(42))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, GCounter.incrementBy(42)))) + .expectOutgoing(reply(1, GCounter.state(42), GCounter.update(42))) val failed = tckModelClient.process(Request(id, Seq(failWith("expected failure")))).failed.futureValue failed mustBe a[StatusRuntimeException] failed.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "expected failure" connection - .expectClient(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(failure(2, "expected failure")) + .expectIncoming(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(failure(2, "expected failure")) tckModelClient.process(Request(id, Seq(requestDelete))).futureValue mustBe GCounter.state(42) connection - .expectClient(command(3, id, "Process", Request(id, Seq(requestDelete)))) - .expectService(reply(3, GCounter.state(42), deleteCrdt)) + .expectIncoming(command(3, id, "Process", Request(id, Seq(requestDelete)))) + .expectOutgoing(reply(3, GCounter.state(42), deleteCrdt)) .expectClosed() } @@ -1909,36 +1909,36 @@ trait CrdtEntityTCK extends TCKSpec { monitor.request(1).expectNext(state0) val connection = interceptor .expectCrdtEntityConnection() - .expectClient(init(CrdtTckModel.name, id)) - .expectClient(command(1, id, "ProcessStreamed", StreamedRequest(id), streamed = true)) - .expectService(reply(1, state0, Effects(streamed = true))) + .expectIncoming(init(CrdtTckModel.name, id)) + .expectIncoming(command(1, id, "ProcessStreamed", StreamedRequest(id), streamed = true)) + .expectOutgoing(reply(1, state0, Effects(streamed = true))) val connectRequest = StreamedRequest(id, initialUpdate = voteTrue, cancelUpdate = voteFalse, empty = true) val connect = tckModelClient.processStreamed(connectRequest).runWith(TestSink.probe[Response]) connect.request(1).expectNoMessage(100.millis) monitor.request(1).expectNext(state1) connection - .expectClient(command(2, id, "ProcessStreamed", connectRequest, streamed = true)) - .expectService(crdtReply(2, None, Effects(streamed = true) ++ Vote.update(true))) - .expectService(streamed(1, state1)) + .expectIncoming(command(2, id, "ProcessStreamed", connectRequest, streamed = true)) + .expectOutgoing(crdtReply(2, None, Effects(streamed = true) ++ Vote.update(true))) + .expectOutgoing(streamed(1, state1)) connect.cancel() monitor.request(1).expectNext(state0) connection - .expectClient(crdtStreamCancelled(2, id)) - .expectService(streamCancelledResponse(2, Vote.update(false))) - .expectService(streamed(1, state0)) + .expectIncoming(crdtStreamCancelled(2, id)) + .expectOutgoing(streamCancelledResponse(2, Vote.update(false))) + .expectOutgoing(streamed(1, state0)) monitor.cancel() connection - .expectClient(crdtStreamCancelled(1, id)) - .expectService(streamCancelledResponse(1)) + .expectIncoming(crdtStreamCancelled(1, id)) + .expectOutgoing(streamCancelledResponse(1)) val deleteRequest = Request(id, Seq(requestDelete)) tckModelClient.process(deleteRequest).futureValue mustBe state0 connection - .expectClient(command(3, id, "Process", deleteRequest)) - .expectService(reply(3, state0, deleteCrdt)) + .expectIncoming(command(3, id, "Process", deleteRequest)) + .expectOutgoing(reply(3, state0, deleteCrdt)) .expectClosed() } @@ -1948,37 +1948,37 @@ trait CrdtEntityTCK extends TCKSpec { .futureValue mustBe """{"state":{"pncounter":{"value":"0"}}}""" val connection = interceptor .expectCrdtEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, PNCounter.state(0))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, PNCounter.state(0))) client.http .request(s"tck/model/crdt/$id", """{"actions": [{"update": {"pncounter": {"change": 42} }}]}""") .futureValue mustBe """{"state":{"pncounter":{"value":"42"}}}""" connection - .expectClient(command(2, id, "Process", Request(id, PNCounter.changeBy(+42)))) - .expectService(reply(2, PNCounter.state(+42), PNCounter.update(+42))) + .expectIncoming(command(2, id, "Process", Request(id, PNCounter.changeBy(+42)))) + .expectOutgoing(reply(2, PNCounter.state(+42), PNCounter.update(+42))) client.http .requestToError(s"tck/model/crdt/$id", """{"actions": [{"fail": {"message": "expected failure"}}]}""") .futureValue mustBe "expected failure" connection - .expectClient(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(failure(3, "expected failure")) + .expectIncoming(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(failure(3, "expected failure")) client.http .request(s"tck/model/crdt/$id", """{"actions": [{"update": {"pncounter": {"change": -123} }}]}""") .futureValue mustBe """{"state":{"pncounter":{"value":"-81"}}}""" connection - .expectClient(command(4, id, "Process", Request(id, PNCounter.changeBy(-123)))) - .expectService(reply(4, PNCounter.state(-81), PNCounter.update(-123))) + .expectIncoming(command(4, id, "Process", Request(id, PNCounter.changeBy(-123)))) + .expectOutgoing(reply(4, PNCounter.state(-81), PNCounter.update(-123))) client.http .request(s"tck/model/crdt/$id", """{"actions": [{"delete": {}}]}""") .futureValue mustBe """{"state":{"pncounter":{"value":"-81"}}}""" connection - .expectClient(command(5, id, "Process", Request(id, Seq(requestDelete)))) - .expectService(reply(5, PNCounter.state(-81), deleteCrdt)) + .expectIncoming(command(5, id, "Process", Request(id, Seq(requestDelete)))) + .expectOutgoing(reply(5, PNCounter.state(-81), deleteCrdt)) .expectClosed() } @@ -1986,9 +1986,9 @@ trait CrdtEntityTCK extends TCKSpec { configuredClient.call(Request(id)) interceptor .expectCrdtEntityConnection() - .expectClient(init(ServiceConfigured, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceConfigured, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) .expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis) } } diff --git a/tck/src/main/scala/io/cloudstate/tck/EntityTCK.scala b/tck/src/main/scala/io/cloudstate/tck/EntityTCK.scala index 496696054..3e5f6269e 100644 --- a/tck/src/main/scala/io/cloudstate/tck/EntityTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/EntityTCK.scala @@ -324,141 +324,141 @@ trait EntityTCK extends TCKSpec { tckModelClient.process(Request(id)).futureValue mustBe Response() val connection = interceptor .expectEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, Response())) tckModelClient.process(Request(id, updateStates("one"))).futureValue mustBe Response("one") connection - .expectClient(command(2, id, "Process", Request(id, updateStates("one")))) - .expectService(reply(2, Response("one"), update("one"))) + .expectIncoming(command(2, id, "Process", Request(id, updateStates("one")))) + .expectOutgoing(reply(2, Response("one"), update("one"))) tckModelClient.process(Request(id, updateStates("two"))).futureValue mustBe Response("two") connection - .expectClient(command(3, id, "Process", Request(id, updateStates("two")))) - .expectService(reply(3, Response("two"), update("two"))) + .expectIncoming(command(3, id, "Process", Request(id, updateStates("two")))) + .expectOutgoing(reply(3, Response("two"), update("two"))) tckModelClient.process(Request(id)).futureValue mustBe Response("two") connection - .expectClient(command(4, id, "Process", Request(id))) - .expectService(reply(4, Response("two"))) + .expectIncoming(command(4, id, "Process", Request(id))) + .expectOutgoing(reply(4, Response("two"))) tckModelClient.process(Request(id, Seq(deleteState()))).futureValue mustBe Response() connection - .expectClient(command(5, id, "Process", Request(id, Seq(deleteState())))) - .expectService(reply(5, Response(), delete())) + .expectIncoming(command(5, id, "Process", Request(id, Seq(deleteState())))) + .expectOutgoing(reply(5, Response(), delete())) tckModelClient.process(Request(id)).futureValue mustBe Response() connection - .expectClient(command(6, id, "Process", Request(id))) - .expectService(reply(6, Response())) + .expectIncoming(command(6, id, "Process", Request(id))) + .expectOutgoing(reply(6, Response())) tckModelClient.process(Request(id, updateStates("foo"))).futureValue mustBe Response("foo") connection - .expectClient(command(7, id, "Process", Request(id, updateStates("foo")))) - .expectService(reply(7, Response("foo"), update("foo"))) + .expectIncoming(command(7, id, "Process", Request(id, updateStates("foo")))) + .expectOutgoing(reply(7, Response("foo"), update("foo"))) tckModelClient.process(Request(id, Seq(deleteState()))).futureValue mustBe Response() connection - .expectClient(command(8, id, "Process", Request(id, Seq(deleteState())))) - .expectService(reply(8, Response(), delete())) + .expectIncoming(command(8, id, "Process", Request(id, Seq(deleteState())))) + .expectOutgoing(reply(8, Response(), delete())) } "verify forwards and side effects" in valueEntityTest { id => tckModelClient.process(Request(id, updateStates("one"))).futureValue mustBe Response("one") val connection = interceptor .expectEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, updateStates("one")))) - .expectService(reply(1, Response("one"), update("one"))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, updateStates("one")))) + .expectOutgoing(reply(1, Response("one"), update("one"))) tckModelClient.process(Request(id, Seq(forwardTo(id)))).futureValue mustBe Response() connection - .expectClient(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) - .expectService(forward(2, ServiceTwo, "Call", Request(id))) + .expectIncoming(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) + .expectOutgoing(forward(2, ServiceTwo, "Call", Request(id))) val connection2 = interceptor .expectEntityConnection() - .expectClient(init(ServiceTwo, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceTwo, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) tckModelClient.process(Request(id, Seq(sideEffectTo(id)))).futureValue mustBe Response("one") connection - .expectClient(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) - .expectService(reply(3, Response("one"), sideEffects(id))) + .expectIncoming(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) + .expectOutgoing(reply(3, Response("one"), sideEffects(id))) connection2 - .expectClient(command(2, id, "Call", Request(id))) - .expectService(reply(2, Response())) + .expectIncoming(command(2, id, "Call", Request(id))) + .expectOutgoing(reply(2, Response())) tckModelClient.process(Request(id)).futureValue mustBe Response("one") connection - .expectClient(command(4, id, "Process", Request(id))) - .expectService(reply(4, Response("one"))) + .expectIncoming(command(4, id, "Process", Request(id))) + .expectOutgoing(reply(4, Response("one"))) } "verify failures" in valueEntityTest { id => tckModelClient.process(Request(id, updateStates("one"))).futureValue mustBe Response("one") val connection = interceptor .expectEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, updateStates("one")))) - .expectService(reply(1, Response("one"), update("one"))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, updateStates("one")))) + .expectOutgoing(reply(1, Response("one"), update("one"))) val failed = tckModelClient.process(Request(id, Seq(failWith("expected failure")))).failed.futureValue failed mustBe a[StatusRuntimeException] failed.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "expected failure" connection - .expectClient(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(actionFailure(2, "expected failure")) + .expectIncoming(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(actionFailure(2, "expected failure")) tckModelClient.process(Request(id)).futureValue mustBe Response("one") connection - .expectClient(command(3, id, "Process", Request(id))) - .expectService(reply(3, Response("one"))) + .expectIncoming(command(3, id, "Process", Request(id))) + .expectOutgoing(reply(3, Response("one"))) } "verify HTTP API" in valueEntityTest { id => client.http.request(s"tck/model/entity/$id", "{}").futureValue mustBe """{"message":""}""" val connection = interceptor .expectEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, Response())) client.http .request(s"tck/model/entity/$id", """{"actions": [{"update": {"value": "one"}}]}""") .futureValue mustBe """{"message":"one"}""" connection - .expectClient(command(2, id, "Process", Request(id, updateStates("one")))) - .expectService(reply(2, Response("one"), update("one"))) + .expectIncoming(command(2, id, "Process", Request(id, updateStates("one")))) + .expectOutgoing(reply(2, Response("one"), update("one"))) client.http .requestToError(s"tck/model/entity/$id", """{"actions": [{"fail": {"message": "expected failure"}}]}""") .futureValue mustBe "expected failure" connection - .expectClient(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(actionFailure(3, "expected failure")) + .expectIncoming(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(actionFailure(3, "expected failure")) client.http .request(s"tck/model/entity/$id", """{"actions": [{"update": {"value": "two"}}]}""") .futureValue mustBe """{"message":"two"}""" connection - .expectClient(command(4, id, "Process", Request(id, updateStates("two")))) - .expectService(reply(4, Response("two"), update("two"))) + .expectIncoming(command(4, id, "Process", Request(id, updateStates("two")))) + .expectOutgoing(reply(4, Response("two"), update("two"))) client.http.request(s"tck/model/entity/$id", "{}").futureValue mustBe """{"message":"two"}""" connection - .expectClient(command(5, id, "Process", Request(id))) - .expectService(reply(5, Response("two"))) + .expectIncoming(command(5, id, "Process", Request(id))) + .expectOutgoing(reply(5, Response("two"))) } "verify passivation timeout" in valueEntityConfiguredTest { id => configuredClient.call(Request(id)) interceptor .expectEntityConnection() - .expectClient(init(ServiceConfigured, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceConfigured, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) .expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis) } } diff --git a/tck/src/main/scala/io/cloudstate/tck/EventSourcedEntityTCK.scala b/tck/src/main/scala/io/cloudstate/tck/EventSourcedEntityTCK.scala index 1448a9782..e8ee3afba 100644 --- a/tck/src/main/scala/io/cloudstate/tck/EventSourcedEntityTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/EventSourcedEntityTCK.scala @@ -367,167 +367,167 @@ trait EventSourcedEntityTCK extends TCKSpec { tckModelClient.process(Request(id)).futureValue mustBe Response() val connection = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, Response())) tckModelClient.process(Request(id, emitEvents("A"))).futureValue mustBe Response("A") connection - .expectClient(command(2, id, "Process", Request(id, emitEvents("A")))) - .expectService(reply(2, Response("A"), events("A"))) + .expectIncoming(command(2, id, "Process", Request(id, emitEvents("A")))) + .expectOutgoing(reply(2, Response("A"), events("A"))) tckModelClient.process(Request(id, emitEvents("B", "C", "D"))).futureValue mustBe Response("ABCD") connection - .expectClient(command(3, id, "Process", Request(id, emitEvents("B", "C", "D")))) - .expectService(reply(3, Response("ABCD"), events("B", "C", "D"))) + .expectIncoming(command(3, id, "Process", Request(id, emitEvents("B", "C", "D")))) + .expectOutgoing(reply(3, Response("ABCD"), events("B", "C", "D"))) tckModelClient.process(Request(id, emitEvents("E"))).futureValue mustBe Response("ABCDE") connection - .expectClient(command(4, id, "Process", Request(id, emitEvents("E")))) - .expectService(reply(4, Response("ABCDE"), snapshotAndEvents("ABCDE", "E"))) + .expectIncoming(command(4, id, "Process", Request(id, emitEvents("E")))) + .expectOutgoing(reply(4, Response("ABCDE"), snapshotAndEvents("ABCDE", "E"))) tckModelClient.process(Request(id, emitEvents("F", "G", "H", "I"))).futureValue mustBe Response("ABCDEFGHI") connection - .expectClient(command(5, id, "Process", Request(id, emitEvents("F", "G", "H", "I")))) - .expectService(reply(5, Response("ABCDEFGHI"), events("F", "G", "H", "I"))) + .expectIncoming(command(5, id, "Process", Request(id, emitEvents("F", "G", "H", "I")))) + .expectOutgoing(reply(5, Response("ABCDEFGHI"), events("F", "G", "H", "I"))) tckModelClient.process(Request(id, emitEvents("J", "K"))).futureValue mustBe Response("ABCDEFGHIJK") connection - .expectClient(command(6, id, "Process", Request(id, emitEvents("J", "K")))) - .expectService(reply(6, Response("ABCDEFGHIJK"), snapshotAndEvents("ABCDEFGHIJK", "J", "K"))) + .expectIncoming(command(6, id, "Process", Request(id, emitEvents("J", "K")))) + .expectOutgoing(reply(6, Response("ABCDEFGHIJK"), snapshotAndEvents("ABCDEFGHIJK", "J", "K"))) } "verify forwards and side effects" in eventSourcedTest { id => tckModelClient.process(Request(id, emitEvents("one"))).futureValue mustBe Response("one") val connection = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, emitEvents("one")))) - .expectService(reply(1, Response("one"), events("one"))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, emitEvents("one")))) + .expectOutgoing(reply(1, Response("one"), events("one"))) tckModelClient.process(Request(id, Seq(forwardTo(id)))).futureValue mustBe Response() connection - .expectClient(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) - .expectService(forward(2, ServiceTwo, "Call", Request(id))) + .expectIncoming(command(2, id, "Process", Request(id, Seq(forwardTo(id))))) + .expectOutgoing(forward(2, ServiceTwo, "Call", Request(id))) val connection2 = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(ServiceTwo, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceTwo, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) tckModelClient.process(Request(id, Seq(sideEffectTo(id)))).futureValue mustBe Response("one") connection - .expectClient(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) - .expectService(reply(3, Response("one"), sideEffects(id))) + .expectIncoming(command(3, id, "Process", Request(id, Seq(sideEffectTo(id))))) + .expectOutgoing(reply(3, Response("one"), sideEffects(id))) connection2 - .expectClient(command(2, id, "Call", Request(id))) - .expectService(reply(2, Response())) + .expectIncoming(command(2, id, "Call", Request(id))) + .expectOutgoing(reply(2, Response())) tckModelClient.process(Request(id)).futureValue mustBe Response("one") connection - .expectClient(command(4, id, "Process", Request(id))) - .expectService(reply(4, Response("one"))) + .expectIncoming(command(4, id, "Process", Request(id))) + .expectOutgoing(reply(4, Response("one"))) } "verify failures" in eventSourcedTest { id => tckModelClient.process(Request(id, emitEvents("1", "2", "3"))).futureValue mustBe Response("123") val connection = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id, emitEvents("1", "2", "3")))) - .expectService(reply(1, Response("123"), events("1", "2", "3"))) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id, emitEvents("1", "2", "3")))) + .expectOutgoing(reply(1, Response("123"), events("1", "2", "3"))) val failed = tckModelClient.process(Request(id, Seq(failWith("expected failure")))).failed.futureValue failed mustBe a[StatusRuntimeException] failed.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "expected failure" connection - .expectClient(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(actionFailure(2, "expected failure")) + .expectIncoming(command(2, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(actionFailure(2, "expected failure")) tckModelClient.process(Request(id)).futureValue mustBe Response("123") connection - .expectClient(command(3, id, "Process", Request(id))) - .expectService(reply(3, Response("123"))) + .expectIncoming(command(3, id, "Process", Request(id))) + .expectOutgoing(reply(3, Response("123"))) val emitAndFail = Seq(emitEvent("4"), failWith("another failure"), emitEvent("5")) val failed2 = tckModelClient.process(Request(id, emitAndFail)).failed.futureValue failed2 mustBe a[StatusRuntimeException] failed2.asInstanceOf[StatusRuntimeException].getStatus.getDescription mustBe "another failure" connection - .expectClient(command(4, id, "Process", Request(id, emitAndFail))) - .expectService(actionFailure(4, "another failure", restart = true)) + .expectIncoming(command(4, id, "Process", Request(id, emitAndFail))) + .expectOutgoing(actionFailure(4, "another failure", restart = true)) .expectClosed() val connection2 = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(event(1, persisted("1"))) - .expectClient(event(2, persisted("2"))) - .expectClient(event(3, persisted("3"))) + .expectIncoming(init(Service, id)) + .expectIncoming(event(1, persisted("1"))) + .expectIncoming(event(2, persisted("2"))) + .expectIncoming(event(3, persisted("3"))) tckModelClient.process(Request(id, emitEvents("4", "5"))).futureValue mustBe Response("12345") connection2 - .expectClient(command(1, id, "Process", Request(id, emitEvents("4", "5")))) - .expectService(reply(1, Response("12345"), snapshotAndEvents("12345", "4", "5"))) + .expectIncoming(command(1, id, "Process", Request(id, emitEvents("4", "5")))) + .expectOutgoing(reply(1, Response("12345"), snapshotAndEvents("12345", "4", "5"))) } "verify HTTP API" in eventSourcedTest { id => client.http.request(s"tck/model/eventsourced/$id", "{}").futureValue mustBe """{"message":""}""" val connection = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(command(1, id, "Process", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(Service, id)) + .expectIncoming(command(1, id, "Process", Request(id))) + .expectOutgoing(reply(1, Response())) client.http .request(s"tck/model/eventsourced/$id", """{"actions": [{"emit": {"value": "x"}}]}""") .futureValue mustBe """{"message":"x"}""" connection - .expectClient(command(2, id, "Process", Request(id, emitEvents("x")))) - .expectService(reply(2, Response("x"), events("x"))) + .expectIncoming(command(2, id, "Process", Request(id, emitEvents("x")))) + .expectOutgoing(reply(2, Response("x"), events("x"))) client.http .requestToError(s"tck/model/eventsourced/$id", """{"actions": [{"fail": {"message": "expected failure"}}]}""") .futureValue mustBe "expected failure" connection - .expectClient(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) - .expectService(actionFailure(3, "expected failure")) + .expectIncoming(command(3, id, "Process", Request(id, Seq(failWith("expected failure"))))) + .expectOutgoing(actionFailure(3, "expected failure")) client.http .request(s"tck/model/eventsourced/$id", """{"actions": [{"emit": {"value": "y"}}]}""") .futureValue mustBe """{"message":"xy"}""" connection - .expectClient(command(4, id, "Process", Request(id, emitEvents("y")))) - .expectService(reply(4, Response("xy"), events("y"))) + .expectIncoming(command(4, id, "Process", Request(id, emitEvents("y")))) + .expectOutgoing(reply(4, Response("xy"), events("y"))) client.http .requestToError(s"tck/model/eventsourced/$id", """{"actions": [{"emit": {"value": "z"}}, {"fail": {"message": "emit then fail"}}]}""") .futureValue mustBe "emit then fail" connection - .expectClient(command(5, id, "Process", Request(id, Seq(emitEvent("z"), failWith("emit then fail"))))) - .expectService(actionFailure(5, "emit then fail", restart = true)) + .expectIncoming(command(5, id, "Process", Request(id, Seq(emitEvent("z"), failWith("emit then fail"))))) + .expectOutgoing(actionFailure(5, "emit then fail", restart = true)) .expectClosed() val connection2 = interceptor .expectEventSourcedEntityConnection() - .expectClient(init(Service, id)) - .expectClient(event(1, persisted("x"))) - .expectClient(event(2, persisted("y"))) + .expectIncoming(init(Service, id)) + .expectIncoming(event(1, persisted("x"))) + .expectIncoming(event(2, persisted("y"))) client.http .request(s"tck/model/eventsourced/$id", """{"actions": [{"emit": {"value": "z"}}]}""") .futureValue mustBe """{"message":"xyz"}""" connection2 - .expectClient(command(1, id, "Process", Request(id, emitEvents("z")))) - .expectService(reply(1, Response("xyz"), events("z"))) + .expectIncoming(command(1, id, "Process", Request(id, emitEvents("z")))) + .expectOutgoing(reply(1, Response("xyz"), events("z"))) } "verify passivation timeout" in eventSourcedConfiguredTest { id => configuredClient.call(Request(id)) interceptor .expectEventSourcedEntityConnection() - .expectClient(init(ServiceConfigured, id)) - .expectClient(command(1, id, "Call", Request(id))) - .expectService(reply(1, Response())) + .expectIncoming(init(ServiceConfigured, id)) + .expectIncoming(command(1, id, "Call", Request(id))) + .expectOutgoing(reply(1, Response())) .expectClosed(2.seconds) // check passivation (with expected timeout of 100 millis) } } diff --git a/tck/src/main/scala/io/cloudstate/tck/EventingTCK.scala b/tck/src/main/scala/io/cloudstate/tck/EventingTCK.scala index aa7b30813..5dd6ebfa4 100644 --- a/tck/src/main/scala/io/cloudstate/tck/EventingTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/EventingTCK.scala @@ -56,11 +56,11 @@ trait EventingTCK extends TCKSpec { def verifyEventSourcedInitCommandReply(id: String): Unit = { val connection = interceptor.expectEventSourcedEntityConnection() - val init = connection.expectClientMessage[EventSourcedStreamIn.Message.Init] + val init = connection.expectIncomingMessage[EventSourcedStreamIn.Message.Init] init.value.serviceName must ===(eventlogeventing.EventSourcedEntityOne.name) init.value.entityId must ===(id) - connection.expectClientMessage[EventSourcedStreamIn.Message.Command] - connection.expectServiceMessage[EventSourcedStreamOut.Message.Reply] + connection.expectIncomingMessage[EventSourcedStreamIn.Message.Command] + connection.expectOutgoingMessage[EventSourcedStreamOut.Message.Reply] } def verifySubscriberCommandResponse(step: eventlogeventing.ProcessStep.Step): ActionResponse = { @@ -121,11 +121,11 @@ trait EventingTCK extends TCKSpec { ) val connection = interceptor.expectEventSourcedEntityConnection() - val init = connection.expectClientMessage[EventSourcedStreamIn.Message.Init] + val init = connection.expectIncomingMessage[EventSourcedStreamIn.Message.Init] init.value.serviceName must ===(eventlogeventing.EventSourcedEntityTwo.name) init.value.entityId must ===("eventlogeventing:3") - connection.expectClientMessage[EventSourcedStreamIn.Message.Command] - val reply = connection.expectServiceMessage[EventSourcedStreamOut.Message.Reply] + connection.expectIncomingMessage[EventSourcedStreamIn.Message.Command] + val reply = connection.expectOutgoingMessage[EventSourcedStreamOut.Message.Reply] reply.value.events must have size (1) reply.value.events.head.typeUrl must startWith("json.cloudstate.io/") diff --git a/testkit/src/main/scala/io/cloudstate/testkit/action/InterceptActionService.scala b/testkit/src/main/scala/io/cloudstate/testkit/action/InterceptActionService.scala index a5d2d3d4d..f25ff8c0b 100644 --- a/testkit/src/main/scala/io/cloudstate/testkit/action/InterceptActionService.scala +++ b/testkit/src/main/scala/io/cloudstate/testkit/action/InterceptActionService.scala @@ -119,13 +119,13 @@ object InterceptActionService { def expectResponse(): ActionResponse = out.expectMsgType[ActionResponse] - def expectClient(expected: ActionCommand): UnaryConnection = { + def expectIncoming(expected: ActionCommand): UnaryConnection = { val received = ignoreMetadata(command) assert(received == expected, s"Unexpected unary action command: expected $expected, found $received") this } - def expectService(expected: ActionResponse): UnaryConnection = { + def expectOutgoing(expected: ActionResponse): UnaryConnection = { val received = ignoreMetadata(expectResponse()) assert(received == expected, s"Unexpected unary action response: expected $expected, found $received") this @@ -144,13 +144,13 @@ object InterceptActionService { def expectCommand(): ActionCommand = in.expectMsgType[ActionCommand] - def expectClient(expected: ActionCommand): StreamedInConnection = { + def expectIncoming(expected: ActionCommand): StreamedInConnection = { val received = ignoreMetadata(expectCommand()) assert(received == expected, s"Unexpected streamed-in action command: expected $expected, found $received") this } - def expectService(expected: ActionResponse): StreamedInConnection = { + def expectOutgoing(expected: ActionResponse): StreamedInConnection = { val received = ignoreMetadata(expectResponse()) assert(received == expected, s"Unexpected unary action response: expected $expected, found $received") this @@ -170,13 +170,13 @@ object InterceptActionService { def expectResponse(): ActionResponse = out.expectMsgType[ActionResponse] - def expectClient(expected: ActionCommand): StreamedOutConnection = { + def expectIncoming(expected: ActionCommand): StreamedOutConnection = { val received = ignoreMetadata(command) assert(received == expected, s"Unexpected streamed-out action command: expected $expected, found $received") this } - def expectService(expected: ActionResponse): StreamedOutConnection = { + def expectOutgoing(expected: ActionResponse): StreamedOutConnection = { val received = ignoreMetadata(expectResponse()) assert(received == expected, s"Unexpected unary action response: expected $expected, found $received") this @@ -201,13 +201,13 @@ object InterceptActionService { def expectResponse(): ActionResponse = out.expectMsgType[ActionResponse] - def expectClient(expected: ActionCommand): StreamedConnection = { + def expectIncoming(expected: ActionCommand): StreamedConnection = { val received = ignoreMetadata(expectCommand()) assert(received == expected, s"Unexpected streamed action command: expected $expected, found $received") this } - def expectService(expected: ActionResponse): StreamedConnection = { + def expectOutgoing(expected: ActionResponse): StreamedConnection = { val received = ignoreMetadata(expectResponse()) assert(received == expected, s"Unexpected unary action response: expected $expected, found $received") this diff --git a/testkit/src/main/scala/io/cloudstate/testkit/crdt/InterceptCrdtService.scala b/testkit/src/main/scala/io/cloudstate/testkit/crdt/InterceptCrdtService.scala index 22be1f658..93b773af9 100644 --- a/testkit/src/main/scala/io/cloudstate/testkit/crdt/InterceptCrdtService.scala +++ b/testkit/src/main/scala/io/cloudstate/testkit/crdt/InterceptCrdtService.scala @@ -66,20 +66,20 @@ object InterceptCrdtService { private[testkit] def inSink: Sink[CrdtStreamIn, NotUsed] = Sink.actorRef(in.ref, Complete, Error.apply) private[testkit] def outSink: Sink[CrdtStreamOut, NotUsed] = Sink.actorRef(out.ref, Complete, Error.apply) - def expectClient(message: CrdtStreamIn.Message): Connection = { + def expectIncoming(message: CrdtStreamIn.Message): Connection = { in.expectMsg(CrdtStreamIn(message)) this } - def expectService(message: CrdtStreamOut.Message): Connection = { + def expectOutgoing(message: CrdtStreamOut.Message): Connection = { out.expectMsg(CrdtStreamOut(message)) this } - def expectServiceMessage[T](implicit classTag: ClassTag[T]): T = - expectServiceMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) + def expectOutgoingMessage[T](implicit classTag: ClassTag[T]): T = + expectOutgoingMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) - def expectServiceMessageClass[T](messageClass: Class[T]): T = { + def expectOutgoingMessageClass[T](messageClass: Class[T]): T = { val message = out.expectMsgType[CrdtStreamOut].message assert(messageClass.isInstance(message), s"expected message $messageClass, found ${message.getClass} ($message)") message.asInstanceOf[T] diff --git a/testkit/src/main/scala/io/cloudstate/testkit/eventsourced/InterceptEventSourcedService.scala b/testkit/src/main/scala/io/cloudstate/testkit/eventsourced/InterceptEventSourcedService.scala index cc8f1ed8b..818472bb6 100644 --- a/testkit/src/main/scala/io/cloudstate/testkit/eventsourced/InterceptEventSourcedService.scala +++ b/testkit/src/main/scala/io/cloudstate/testkit/eventsourced/InterceptEventSourcedService.scala @@ -66,27 +66,27 @@ object InterceptEventSourcedService { private[testkit] def inSink: Sink[EventSourcedStreamIn, NotUsed] = Sink.actorRef(in.ref, Complete, Error.apply) private[testkit] def outSink: Sink[EventSourcedStreamOut, NotUsed] = Sink.actorRef(out.ref, Complete, Error.apply) - def expectClient(message: EventSourcedStreamIn.Message): Connection = { + def expectIncoming(message: EventSourcedStreamIn.Message): Connection = { in.expectMsg(EventSourcedStreamIn(message)) this } - def expectClientMessage[T](implicit classTag: ClassTag[T]): T = { + def expectIncomingMessage[T](implicit classTag: ClassTag[T]): T = { val message = in.expectMsgType[EventSourcedStreamIn].message assert(classTag.runtimeClass.isInstance(message), s"expected message ${classTag.runtimeClass}, found ${message.getClass} ($message)") message.asInstanceOf[T] } - def expectService(message: EventSourcedStreamOut.Message): Connection = { + def expectOutgoing(message: EventSourcedStreamOut.Message): Connection = { out.expectMsg(EventSourcedStreamOut(message)) this } - def expectServiceMessage[T](implicit classTag: ClassTag[T]): T = - expectServiceMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) + def expectOutgoingMessage[T](implicit classTag: ClassTag[T]): T = + expectOutgoingMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) - def expectServiceMessageClass[T](messageClass: Class[T]): T = { + def expectOutgoingMessageClass[T](messageClass: Class[T]): T = { val message = out.expectMsgType[EventSourcedStreamOut].message assert(messageClass.isInstance(message), s"expected message $messageClass, found ${message.getClass} ($message)") message.asInstanceOf[T] diff --git a/testkit/src/main/scala/io/cloudstate/testkit/valueentity/InterceptValueEntityService.scala b/testkit/src/main/scala/io/cloudstate/testkit/valueentity/InterceptValueEntityService.scala index 00471daaf..c159c8b0e 100644 --- a/testkit/src/main/scala/io/cloudstate/testkit/valueentity/InterceptValueEntityService.scala +++ b/testkit/src/main/scala/io/cloudstate/testkit/valueentity/InterceptValueEntityService.scala @@ -68,20 +68,20 @@ object InterceptValueEntityService { private[testkit] def inSink: Sink[ValueEntityStreamIn, NotUsed] = Sink.actorRef(in.ref, Complete, Error.apply) private[testkit] def outSink: Sink[ValueEntityStreamOut, NotUsed] = Sink.actorRef(out.ref, Complete, Error.apply) - def expectClient(message: ValueEntityStreamIn.Message): Connection = { + def expectIncoming(message: ValueEntityStreamIn.Message): Connection = { in.expectMsg(ValueEntityStreamIn(message)) this } - def expectService(message: ValueEntityStreamOut.Message): Connection = { + def expectOutgoing(message: ValueEntityStreamOut.Message): Connection = { out.expectMsg(ValueEntityStreamOut(message)) this } - def expectServiceMessage[T](implicit classTag: ClassTag[T]): T = - expectServiceMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) + def expectOutgoingMessage[T](implicit classTag: ClassTag[T]): T = + expectOutgoingMessageClass(classTag.runtimeClass.asInstanceOf[Class[T]]) - def expectServiceMessageClass[T](messageClass: Class[T]): T = { + def expectOutgoingMessageClass[T](messageClass: Class[T]): T = { val message = out.expectMsgType[ValueEntityStreamOut].message assert(messageClass.isInstance(message), s"expected message $messageClass, found ${message.getClass} ($message)") message.asInstanceOf[T]