Skip to content

Commit

Permalink
Merge pull request #21 from atsign-foundation/gkc/2.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
gkc authored Nov 11, 2024
2 parents 1304018 + 4ff7ab9 commit a0e4285
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 50 deletions.
39 changes: 34 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
## 2.3.0

- feat: Add `authTimeout` property to SocketConnector. Previously this was
- hard-coded to 5 seconds which is a bit restrictive for slow network
connections. It now defaults to 30 seconds but may be set at time of
construction
- feat: added `authTimeout` parameter to `serverToServer`; this is provided
to the SocketConnector constructor
- feat: added `backlog` parameter to `serverToServer`; this is provided to
the ServerSocket.bind call
- feat: added `backlog` parameter to `serverToSocket`; this is provided to
the ServerSocket.bind call
- fix: reordered some code in `_destroySide` so it handles internal state
before trying to destroy sockets
- fix: added `catchError` blocks for everywhere we're calling
`handleSingleConnection` when wrapped in `unawaited`
- More logging (when in verbose mode)

## 2.2.0

- feat: Enhance serverToSocket adding optional parameter `beforeJoining` which
will be called before a socket pair is actually joined together. Thus,
different transformers can be used for each socket pair.

## 2.1.0

- Added `multi` parameter to `SocketConnector.serverToSocket` - whether to
create new connections on the "B" side every time there is a new "A" side
connection to the bound server port. Also added `onConnect` parameter,
Expand All @@ -15,12 +34,14 @@
is zero or has dropped to zero

## 2.0.1

- Removed an unnecessary dependency

## 2.0.0
- Added support for requiring client sockets to be authenticated in some

- Added support for requiring client sockets to be authenticated in some
app-defined way before they will be connected to the other side
- Added support for app-defined data transformers which can be used to
- Added support for app-defined data transformers which can be used to
transform the data while sending from A to B, and vice versa. Useful for
adding traffic encryption, for example.
- Refactored for readability
Expand All @@ -29,32 +50,40 @@
- More tests

## 1.0.11

- Added close function to SocketConnector

## 1.0.10

- Small format error to get to 140/140 on pub.dev

## 1.0.9

- Improved network throughput of socket_connector

## 1.0.8

- Added connection timeout if only one side connects

## 1.0.7

- fix change log

## 1.0.6

- Bug fix with cloing sockets.
- Bug fix with closing sockets.

## 1.0.5

- Ready for isolates

## 1.0.4

- Formated with dart format 140/140 (I hope)
- Formatted with dart format 140/140 (I hope)

## 1.0.3

- Included dart docs and formated with dart format
- Included dart docs and formatted with dart format

## 1.0.2

Expand Down
161 changes: 117 additions & 44 deletions lib/src/socket_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SocketConnector {
this.verbose = false,
this.logTraffic = false,
this.timeout = defaultTimeout,
this.authTimeout = defaultTimeout,
IOSink? logger,
}) {
this.logger = logger ?? stderr;
Expand Down Expand Up @@ -82,6 +83,9 @@ class SocketConnector {

final Completer _closedCompleter = Completer();

/// How long to wait for a client to authenticate its self
final Duration authTimeout;

/// Add a [Side] with optional [SocketAuthVerifier] and
/// [DataTransformer]
/// - If [socketAuthVerifier] provided, wait for socket to be authenticated
Expand All @@ -101,7 +105,7 @@ class SocketConnector {
try {
(authenticated, stream) = await thisSide.socketAuthVerifier!
(thisSide.socket)
.timeout(Duration(seconds: 5));
.timeout(authTimeout);
thisSide.authenticated = authenticated;
if (thisSide.authenticated) {
thisSide.stream = stream!;
Expand All @@ -128,6 +132,8 @@ class SocketConnector {
if (pendingA.isNotEmpty && pendingB.isNotEmpty) {
Connection c = Connection(pendingA.removeAt(0), pendingB.removeAt(0));
connections.add(c);
_log(chalk.brightBlue(
'Added connection. There are now ${connections.length} connections.'));

for (final side in [thisSide, thisSide.farSide!]) {
if (side.transformer != null) {
Expand Down Expand Up @@ -165,37 +171,35 @@ class SocketConnector {
return;
}
side.state = SideState.closing;
Connection? connectionToRemove;
for (final c in connections) {
if (c.sideA == side || c.sideB == side) {
_log(chalk.brightBlue('Will remove established connection'));
connectionToRemove = c;
break;
}
}
if (connectionToRemove != null) {
connections.remove(connectionToRemove);
_log(chalk
.brightBlue('Removed connection. ${connections.length} remaining.'));
if (connections.isEmpty && gracePeriodPassed) {
_log(chalk.brightBlue('No established connections remain'
' and grace period has passed - '
' will close connector'));
close();
}
}
side.state = SideState.closed;
try {
_log(chalk.brightBlue('Destroying socket on side ${side.name}'));
side.socket.destroy();
if (side.farSide != null) {
_log(chalk.brightBlue(
'Destroying socket on far side (${side.farSide?.name})'));
side.farSide?.socket.destroy();
}

Connection? connectionToRemove;
for (final c in connections) {
if (c.sideA == side || c.sideB == side) {
_log(chalk.brightBlue('Will remove established connection'));
connectionToRemove = c;
break;
}
_destroySide(side.farSide!);
}
if (connectionToRemove != null) {
connections.remove(connectionToRemove);
_log(chalk.brightBlue('Removed connection'));
if (connections.isEmpty && gracePeriodPassed) {
_log(chalk.brightBlue('No established connections remain'
' and grace period has passed - '
' will close connector'));
close();
}
}
} catch (_) {
} finally {
side.state = SideState.closed;
}
} catch (_) {}
}

void close() {
Expand Down Expand Up @@ -241,7 +245,9 @@ class SocketConnector {
SocketAuthVerifier? socketAuthVerifierA,
SocketAuthVerifier? socketAuthVerifierB,
Duration timeout = SocketConnector.defaultTimeout,
Duration authTimeout = SocketConnector.defaultTimeout,
IOSink? logger,
int backlog = 0,
}) async {
IOSink logSink = logger ?? stderr;
addressA ??= InternetAddress.anyIPv4;
Expand All @@ -251,10 +257,19 @@ class SocketConnector {
verbose: verbose,
logTraffic: logTraffic,
timeout: timeout,
authTimeout: authTimeout,
logger: logSink,
);
connector._serverSocketA = await ServerSocket.bind(addressA, portA);
connector._serverSocketB = await ServerSocket.bind(addressB, portB);
connector._serverSocketA = await ServerSocket.bind(
addressA,
portA,
backlog: backlog,
);
connector._serverSocketB = await ServerSocket.bind(
addressB,
portB,
backlog: backlog,
);
if (verbose) {
logSink.writeln(
'${DateTime.now()} | serverToServer | Bound ports A: ${connector.sideAPort}, B: ${connector.sideBPort}');
Expand All @@ -269,7 +284,18 @@ class SocketConnector {
'${DateTime.now()} | serverToServer | Connection on serverSocketA: ${connector._serverSocketA!.port}');
}
Side sideA = Side(socket, true, socketAuthVerifier: socketAuthVerifierA);
unawaited(connector.handleSingleConnection(sideA));
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));
}, onError: (error) {
logSink.writeln(
'${DateTime.now()} | serverToServer | ERROR on serverSocketA: ${connector._serverSocketA?.port} : $error');
connector.close();
}, onDone: () {
logSink.writeln(
'${DateTime.now()} | serverToServer | onDone called on serverSocketA: ${connector._serverSocketA?.port}');
connector.close();
});

// listen for connections to the side 'B' server
Expand All @@ -279,7 +305,18 @@ class SocketConnector {
'${DateTime.now()} | serverToServer | Connection on serverSocketB: ${connector._serverSocketB!.port}');
}
Side sideB = Side(socket, false, socketAuthVerifier: socketAuthVerifierB);
unawaited(connector.handleSingleConnection(sideB));
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));
}, onError: (error) {
logSink.writeln(
'${DateTime.now()} | serverToServer | ERROR on serverSocketB: ${connector._serverSocketB?.port} : $error');
connector.close();
}, onDone: () {
logSink.writeln(
'${DateTime.now()} | serverToServer | onDone called on serverSocketB: ${connector._serverSocketB?.port}');
connector.close();
});

return (connector);
Expand Down Expand Up @@ -319,15 +356,20 @@ class SocketConnector {
// Create socket to an address and port
Socket socket = await Socket.connect(addressA, portA);
Side sideA = Side(socket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA));
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));

// bind to side 'B' port
connector._serverSocketB = await ServerSocket.bind(addressB, portB);

// listen for connections to the 'B' side port
connector._serverSocketB?.listen((socketB) {
Side sideB = Side(socketB, false, transformer: transformBtoA);
unawaited(connector.handleSingleConnection(sideB));
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));
});
return (connector);
}
Expand Down Expand Up @@ -361,14 +403,18 @@ class SocketConnector {
}
Socket sideASocket = await Socket.connect(addressA, portA);
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA));
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('socket_connector: Connecting to $addressB:$portB');
}
Socket sideBSocket = await Socket.connect(addressB, portB);
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
unawaited(connector.handleSingleConnection(sideB));
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));

if (verbose) {
logSink.writeln('socket_connector: started');
Expand Down Expand Up @@ -408,7 +454,8 @@ class SocketConnector {
bool multi = false,
@Deprecated("use beforeJoining instead")
Function(Socket socketA, Socket socketB)? onConnect,
Function(Side sideA, Side sideB)? beforeJoining}) async {
Function(Side sideA, Side sideB)? beforeJoining,
int backlog = 0}) async {
IOSink logSink = logger ?? stderr;
addressA ??= InternetAddress.anyIPv4;

Expand All @@ -421,27 +468,53 @@ class SocketConnector {

int connections = 0;
// bind to a local port for side 'A'
connector._serverSocketA = await ServerSocket.bind(addressA, portA);
// listen on the local port and connect the inbound socket
connector._serverSocketA?.listen((sideASocket) async {
if (!multi) {
unawaited(connector._serverSocketA?.close());
}
connector._serverSocketA = await ServerSocket.bind(
addressA,
portA,
backlog: backlog,
);

StreamController<Socket> ssc = StreamController();
ssc.stream.listen((sideASocket) async {
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA));
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
}));

if (verbose) {
logSink.writeln('Making connection ${++connections} to the "B" side');
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB));
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
}));

onConnect?.call(sideASocket, sideBSocket);
});

// listen on the local port and connect the inbound socket
connector._serverSocketA?.listen((sideASocket) {
if (!multi) {
try {
connector._serverSocketA?.close();
} catch (e) {
logSink.writeln('Error while closing serverSocketA: $e');
}
}
ssc.add(sideASocket);
});

return (connector);
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: socket_connector
description: Package for joining sockets together to create socket relays.

version: 2.2.0
version: 2.3.0
repository: https://github.com/cconstab/socket_connector

environment:
Expand Down

0 comments on commit a0e4285

Please sign in to comment.