Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve stability under load #25

Merged
merged 9 commits into from
Nov 16, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.3.3

- fix: ensure serverToSocket handles sockets in strict sequence as they are
accepted
- fix: ensure that, when one side is closed, all data received from that side
has been delivered to the other side before closing the other side

## 2.3.2

- fix: stability under load
Expand Down
156 changes: 93 additions & 63 deletions lib/src/socket_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:chalkdart/chalk.dart';
import 'package:mutex/mutex.dart';
import 'package:socket_connector/src/types.dart';

/// Typical usage is via the [serverToServer], [serverToSocket],
Expand Down Expand Up @@ -97,6 +98,9 @@ class SocketConnector {
if (closed) {
throw StateError('Connector is closed');
}
unawaited(thisSide.socket.done
.then((v) => _closeSide(thisSide))
.catchError((err) => _closeSide(thisSide)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as we get reference to the socket, ensure there will be no uncaught exceptions

if (thisSide.socketAuthVerifier == null) {
thisSide.authenticated = true;
} else {
Expand All @@ -119,7 +123,7 @@ class SocketConnector {
}
if (!thisSide.authenticated) {
_log('Authentication failed on side ${thisSide.name}', force: true);
_destroySide(thisSide);
_closeSide(thisSide);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed _destroySide to _closeSide

return;
}

Expand All @@ -136,31 +140,34 @@ class SocketConnector {
'Added connection. There are now ${connections.length} connections.'));

for (final side in [thisSide, thisSide.farSide!]) {
unawaited(side.socket.done
.then((v) => _destroySide(side))
.catchError((err) => _destroySide(side)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved up the start of the handleSingleConnection function

if (side.transformer != null) {
// transformer is there to transform data originating FROM its side
// transformer's output will write to the SOCKET on the far side
StreamController<Uint8List> sc = StreamController<Uint8List>();
side.farSide!.sink = sc;
Stream<List<int>> transformed = side.transformer!(sc.stream);
transformed.listen((data) {
try {
if (side.farSide!.state == SideState.open) {
transformed.listen(
(data) {
try {
side.farSide!.socket.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
side.farSide!.sent += data.length;
if (side.state == SideState.closed &&
side.rcvd == side.farSide!.sent) {
_closeSide(side.farSide!);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re the above code: Since the transformer stream may not have written everything to the other side's socket before this side's socket closes, we keep track of number of bytes sent, and, if this side has been closed, will call _closeSide on the far side which will result in both side's sockets being cleaned up correctly

} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st', force: true);
_closeSide(side.farSide!);
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
}
});
},
onDone: () => _closeSide(side),
onError: (error) => _closeSide(side),
);
}
side.stream.listen((Uint8List data) {
side.rcvd += data.length;
if (logTraffic) {
final message = String.fromCharCodes(data);
if (side.isSideA) {
Expand All @@ -172,34 +179,41 @@ class SocketConnector {
}
}
try {
if (side.farSide!.state == SideState.open) {
side.farSide!.sink.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
side.farSide!.sink.add(data);
if (side.farSide!.sink is Socket) {
side.farSide!.sent += data.length;
if (side.state == SideState.closed &&
side.rcvd == side.farSide!.sent) {
_closeSide(side.farSide!);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is here because sometimes we will have a transformer for writing to the far side, in which case the check on sent / rcvd happens in the transformer's listen block above; but if there is no transformer (i.e. the farSide's sink is a Socket) then we do the same check here

}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
_log('(Error was $e; Stack trace follows\n$st', force: true);
_closeSide(side.farSide!);
}
}, onDone: () {
_log('stream.onDone on side ${side.name}');
_destroySide(side);
}, onDone: () async {
_log('${side.stream.runtimeType}.onDone on side ${side.name}');
_closeSide(side);
}, onError: (error) {
_log('stream.onError on side ${side.name}: $error', force: true);
_destroySide(side);
_log(
'${side.stream.runtimeType}.onError on side ${side.name}: $error',
force: true);
_closeSide(side);
});
}
}
}

_destroySide(final Side side) {
_closeSide(final Side side) async {
if (side.state != SideState.open) {
return;
}
side.state = SideState.closing;
side.state = SideState.closed;

_log(chalk.brightBlue('_closeSide ${side.name}: RCVD: ${side.rcvd} bytes; SENT: ${side.sent} bytes'));

Connection? connectionToRemove;
for (final c in connections) {
if (c.sideA == side || c.sideB == side) {
Expand All @@ -219,16 +233,24 @@ class SocketConnector {
close();
}
}
side.state = SideState.closed;

try {
_log(chalk.brightBlue('Destroying socket on side ${side.name}'));
await side.socket.flush();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes indeed, because you have to do this. Doh.

side.socket.destroy();
if (side.farSide != null) {
_log(chalk.brightBlue(
'Destroying socket on far side (${side.farSide?.name})'));
_destroySide(side.farSide!);
if (side.farSide != null && side.farSide!.state != SideState.closed) {
if (side.rcvd == side.farSide!.sent) {
_log(chalk.brightBlue(
'Far side (${side.farSide?.name}) has received all data - will close it'));
_closeSide(side.farSide!);
} else {
_log(chalk.brightBlue(
'Far side (${side.farSide?.name}) has NOT YET received all data'));
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that all data received from this side has been delivered to the far side before calling _closeSide(farSide). Since sometimes, because of this check, _closeSide will not be called here, blocks of code in the listen onData functions above perform the same check when writing data to the far side's sink; the combination of all of this guarantees that all data is always written to the other side before closing, and guarantees that the sides are always closed (and thus their sockets flushed and destroyed) once all data has been written.

}
} catch (_) {}
} catch (err) {
_log('_closeSide encountered error $err');
}
}

void close() {
Expand All @@ -243,11 +265,11 @@ class SocketConnector {
_log('closed');
}
for (final s in pendingA) {
_destroySide(s);
_closeSide(s);
}
pendingA.clear();
for (final s in pendingB) {
_destroySide(s);
_closeSide(s);
}
pendingB.clear();
}
Expand Down Expand Up @@ -504,32 +526,40 @@ class SocketConnector {
);

StreamController<Socket> ssc = StreamController();
Mutex m = Mutex();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a mutex to ensure that new connections to the bound ServerSocket are handled atomically, in strict sequence.

ssc.stream.listen((sideASocket) async {
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
try {
// It's important we handle these in sequence with no chance for race
// So we're going to use a mutex
await m.acquire();
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideB $sideB');
}));

onConnect?.call(sideASocket, sideBSocket);
} finally {
m.release();
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));

onConnect?.call(sideASocket, sideBSocket);
});

// listen on the local port and connect the inbound socket
Expand Down
6 changes: 6 additions & 0 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class Side {
SocketAuthVerifier? socketAuthVerifier;
DataTransformer? transformer;

/// number of bytes written to this side's socket
int sent = 0;

/// number of bytes received from this side's socket
int rcvd = 0;

String get name => isSideA ? 'A' : 'B';
Side(this.socket, this.isSideA, {this.socketAuthVerifier, this.transformer}) {
sink = socket;
Expand Down
3 changes: 2 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
name: socket_connector
description: Package for joining sockets together to create socket relays.

version: 2.3.2
version: 2.3.3
repository: https://github.com/cconstab/socket_connector

environment:
sdk: '>=3.0.0 <4.0.0'

dependencies:
chalkdart: ^2.2.1
mutex: ^3.1.0

dev_dependencies:
lints: ^3.0.0
Expand Down