From 275cc544c933e9fc9c9f4b1e1d2f63d99bd7532c Mon Sep 17 00:00:00 2001 From: Isaac Saldana Date: Thu, 12 Nov 2020 03:52:04 -0800 Subject: [PATCH] Fix concurrent modification error in GrpcWebClientChannel.terminate Fixes #331 Co-authored-by: Vyacheslav Egorov --- CHANGELOG.md | 4 +- lib/src/client/transport/xhr_transport.dart | 2 +- test/grpc_web_test.dart | 105 ++++++++++++++------ 3 files changed, 80 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c5fc076..9455dab7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,12 @@ newer of protobuf compiler plugin. * `Client.$createCall` is deprecated because it does not invoke client interceptors. -* Fix an issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing +* Fix issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing incorrect duplicated headers in gRPC-Web requests. * Change minimum required Dart SDK to 2.8 to enable access to Unix domain sockets. * Add support for Unix domain sockets in `Socket.serve` and `ClientChannel`. +* Fix issue [#331](https://github.com/grpc/grpc-dart/issues/331) causing + an exception in `GrpcWebClientChannel.terminate()`. ## 2.7.0 diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart index afb65be4..4150e3cd 100644 --- a/lib/src/client/transport/xhr_transport.dart +++ b/lib/src/client/transport/xhr_transport.dart @@ -214,7 +214,7 @@ class XhrClientConnection extends ClientConnection { @override Future terminate() async { - for (XhrTransportStream request in _requests) { + for (var request in List.of(_requests)) { request.terminate(); } } diff --git a/test/grpc_web_test.dart b/test/grpc_web_test.dart index 604baf96..7b347bcd 100644 --- a/test/grpc_web_test.dart +++ b/test/grpc_web_test.dart @@ -11,42 +11,88 @@ import 'package:grpc/grpc_web.dart'; import 'src/generated/echo.pbgrpc.dart'; void main() { + GrpcWebServer server; + + setUpAll(() async { + server = await GrpcWebServer.start(); + }); + + tearDownAll(() async { + await server.shutdown(); + }); + // Test verifies that gRPC-web echo example works by talking to a gRPC // server (written in Dart) via gRPC-web protocol through a third party // gRPC-web proxy. test('gRPC-web echo test', () async { - final server = await GrpcWebServer.start(); - try { - final channel = GrpcWebClientChannel.xhr(server.uri); - final service = EchoServiceClient(channel); + final channel = GrpcWebClientChannel.xhr(server.uri); + final service = EchoServiceClient(channel); - const testMessage = 'hello from gRPC-web'; + const testMessage = 'hello from gRPC-web'; - // First test a simple echo request. - final response = await service.echo(EchoRequest()..message = testMessage); + // First test a simple echo request. + final response = await service.echo(EchoRequest()..message = testMessage); + expect(response.message, equals(testMessage)); + + // Now test that streaming requests also works by asking echo server + // to send us a number of messages every 100 ms. Check that we receive + // them fast enough (if streaming is broken we will receive all of them + // in one go). + final sw = Stopwatch()..start(); + final timings = await service + .serverStreamingEcho(ServerStreamingEchoRequest() + ..message = testMessage + ..messageCount = 20 + ..messageInterval = 100) + .map((response) { expect(response.message, equals(testMessage)); + final timing = sw.elapsedMilliseconds; + sw.reset(); + return timing; + }).toList(); + final maxDelay = timings.reduce(math.max); + expect(maxDelay, lessThan(500)); + }); + + // Verify that terminate does not cause an exception when terminating + // channel with multiple active requests. + test("terminate works", () async { + final channel = GrpcWebClientChannel.xhr(server.uri); + final service = EchoServiceClient(channel); + + const testMessage = 'hello from gRPC-web'; + + // First test a simple echo request. + final response = await service.echo(EchoRequest()..message = testMessage); + expect(response.message, equals(testMessage)); + + var terminated = false; + + service + .serverStreamingEcho(ServerStreamingEchoRequest() + ..message = testMessage + ..messageCount = 20 + ..messageInterval = 100) + .listen((response) { + expect(response.message, equals(testMessage)); + }, onError: (e) { + expect(terminated, isTrue); + }); + + service + .serverStreamingEcho(ServerStreamingEchoRequest() + ..message = testMessage + ..messageCount = 20 + ..messageInterval = 100) + .listen((response) { + expect(response.message, equals(testMessage)); + }, onError: (e) { + expect(terminated, isTrue); + }); - // Now test that streaming requests also works by asking echo server - // to send us a number of messages every 100 ms. Check that we receive - // them fast enough (if streaming is broken we will receive all of them - // in one go). - final sw = Stopwatch()..start(); - final timings = await service - .serverStreamingEcho(ServerStreamingEchoRequest() - ..message = testMessage - ..messageCount = 20 - ..messageInterval = 100) - .map((response) { - expect(response.message, equals(testMessage)); - final timing = sw.elapsedMilliseconds; - sw.reset(); - return timing; - }).toList(); - final maxDelay = timings.reduce(math.max); - expect(maxDelay, lessThan(500)); - } finally { - await server.shutdown(); - } + await Future.delayed(Duration(milliseconds: 500)); + terminated = true; + await channel.terminate(); }); } @@ -72,7 +118,8 @@ class GrpcWebServer { static Future start() async { // Spawn the server code on the server side, it will send us back port // number we should be talking to. - final serverChannel = spawnHybridUri('grpc_web_server.dart'); + final serverChannel = + spawnHybridUri('grpc_web_server.dart', stayAlive: true); final portCompleter = Completer(); final exitCompleter = Completer(); serverChannel.stream.listen((event) {