Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Websocket connection uptake #2178

Merged
merged 35 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
38692f1
feat: interim commit
gkc Mar 8, 2024
5d60232
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Jul 5, 2024
15cdeff
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Jul 15, 2024
9278521
chore: fix merge issue
gkc Jul 22, 2024
a526f6c
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Jul 25, 2024
fd54baf
chore: to eliminate some lint messages following change of Dart minim…
gkc Jul 25, 2024
ce14b25
docs: add some code comments to explain how the ALPN support works
gkc Jul 25, 2024
1e7e9aa
make a webSocketListener function in at_secondary_impl; add a new met…
gkc Jul 25, 2024
d428b25
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Jul 26, 2024
3a55fd9
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Aug 27, 2024
a8bc574
feat: at_secondary_server: add createWebSocketConnection to connectio…
gkc Sep 5, 2024
dd4b339
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Sep 5, 2024
81a130f
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Sep 18, 2024
507aad9
refactor: allow creation of InboundWebSocketConnection with minimal d…
gkc Sep 18, 2024
fe52c06
feat: implement createWebSocketConnection in InboundConnectionManager
gkc Sep 18, 2024
ad62ece
feat: atServer support for atProtocol over websockets : MVP complete
gkc Sep 18, 2024
4ffb9ba
fix: logging cleanup for http get handling
gkc Oct 9, 2024
54a3229
chore: add a TODO
gkc Oct 22, 2024
b79ae5e
Merge remote-tracking branch 'origin/trunk' into feat/web-socket-conn…
gkc Nov 26, 2024
db7bebf
Merge branch 'trunk' of https://github.com/atsign-foundation/at_serve…
purnimavenkatasubbu Dec 11, 2024
3c65ad9
updated changelog and pubspec
purnimavenkatasubbu Dec 11, 2024
6deb580
fix analyzer issue
purnimavenkatasubbu Dec 11, 2024
3ae6a4f
chore: Added TODOs to add end-to-end tests for the websocket handling
gkc Dec 12, 2024
84419de
chore: removed a TODO which had been implemented
gkc Dec 12, 2024
810578e
test: removed unused `at_end2end_test/test/commons.dart`
gkc Dec 12, 2024
5b4e6b0
chore: syntax error?
gkc Dec 12, 2024
cc2ce8d
Merge branch 'trunk' into feat/web-socket-connections
gkc Dec 12, 2024
7a62b0e
test: removed unnecessary TODOS; updated config12.yaml adding connect…
gkc Dec 12, 2024
445bb02
test: fixed e2e_test_utils for new enum
gkc Dec 12, 2024
6aca4bd
test: fixed e2e_test_utils for new enum
gkc Dec 12, 2024
e56107d
Merge branch 'trunk' of https://github.com/atsign-foundation/at_serve…
purnimavenkatasubbu Dec 17, 2024
39600c9
addressed todos for e2e tests
purnimavenkatasubbu Dec 17, 2024
55da73f
Merge branch 'trunk' into feat/web-socket-connections
purnimavenkatasubbu Dec 18, 2024
60200fc
revert connectionType in config-e2etest-runtime.yaml
purnimavenkatasubbu Dec 18, 2024
30db96f
Merge branch 'feat/web-socket-connections' of https://github.com/atsi…
purnimavenkatasubbu Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/at_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ jobs:
sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml
mv tests/at_end2end_test/config/config-e2e_test_runtime.yaml tests/at_end2end_test/config/config.yaml
cat tests/at_end2end_test/config/config.yaml
echo "Connection successfull"
echo "Connection successful"
break
else
echo "Connection error on attempt ${try}"
Expand Down
2 changes: 2 additions & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# 3.2.0
- feat: Added WebSocket support for inbound connections
# 3.1.1
- fix: Store "publicKeyHash" value in the keystore
- fix: add limit param in SyncProgressiveVerbHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import 'dart:io';

import 'package:at_server_spec/at_server_spec.dart';

abstract class AtConnectionFactory<T extends AtConnection> {
T createSocketConnection(Socket socket, {String? sessionId});
abstract class AtConnectionFactory {
InboundConnection createSocketConnection(Socket socket, {String? sessionId});
InboundConnection createWebSocketConnection(WebSocket socket,
{String? sessionId});
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,192 @@
import 'dart:collection';
import 'dart:math';

import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/server_context.dart';
import 'package:at_server_spec/at_server_spec.dart';

import 'inbound_connection_pool.dart';

// ignore: implementation_imports
import 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart';

class InboundRateLimiter implements AtRateLimiter {
/// The maximum number of requests allowed within the specified time frame.
@override
late int maxRequestsPerTimeFrame;

/// The duration of the time frame within which requests are limited.
@override
late int timeFrameInMillis;

/// A list of timestamps representing the times when requests were made.
late final Queue<int> requestTimestampQueue;

InboundRateLimiter() {
maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed;
timeFrameInMillis = AtSecondaryConfig.timeFrameInMills;
requestTimestampQueue = Queue();
}

@override
bool isRequestAllowed() {
int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch;
_checkAndUpdateQueue(currentTimeInMills);
if (requestTimestampQueue.length < maxRequestsPerTimeFrame) {
requestTimestampQueue.addLast(currentTimeInMills);
return true;
}
return false;
}

/// Checks and updates the request timestamp queue based on the current time.
///
/// This method removes timestamps from the queue that are older than the specified
/// time window.
///
/// [currentTimeInMillis] is the current time in milliseconds since epoch.
void _checkAndUpdateQueue(int currentTimeInMillis) {
if (requestTimestampQueue.isEmpty) return;
int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first);
while (calculatedTime >= timeFrameInMillis) {
requestTimestampQueue.removeFirst();
if (requestTimestampQueue.isEmpty) break;
calculatedTime = (currentTimeInMillis - requestTimestampQueue.first);
}
}
}

class InboundIdleChecker {
AtSecondaryContext secondaryContext;
InboundConnection connection;
InboundConnectionPool? owningPool;

InboundIdleChecker(this.secondaryContext, this.connection, this.owningPool) {
lowWaterMarkRatio = secondaryContext.inboundConnectionLowWaterMarkRatio;
progressivelyReduceAllowableInboundIdleTime =
secondaryContext.progressivelyReduceAllowableInboundIdleTime;

// As number of connections increases then the "allowable" idle time
// reduces from the 'max' towards the 'min' value.
unauthenticatedMaxAllowableIdleTimeMillis =
secondaryContext.unauthenticatedInboundIdleTimeMillis;
unauthenticatedMinAllowableIdleTimeMillis =
secondaryContext.unauthenticatedMinAllowableIdleTimeMillis;

authenticatedMaxAllowableIdleTimeMillis =
secondaryContext.authenticatedInboundIdleTimeMillis;
authenticatedMinAllowableIdleTimeMillis =
secondaryContext.authenticatedMinAllowableIdleTimeMillis;
}

/// As number of connections increases then the "allowable" idle time
/// reduces from the 'max' towards the 'min' value.
late int unauthenticatedMaxAllowableIdleTimeMillis;

/// As number of connections increases then the "allowable" idle time
/// reduces from the 'max' towards the 'min' value.
late int unauthenticatedMinAllowableIdleTimeMillis;

/// As number of connections increases then the "allowable" idle time
/// reduces from the 'max' towards the 'min' value.
late int authenticatedMaxAllowableIdleTimeMillis;

/// As number of connections increases then the "allowable" idle time
/// reduces from the 'max' towards the 'min' value.
late int authenticatedMinAllowableIdleTimeMillis;

late double lowWaterMarkRatio;
late bool progressivelyReduceAllowableInboundIdleTime;

int calcAllowableIdleTime(double idleTimeReductionFactor,
int minAllowableIdleTimeMillis, int maxAllowableIdleTimeMillis) =>
(((maxAllowableIdleTimeMillis - minAllowableIdleTimeMillis) *
idleTimeReductionFactor) +
minAllowableIdleTimeMillis)
.floor();

/// Get the idle time of the inbound connection since last write operation
int _getIdleTimeMillis() {
var lastAccessedTime = connection.metaData.lastAccessed;
// if lastAccessedTime is not set, use created time
lastAccessedTime ??= connection.metaData.created;
var currentTime = DateTime.now().toUtc();
return currentTime.difference(lastAccessedTime!).inMilliseconds;
}

/// Returns true if the client's idle time is greater than configured idle time.
/// false otherwise
bool _idleForLongerThanMax() {
var idleTimeMillis = _getIdleTimeMillis();
if (connection.metaData.isAuthenticated ||
connection.metaData.isPolAuthenticated) {
return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis;
} else {
return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis;
}
}

bool isInValid() {
// If we don't know our owning pool, OR we've disabled the new logic, just use old logic
if (owningPool == null ||
progressivelyReduceAllowableInboundIdleTime == false) {
var retVal = _idleForLongerThanMax();
return retVal;
}

// We do know our owning pool, so we'll use fancier logic.
// Unauthenticated connections should be reaped increasingly aggressively as we approach max connections
// Authenticated connections should also be reaped as we approach max connections, but a lot less aggressively
// Ultimately, the caller (e.g. [InboundConnectionManager] decides **whether** to reap or not.
int? poolMaxConnections = owningPool!.getCapacity();
int lowWaterMark = (poolMaxConnections! * lowWaterMarkRatio).floor();
int numConnectionsOverLwm =
max(owningPool!.getCurrentSize() - lowWaterMark, 0);

// We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively.
double idleTimeReductionFactor =
1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark));
if (!connection.metaData.isAuthenticated &&
!connection.metaData.isPolAuthenticated) {
// For **unauthenticated** connections, we deem invalid if idle time is greater than
// ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime
//
// i.e. as the current number of connections grows past low-water-mark, the tolerated idle time reduces
// Given: Max connections of 50, lwm of 25, max idle time of 605 seconds, min idle time of 5 seconds
// When: current == 25, idle time allowable = (605-5) * (1 - 0/25) + 5 i.e. 600 * 1.0 + 5 i.e. 605
// When: current == 40, idle time allowable = (605-5) * (1 - 15/25) + 5 i.e. 600 * 0.4 + 5 i.e. 245
// When: current == 49, idle time allowable = (605-5) * (1 - 24/25) + 5 i.e. 600 * 0.04 + 5 i.e. 24 + 5 i.e. 29
// When: current == 50, idle time allowable = (605-5) * (1 - 25/25) + 5 i.e. 600 * 0.0 + 5 i.e. 0 + 5 i.e. 5
//
// Given: Max connections of 50, lwm of 10, max idle time of 605 seconds, min idle time of 5 seconds
// When: current == 10, idle time allowable = (605-5) * (1 - (10-10)/(50-10)) + 5 i.e. 600 * (1 - 0/40) + 5 i.e. 605
// When: current == 20, idle time allowable = (605-5) * (1 - (20-10)/(50-10)) + 5 i.e. 600 * (1 - 10/40) + 5 i.e. 455
// When: current == 30, idle time allowable = (605-5) * (1 - (30-10)/(50-10)) + 5 i.e. 600 * (1 - 20/40) + 5 i.e. 305
// When: current == 40, idle time allowable = (605-5) * (1 - (40-10)/(50-10)) + 5 i.e. 600 * (1 - 30/40) + 5 i.e. 155
// When: current == 49, idle time allowable = (605-5) * (1 - (49-10)/(50-10)) + 5 i.e. 600 * (1 - 39/40) + 5 i.e. 600 * .025 + 5 i.e. 20
// When: current == 50, idle time allowable = (605-5) * (1 - (50-10)/(50-10)) + 5 i.e. 600 * (1 - 40/40) + 5 i.e. 600 * 0 + 5 i.e. 5
int allowableIdleTime = calcAllowableIdleTime(
idleTimeReductionFactor,
unauthenticatedMinAllowableIdleTimeMillis,
unauthenticatedMaxAllowableIdleTimeMillis);
var actualIdleTime = _getIdleTimeMillis();
var retVal = actualIdleTime > allowableIdleTime;
return retVal;
} else {
// For authenticated connections
// TODO (1) if the connection has a request in progress, we should never mark it as invalid
// (2) otherwise, we will mark as invalid using same algorithm as above, but using authenticatedMinAllowableIdleTimeMillis
int allowableIdleTime = calcAllowableIdleTime(
idleTimeReductionFactor,
authenticatedMinAllowableIdleTimeMillis,
authenticatedMaxAllowableIdleTimeMillis);
var actualIdleTime = _getIdleTimeMillis();
var retVal = actualIdleTime > allowableIdleTime;
return retVal;
}
}
}

class ConnectionUtil {
/// Returns the number of active monitor connections.
static int getMonitorConnectionSize() {
Expand Down
Loading
Loading