From 193a5294ea0a1b36818797063142cee137e35967 Mon Sep 17 00:00:00 2001 From: gkc Date: Fri, 8 Nov 2024 13:06:35 +0000 Subject: [PATCH 1/3] Changes for v2.3.0 to fix issues seen with highly concurrent creation of a large number of sockets - feat: Add `authTimeout` property to SocketConnector. Previously this was - hard-coded to 5 seconds which is a bit restrictive for slow network connections. It now defaults to 30 seconds but may be set at time of construction - feat: added `authTimeout` parameter to `serverToServer`; this is provided to the SocketConnector constructor - feat: added `backlog` parameter to `serverToServer`; this is provided to the ServerSocket.bind call - feat: added `backlog` parameter to `serverToSocket`; this is provided to the ServerSocket.bind call - fix: reordered some code in `_destroySide` so it handles internal state before trying to destroy sockets - fix: added `catchError` blocks for everywhere we're calling `handleSingleConnection` when wrapped in `unawaited` - More logging (when in verbose mode) --- CHANGELOG.md | 39 +++++++-- lib/src/socket_connector.dart | 154 ++++++++++++++++++++++++---------- pubspec.yaml | 2 +- 3 files changed, 145 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6eab1b..d7809a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +## 2.3.0 + +- feat: Add `authTimeout` property to SocketConnector. Previously this was +- hard-coded to 5 seconds which is a bit restrictive for slow network + connections. It now defaults to 30 seconds but may be set at time of + construction +- feat: added `authTimeout` parameter to `serverToServer`; this is provided + to the SocketConnector constructor +- feat: added `backlog` parameter to `serverToServer`; this is provided to + the ServerSocket.bind call +- feat: added `backlog` parameter to `serverToSocket`; this is provided to + the ServerSocket.bind call +- fix: reordered some code in `_destroySide` so it handles internal state + before trying to destroy sockets +- fix: added `catchError` blocks for everywhere we're calling + `handleSingleConnection` when wrapped in `unawaited` +- More logging (when in verbose mode) + ## 2.2.0 - feat: Enhance serverToSocket adding optional parameter `beforeJoining` which @@ -5,6 +23,7 @@ different transformers can be used for each socket pair. ## 2.1.0 + - Added `multi` parameter to `SocketConnector.serverToSocket` - whether to create new connections on the "B" side every time there is a new "A" side connection to the bound server port. Also added `onConnect` parameter, @@ -15,12 +34,14 @@ is zero or has dropped to zero ## 2.0.1 + - Removed an unnecessary dependency ## 2.0.0 -- Added support for requiring client sockets to be authenticated in some + +- Added support for requiring client sockets to be authenticated in some app-defined way before they will be connected to the other side -- Added support for app-defined data transformers which can be used to +- Added support for app-defined data transformers which can be used to transform the data while sending from A to B, and vice versa. Useful for adding traffic encryption, for example. - Refactored for readability @@ -29,32 +50,40 @@ - More tests ## 1.0.11 + - Added close function to SocketConnector ## 1.0.10 + - Small format error to get to 140/140 on pub.dev ## 1.0.9 + - Improved network throughput of socket_connector ## 1.0.8 - Added connection timeout if only one side connects + ## 1.0.7 - fix change log + ## 1.0.6 -- Bug fix with cloing sockets. +- Bug fix with closing sockets. + ## 1.0.5 - Ready for isolates + ## 1.0.4 -- Formated with dart format 140/140 (I hope) +- Formatted with dart format 140/140 (I hope) + ## 1.0.3 -- Included dart docs and formated with dart format +- Included dart docs and formatted with dart format ## 1.0.2 diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index d5ae49c..b100a20 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -27,6 +27,7 @@ class SocketConnector { this.verbose = false, this.logTraffic = false, this.timeout = defaultTimeout, + this.authTimeout = defaultTimeout, IOSink? logger, }) { this.logger = logger ?? stderr; @@ -82,6 +83,9 @@ class SocketConnector { final Completer _closedCompleter = Completer(); + /// How long to wait for a client to authenticate its self + final Duration authTimeout; + /// Add a [Side] with optional [SocketAuthVerifier] and /// [DataTransformer] /// - If [socketAuthVerifier] provided, wait for socket to be authenticated @@ -101,7 +105,7 @@ class SocketConnector { try { (authenticated, stream) = await thisSide.socketAuthVerifier! (thisSide.socket) - .timeout(Duration(seconds: 5)); + .timeout(authTimeout); thisSide.authenticated = authenticated; if (thisSide.authenticated) { thisSide.stream = stream!; @@ -128,6 +132,7 @@ class SocketConnector { if (pendingA.isNotEmpty && pendingB.isNotEmpty) { Connection c = Connection(pendingA.removeAt(0), pendingB.removeAt(0)); connections.add(c); + _log(chalk.brightBlue('Added connection. There are now ${connections.length} connections.')); for (final side in [thisSide, thisSide.farSide!]) { if (side.transformer != null) { @@ -165,37 +170,34 @@ class SocketConnector { return; } side.state = SideState.closing; + Connection? connectionToRemove; + for (final c in connections) { + if (c.sideA == side || c.sideB == side) { + _log(chalk.brightBlue('Will remove established connection')); + connectionToRemove = c; + break; + } + } + if (connectionToRemove != null) { + connections.remove(connectionToRemove); + _log(chalk.brightBlue('Removed connection. ${connections.length} remaining.')); + if (connections.isEmpty && gracePeriodPassed) { + _log(chalk.brightBlue('No established connections remain' + ' and grace period has passed - ' + ' will close connector')); + close(); + } + } + side.state = SideState.closed; try { _log(chalk.brightBlue('Destroying socket on side ${side.name}')); side.socket.destroy(); if (side.farSide != null) { _log(chalk.brightBlue( 'Destroying socket on far side (${side.farSide?.name})')); - side.farSide?.socket.destroy(); - } - - Connection? connectionToRemove; - for (final c in connections) { - if (c.sideA == side || c.sideB == side) { - _log(chalk.brightBlue('Will remove established connection')); - connectionToRemove = c; - break; - } + _destroySide(side.farSide!); } - if (connectionToRemove != null) { - connections.remove(connectionToRemove); - _log(chalk.brightBlue('Removed connection')); - if (connections.isEmpty && gracePeriodPassed) { - _log(chalk.brightBlue('No established connections remain' - ' and grace period has passed - ' - ' will close connector')); - close(); - } - } - } catch (_) { - } finally { - side.state = SideState.closed; - } + } catch (_) {} } void close() { @@ -241,7 +243,9 @@ class SocketConnector { SocketAuthVerifier? socketAuthVerifierA, SocketAuthVerifier? socketAuthVerifierB, Duration timeout = SocketConnector.defaultTimeout, + Duration authTimeout = SocketConnector.defaultTimeout, IOSink? logger, + int backlog = 0, }) async { IOSink logSink = logger ?? stderr; addressA ??= InternetAddress.anyIPv4; @@ -251,10 +255,19 @@ class SocketConnector { verbose: verbose, logTraffic: logTraffic, timeout: timeout, + authTimeout: authTimeout, logger: logSink, ); - connector._serverSocketA = await ServerSocket.bind(addressA, portA); - connector._serverSocketB = await ServerSocket.bind(addressB, portB); + connector._serverSocketA = await ServerSocket.bind( + addressA, + portA, + backlog: backlog, + ); + connector._serverSocketB = await ServerSocket.bind( + addressB, + portB, + backlog: backlog, + ); if (verbose) { logSink.writeln( '${DateTime.now()} | serverToServer | Bound ports A: ${connector.sideAPort}, B: ${connector.sideBPort}'); @@ -269,7 +282,17 @@ class SocketConnector { '${DateTime.now()} | serverToServer | Connection on serverSocketA: ${connector._serverSocketA!.port}'); } Side sideA = Side(socket, true, socketAuthVerifier: socketAuthVerifierA); - unawaited(connector.handleSingleConnection(sideA)); + unawaited(connector.handleSingleConnection(sideA).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + })); + }, onError: (error) { + logSink.writeln( + '${DateTime.now()} | serverToServer | ERROR on serverSocketA: ${connector._serverSocketA!.port} : $error'); + connector.close(); + }, onDone: () { + logSink.writeln( + '${DateTime.now()} | serverToServer | onDone called on serverSocketA: ${connector._serverSocketA!.port}'); + connector.close(); }); // listen for connections to the side 'B' server @@ -279,7 +302,17 @@ class SocketConnector { '${DateTime.now()} | serverToServer | Connection on serverSocketB: ${connector._serverSocketB!.port}'); } Side sideB = Side(socket, false, socketAuthVerifier: socketAuthVerifierB); - unawaited(connector.handleSingleConnection(sideB)); + unawaited(connector.handleSingleConnection(sideB).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + })); + }, onError: (error) { + logSink.writeln( + '${DateTime.now()} | serverToServer | ERROR on serverSocketB: ${connector._serverSocketB!.port} : $error'); + connector.close(); + }, onDone: () { + logSink.writeln( + '${DateTime.now()} | serverToServer | onDone called on serverSocketB: ${connector._serverSocketB!.port}'); + connector.close(); }); return (connector); @@ -319,7 +352,9 @@ class SocketConnector { // Create socket to an address and port Socket socket = await Socket.connect(addressA, portA); Side sideA = Side(socket, true, transformer: transformAtoB); - unawaited(connector.handleSingleConnection(sideA)); + unawaited(connector.handleSingleConnection(sideA).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + })); // bind to side 'B' port connector._serverSocketB = await ServerSocket.bind(addressB, portB); @@ -327,7 +362,9 @@ class SocketConnector { // listen for connections to the 'B' side port connector._serverSocketB?.listen((socketB) { Side sideB = Side(socketB, false, transformer: transformBtoA); - unawaited(connector.handleSingleConnection(sideB)); + unawaited(connector.handleSingleConnection(sideB).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + })); }); return (connector); } @@ -361,14 +398,18 @@ class SocketConnector { } Socket sideASocket = await Socket.connect(addressA, portA); Side sideA = Side(sideASocket, true, transformer: transformAtoB); - unawaited(connector.handleSingleConnection(sideA)); + unawaited(connector.handleSingleConnection(sideA).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + })); if (verbose) { logSink.writeln('socket_connector: Connecting to $addressB:$portB'); } Socket sideBSocket = await Socket.connect(addressB, portB); Side sideB = Side(sideBSocket, false, transformer: transformBtoA); - unawaited(connector.handleSingleConnection(sideB)); + unawaited(connector.handleSingleConnection(sideB).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + })); if (verbose) { logSink.writeln('socket_connector: started'); @@ -408,7 +449,8 @@ class SocketConnector { bool multi = false, @Deprecated("use beforeJoining instead") Function(Socket socketA, Socket socketB)? onConnect, - Function(Side sideA, Side sideB)? beforeJoining}) async { + Function(Side sideA, Side sideB)? beforeJoining, + int backlog = 0}) async { IOSink logSink = logger ?? stderr; addressA ??= InternetAddress.anyIPv4; @@ -421,27 +463,51 @@ class SocketConnector { int connections = 0; // bind to a local port for side 'A' - connector._serverSocketA = await ServerSocket.bind(addressA, portA); - // listen on the local port and connect the inbound socket - connector._serverSocketA?.listen((sideASocket) async { - if (!multi) { - unawaited(connector._serverSocketA?.close()); - } + connector._serverSocketA = await ServerSocket.bind( + addressA, + portA, + backlog: backlog, + ); + + StreamController ssc = StreamController(); + ssc.stream.listen((sideASocket) async { Side sideA = Side(sideASocket, true, transformer: transformAtoB); - unawaited(connector.handleSingleConnection(sideA)); + unawaited(connector.handleSingleConnection(sideA).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + })); if (verbose) { - logSink.writeln('Making connection ${++connections} to the "B" side'); + logSink.writeln('Creating socket #${++connections} to the "B" side'); } // connect to the side 'B' address and port Socket sideBSocket = await Socket.connect(addressB, portB); + if (verbose) { + logSink.writeln('"B" side socket #$connections created'); + } Side sideB = Side(sideBSocket, false, transformer: transformBtoA); - beforeJoining?.call(sideA, sideB); - unawaited(connector.handleSingleConnection(sideB)); + if (verbose) { + logSink.writeln('Calling the beforeJoining callback'); + } + await beforeJoining?.call(sideA, sideB); + unawaited(connector.handleSingleConnection(sideB).catchError((err) { + logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + })); onConnect?.call(sideASocket, sideBSocket); }); + // listen on the local port and connect the inbound socket + connector._serverSocketA?.listen((sideASocket) { + if (!multi) { + try { + connector._serverSocketA?.close(); + } catch (e) { + logSink.writeln('Error while closing serverSocketA: $e'); + } + } + ssc.add(sideASocket); + }); + return (connector); } } diff --git a/pubspec.yaml b/pubspec.yaml index 6731a02..b211f99 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: socket_connector description: Package for joining sockets together to create socket relays. -version: 2.2.0 +version: 2.3.0 repository: https://github.com/cconstab/socket_connector environment: From f3895b54e794631e459a635a0fe11830a8080da8 Mon Sep 17 00:00:00 2001 From: gkc Date: Fri, 8 Nov 2024 13:19:36 +0000 Subject: [PATCH 2/3] fix: fix some log messages --- lib/src/socket_connector.dart | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index b100a20..8a9223d 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -287,11 +287,11 @@ class SocketConnector { })); }, onError: (error) { logSink.writeln( - '${DateTime.now()} | serverToServer | ERROR on serverSocketA: ${connector._serverSocketA!.port} : $error'); + '${DateTime.now()} | serverToServer | ERROR on serverSocketA: ${connector._serverSocketA?.port} : $error'); connector.close(); }, onDone: () { logSink.writeln( - '${DateTime.now()} | serverToServer | onDone called on serverSocketA: ${connector._serverSocketA!.port}'); + '${DateTime.now()} | serverToServer | onDone called on serverSocketA: ${connector._serverSocketA?.port}'); connector.close(); }); @@ -307,11 +307,11 @@ class SocketConnector { })); }, onError: (error) { logSink.writeln( - '${DateTime.now()} | serverToServer | ERROR on serverSocketB: ${connector._serverSocketB!.port} : $error'); + '${DateTime.now()} | serverToServer | ERROR on serverSocketB: ${connector._serverSocketB?.port} : $error'); connector.close(); }, onDone: () { logSink.writeln( - '${DateTime.now()} | serverToServer | onDone called on serverSocketB: ${connector._serverSocketB!.port}'); + '${DateTime.now()} | serverToServer | onDone called on serverSocketB: ${connector._serverSocketB?.port}'); connector.close(); }); From 4ff7ab9d6a2db10d77e979a482c93c55e0c42ce4 Mon Sep 17 00:00:00 2001 From: gkc Date: Fri, 8 Nov 2024 13:20:03 +0000 Subject: [PATCH 3/3] chore: run dart format --- lib/src/socket_connector.dart | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index 8a9223d..52baa0c 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -132,7 +132,8 @@ class SocketConnector { if (pendingA.isNotEmpty && pendingB.isNotEmpty) { Connection c = Connection(pendingA.removeAt(0), pendingB.removeAt(0)); connections.add(c); - _log(chalk.brightBlue('Added connection. There are now ${connections.length} connections.')); + _log(chalk.brightBlue( + 'Added connection. There are now ${connections.length} connections.')); for (final side in [thisSide, thisSide.farSide!]) { if (side.transformer != null) { @@ -180,7 +181,8 @@ class SocketConnector { } if (connectionToRemove != null) { connections.remove(connectionToRemove); - _log(chalk.brightBlue('Removed connection. ${connections.length} remaining.')); + _log(chalk + .brightBlue('Removed connection. ${connections.length} remaining.')); if (connections.isEmpty && gracePeriodPassed) { _log(chalk.brightBlue('No established connections remain' ' and grace period has passed - ' @@ -283,7 +285,8 @@ class SocketConnector { } Side sideA = Side(socket, true, socketAuthVerifier: socketAuthVerifierA); unawaited(connector.handleSingleConnection(sideA).catchError((err) { - logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + logSink + .writeln('ERROR $err from handleSingleConnection on sideA $sideA'); })); }, onError: (error) { logSink.writeln( @@ -303,7 +306,8 @@ class SocketConnector { } Side sideB = Side(socket, false, socketAuthVerifier: socketAuthVerifierB); unawaited(connector.handleSingleConnection(sideB).catchError((err) { - logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + logSink + .writeln('ERROR $err from handleSingleConnection on sideB $sideB'); })); }, onError: (error) { logSink.writeln( @@ -363,7 +367,8 @@ class SocketConnector { connector._serverSocketB?.listen((socketB) { Side sideB = Side(socketB, false, transformer: transformBtoA); unawaited(connector.handleSingleConnection(sideB).catchError((err) { - logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + logSink + .writeln('ERROR $err from handleSingleConnection on sideB $sideB'); })); }); return (connector); @@ -473,7 +478,8 @@ class SocketConnector { ssc.stream.listen((sideASocket) async { Side sideA = Side(sideASocket, true, transformer: transformAtoB); unawaited(connector.handleSingleConnection(sideA).catchError((err) { - logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA'); + logSink + .writeln('ERROR $err from handleSingleConnection on sideA $sideA'); })); if (verbose) { @@ -490,7 +496,8 @@ class SocketConnector { } await beforeJoining?.call(sideA, sideB); unawaited(connector.handleSingleConnection(sideB).catchError((err) { - logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB'); + logSink + .writeln('ERROR $err from handleSingleConnection on sideB $sideB'); })); onConnect?.call(sideASocket, sideBSocket);