-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: improve stability under load #25
Conversation
…t side has been delivered to the other side before closing the other side
@@ -97,6 +98,9 @@ class SocketConnector { | |||
if (closed) { | |||
throw StateError('Connector is closed'); | |||
} | |||
unawaited(thisSide.socket.done | |||
.then((v) => _closeSide(thisSide)) | |||
.catchError((err) => _closeSide(thisSide))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As soon as we get reference to the socket, ensure there will be no uncaught exceptions
@@ -119,7 +123,7 @@ class SocketConnector { | |||
} | |||
if (!thisSide.authenticated) { | |||
_log('Authentication failed on side ${thisSide.name}', force: true); | |||
_destroySide(thisSide); | |||
_closeSide(thisSide); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed _destroySide to _closeSide
@@ -136,31 +140,34 @@ class SocketConnector { | |||
'Added connection. There are now ${connections.length} connections.')); | |||
|
|||
for (final side in [thisSide, thisSide.farSide!]) { | |||
unawaited(side.socket.done | |||
.then((v) => _destroySide(side)) | |||
.catchError((err) => _destroySide(side))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved up the start of the handleSingleConnection function
if (side.state == SideState.closed && | ||
side.rcvd == side.farSide!.sent) { | ||
_closeSide(side.farSide!); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re the above code: Since the transformer stream may not have written everything to the other side's socket before this side's socket closes, we keep track of number of bytes sent, and, if this side has been closed, will call _closeSide on the far side which will result in both side's sockets being cleaned up correctly
if (side.state == SideState.closed && | ||
side.rcvd == side.farSide!.sent) { | ||
_closeSide(side.farSide!); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is here because sometimes we will have a transformer for writing to the far side, in which case the check on sent / rcvd happens in the transformer's listen block above; but if there is no transformer (i.e. the farSide's sink is a Socket) then we do the same check here
} else { | ||
_log(chalk.brightBlue( | ||
'Far side (${side.farSide?.name}) has NOT YET received all data')); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check that all data received from this side has been delivered to the far side before calling _closeSide(farSide). Since sometimes, because of this check, _closeSide will not be called here, blocks of code in the listen onData functions above perform the same check when writing data to the far side's sink; the combination of all of this guarantees that all data is always written to the other side before closing, and guarantees that the sides are always closed (and thus their sockets flushed and destroyed) once all data has been written.
try { | ||
_log(chalk.brightBlue('Destroying socket on side ${side.name}')); | ||
await side.socket.flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes indeed, because you have to do this. Doh.
@@ -504,32 +526,40 @@ class SocketConnector { | |||
); | |||
|
|||
StreamController<Socket> ssc = StreamController(); | |||
Mutex m = Mutex(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a mutex to ensure that new connections to the bound ServerSocket are handled atomically, in strict sequence.
- What I did
accepted
has been delivered to the other side before closing the other side
- How I did it
- How to verify it
- Description for the changelog
accepted
has been delivered to the other side before closing the other side