Skip to content

Commit

Permalink
Merge pull request #25 from atsign-foundation/fix-2.3.3
Browse files Browse the repository at this point in the history
fix: improve stability under load
  • Loading branch information
gkc authored Nov 16, 2024
2 parents e9c435a + f7e5cec commit 1453018
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 64 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.3.3

- fix: ensure serverToSocket handles sockets in strict sequence as they are
accepted
- fix: ensure that, when one side is closed, all data received from that side
has been delivered to the other side before closing the other side

## 2.3.2

- fix: stability under load
Expand Down
156 changes: 93 additions & 63 deletions lib/src/socket_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:chalkdart/chalk.dart';
import 'package:mutex/mutex.dart';
import 'package:socket_connector/src/types.dart';

/// Typical usage is via the [serverToServer], [serverToSocket],
Expand Down Expand Up @@ -97,6 +98,9 @@ class SocketConnector {
if (closed) {
throw StateError('Connector is closed');
}
unawaited(thisSide.socket.done
.then((v) => _closeSide(thisSide))
.catchError((err) => _closeSide(thisSide)));
if (thisSide.socketAuthVerifier == null) {
thisSide.authenticated = true;
} else {
Expand All @@ -119,7 +123,7 @@ class SocketConnector {
}
if (!thisSide.authenticated) {
_log('Authentication failed on side ${thisSide.name}', force: true);
_destroySide(thisSide);
_closeSide(thisSide);
return;
}

Expand All @@ -136,31 +140,34 @@ class SocketConnector {
'Added connection. There are now ${connections.length} connections.'));

for (final side in [thisSide, thisSide.farSide!]) {
unawaited(side.socket.done
.then((v) => _destroySide(side))
.catchError((err) => _destroySide(side)));
if (side.transformer != null) {
// transformer is there to transform data originating FROM its side
// transformer's output will write to the SOCKET on the far side
StreamController<Uint8List> sc = StreamController<Uint8List>();
side.farSide!.sink = sc;
Stream<List<int>> transformed = side.transformer!(sc.stream);
transformed.listen((data) {
try {
if (side.farSide!.state == SideState.open) {
transformed.listen(
(data) {
try {
side.farSide!.socket.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
side.farSide!.sent += data.length;
if (side.state == SideState.closed &&
side.rcvd == side.farSide!.sent) {
_closeSide(side.farSide!);
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st', force: true);
_closeSide(side.farSide!);
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
}
});
},
onDone: () => _closeSide(side),
onError: (error) => _closeSide(side),
);
}
side.stream.listen((Uint8List data) {
side.rcvd += data.length;
if (logTraffic) {
final message = String.fromCharCodes(data);
if (side.isSideA) {
Expand All @@ -172,34 +179,41 @@ class SocketConnector {
}
}
try {
if (side.farSide!.state == SideState.open) {
side.farSide!.sink.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
side.farSide!.sink.add(data);
if (side.farSide!.sink is Socket) {
side.farSide!.sent += data.length;
if (side.state == SideState.closed &&
side.rcvd == side.farSide!.sent) {
_closeSide(side.farSide!);
}
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
_log('(Error was $e; Stack trace follows\n$st', force: true);
_closeSide(side.farSide!);
}
}, onDone: () {
_log('stream.onDone on side ${side.name}');
_destroySide(side);
}, onDone: () async {
_log('${side.stream.runtimeType}.onDone on side ${side.name}');
_closeSide(side);
}, onError: (error) {
_log('stream.onError on side ${side.name}: $error', force: true);
_destroySide(side);
_log(
'${side.stream.runtimeType}.onError on side ${side.name}: $error',
force: true);
_closeSide(side);
});
}
}
}

_destroySide(final Side side) {
_closeSide(final Side side) async {
if (side.state != SideState.open) {
return;
}
side.state = SideState.closing;
side.state = SideState.closed;

_log(chalk.brightBlue('_closeSide ${side.name}: RCVD: ${side.rcvd} bytes; SENT: ${side.sent} bytes'));

Connection? connectionToRemove;
for (final c in connections) {
if (c.sideA == side || c.sideB == side) {
Expand All @@ -219,16 +233,24 @@ class SocketConnector {
close();
}
}
side.state = SideState.closed;

try {
_log(chalk.brightBlue('Destroying socket on side ${side.name}'));
await side.socket.flush();
side.socket.destroy();
if (side.farSide != null) {
_log(chalk.brightBlue(
'Destroying socket on far side (${side.farSide?.name})'));
_destroySide(side.farSide!);
if (side.farSide != null && side.farSide!.state != SideState.closed) {
if (side.rcvd == side.farSide!.sent) {
_log(chalk.brightBlue(
'Far side (${side.farSide?.name}) has received all data - will close it'));
_closeSide(side.farSide!);
} else {
_log(chalk.brightBlue(
'Far side (${side.farSide?.name}) has NOT YET received all data'));
}
}
} catch (_) {}
} catch (err) {
_log('_closeSide encountered error $err');
}
}

void close() {
Expand All @@ -243,11 +265,11 @@ class SocketConnector {
_log('closed');
}
for (final s in pendingA) {
_destroySide(s);
_closeSide(s);
}
pendingA.clear();
for (final s in pendingB) {
_destroySide(s);
_closeSide(s);
}
pendingB.clear();
}
Expand Down Expand Up @@ -504,32 +526,40 @@ class SocketConnector {
);

StreamController<Socket> ssc = StreamController();
Mutex m = Mutex();
ssc.stream.listen((sideASocket) async {
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
try {
// It's important we handle these in sequence with no chance for race
// So we're going to use a mutex
await m.acquire();
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideB $sideB');
}));

onConnect?.call(sideASocket, sideBSocket);
} finally {
m.release();
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));

onConnect?.call(sideASocket, sideBSocket);
});

// listen on the local port and connect the inbound socket
Expand Down
6 changes: 6 additions & 0 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class Side {
SocketAuthVerifier? socketAuthVerifier;
DataTransformer? transformer;

/// number of bytes written to this side's socket
int sent = 0;

/// number of bytes received from this side's socket
int rcvd = 0;

String get name => isSideA ? 'A' : 'B';
Side(this.socket, this.isSideA, {this.socketAuthVerifier, this.transformer}) {
sink = socket;
Expand Down
3 changes: 2 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
name: socket_connector
description: Package for joining sockets together to create socket relays.

version: 2.3.2
version: 2.3.3
repository: https://github.com/cconstab/socket_connector

environment:
sdk: '>=3.0.0 <4.0.0'

dependencies:
chalkdart: ^2.2.1
mutex: ^3.1.0

dev_dependencies:
lints: ^3.0.0
Expand Down

0 comments on commit 1453018

Please sign in to comment.