Skip to content

Commit

Permalink
Fix concurrent modification error in GrpcWebClientChannel.terminate
Browse files Browse the repository at this point in the history
Fixes #331

Co-authored-by: Vyacheslav Egorov <[email protected]>
  • Loading branch information
isaldana and mraleph authored Nov 12, 2020
1 parent f1c4756 commit 275cc54
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 31 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
newer of protobuf compiler plugin.
* `Client.$createCall` is deprecated because it does not invoke client
interceptors.
* Fix an issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing
* Fix issue [#380](https://github.com/grpc/grpc-dart/issues/380) causing
incorrect duplicated headers in gRPC-Web requests.
* Change minimum required Dart SDK to 2.8 to enable access to Unix domain sockets.
* Add support for Unix domain sockets in `Socket.serve` and `ClientChannel`.
* Fix issue [#331](https://github.com/grpc/grpc-dart/issues/331) causing
an exception in `GrpcWebClientChannel.terminate()`.

## 2.7.0

Expand Down
2 changes: 1 addition & 1 deletion lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class XhrClientConnection extends ClientConnection {

@override
Future<void> terminate() async {
for (XhrTransportStream request in _requests) {
for (var request in List.of(_requests)) {
request.terminate();
}
}
Expand Down
105 changes: 76 additions & 29 deletions test/grpc_web_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,88 @@ import 'package:grpc/grpc_web.dart';
import 'src/generated/echo.pbgrpc.dart';

void main() {
GrpcWebServer server;

setUpAll(() async {
server = await GrpcWebServer.start();
});

tearDownAll(() async {
await server.shutdown();
});

// Test verifies that gRPC-web echo example works by talking to a gRPC
// server (written in Dart) via gRPC-web protocol through a third party
// gRPC-web proxy.
test('gRPC-web echo test', () async {
final server = await GrpcWebServer.start();
try {
final channel = GrpcWebClientChannel.xhr(server.uri);
final service = EchoServiceClient(channel);
final channel = GrpcWebClientChannel.xhr(server.uri);
final service = EchoServiceClient(channel);

const testMessage = 'hello from gRPC-web';
const testMessage = 'hello from gRPC-web';

// First test a simple echo request.
final response = await service.echo(EchoRequest()..message = testMessage);
// First test a simple echo request.
final response = await service.echo(EchoRequest()..message = testMessage);
expect(response.message, equals(testMessage));

// Now test that streaming requests also works by asking echo server
// to send us a number of messages every 100 ms. Check that we receive
// them fast enough (if streaming is broken we will receive all of them
// in one go).
final sw = Stopwatch()..start();
final timings = await service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.map((response) {
expect(response.message, equals(testMessage));
final timing = sw.elapsedMilliseconds;
sw.reset();
return timing;
}).toList();
final maxDelay = timings.reduce(math.max);
expect(maxDelay, lessThan(500));
});

// Verify that terminate does not cause an exception when terminating
// channel with multiple active requests.
test("terminate works", () async {
final channel = GrpcWebClientChannel.xhr(server.uri);
final service = EchoServiceClient(channel);

const testMessage = 'hello from gRPC-web';

// First test a simple echo request.
final response = await service.echo(EchoRequest()..message = testMessage);
expect(response.message, equals(testMessage));

var terminated = false;

service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.listen((response) {
expect(response.message, equals(testMessage));
}, onError: (e) {
expect(terminated, isTrue);
});

service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.listen((response) {
expect(response.message, equals(testMessage));
}, onError: (e) {
expect(terminated, isTrue);
});

// Now test that streaming requests also works by asking echo server
// to send us a number of messages every 100 ms. Check that we receive
// them fast enough (if streaming is broken we will receive all of them
// in one go).
final sw = Stopwatch()..start();
final timings = await service
.serverStreamingEcho(ServerStreamingEchoRequest()
..message = testMessage
..messageCount = 20
..messageInterval = 100)
.map((response) {
expect(response.message, equals(testMessage));
final timing = sw.elapsedMilliseconds;
sw.reset();
return timing;
}).toList();
final maxDelay = timings.reduce(math.max);
expect(maxDelay, lessThan(500));
} finally {
await server.shutdown();
}
await Future.delayed(Duration(milliseconds: 500));
terminated = true;
await channel.terminate();
});
}

Expand All @@ -72,7 +118,8 @@ class GrpcWebServer {
static Future<GrpcWebServer> start() async {
// Spawn the server code on the server side, it will send us back port
// number we should be talking to.
final serverChannel = spawnHybridUri('grpc_web_server.dart');
final serverChannel =
spawnHybridUri('grpc_web_server.dart', stayAlive: true);
final portCompleter = Completer<int>();
final exitCompleter = Completer<void>();
serverChannel.stream.listen((event) {
Expand Down

0 comments on commit 275cc54

Please sign in to comment.