diff --git a/.github/workflows/at_server.yaml b/.github/workflows/at_server.yaml index 480aaa638..cc6968e27 100644 --- a/.github/workflows/at_server.yaml +++ b/.github/workflows/at_server.yaml @@ -574,7 +574,7 @@ jobs: sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml mv tests/at_end2end_test/config/config-e2e_test_runtime.yaml tests/at_end2end_test/config/config.yaml cat tests/at_end2end_test/config/config.yaml - echo "Connection successfull" + echo "Connection successful" break else echo "Connection error on attempt ${try}" diff --git a/packages/at_secondary_server/CHANGELOG.md b/packages/at_secondary_server/CHANGELOG.md index e7dae789d..7e703bd3a 100644 --- a/packages/at_secondary_server/CHANGELOG.md +++ b/packages/at_secondary_server/CHANGELOG.md @@ -1,3 +1,5 @@ +# 3.2.0 +- feat: Added WebSocket support for inbound connections # 3.1.1 - fix: Store "publicKeyHash" value in the keystore - fix: add limit param in SyncProgressiveVerbHandler diff --git a/packages/at_secondary_server/lib/src/connection/connection_factory.dart b/packages/at_secondary_server/lib/src/connection/connection_factory.dart index 486b1580b..dd55f1aa7 100644 --- a/packages/at_secondary_server/lib/src/connection/connection_factory.dart +++ b/packages/at_secondary_server/lib/src/connection/connection_factory.dart @@ -2,6 +2,8 @@ import 'dart:io'; import 'package:at_server_spec/at_server_spec.dart'; -abstract class AtConnectionFactory { - T createSocketConnection(Socket socket, {String? sessionId}); +abstract class AtConnectionFactory { + InboundConnection createSocketConnection(Socket socket, {String? sessionId}); + InboundConnection createWebSocketConnection(WebSocket socket, + {String? sessionId}); } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart b/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart index d57e4b048..20c4b187b 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart @@ -1,5 +1,192 @@ +import 'dart:collection'; +import 'dart:math'; + +import 'package:at_secondary/src/server/at_secondary_config.dart'; +import 'package:at_secondary/src/server/server_context.dart'; +import 'package:at_server_spec/at_server_spec.dart'; + import 'inbound_connection_pool.dart'; +// ignore: implementation_imports +import 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart'; + +class InboundRateLimiter implements AtRateLimiter { + /// The maximum number of requests allowed within the specified time frame. + @override + late int maxRequestsPerTimeFrame; + + /// The duration of the time frame within which requests are limited. + @override + late int timeFrameInMillis; + + /// A list of timestamps representing the times when requests were made. + late final Queue requestTimestampQueue; + + InboundRateLimiter() { + maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; + timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; + requestTimestampQueue = Queue(); + } + + @override + bool isRequestAllowed() { + int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch; + _checkAndUpdateQueue(currentTimeInMills); + if (requestTimestampQueue.length < maxRequestsPerTimeFrame) { + requestTimestampQueue.addLast(currentTimeInMills); + return true; + } + return false; + } + + /// Checks and updates the request timestamp queue based on the current time. + /// + /// This method removes timestamps from the queue that are older than the specified + /// time window. + /// + /// [currentTimeInMillis] is the current time in milliseconds since epoch. + void _checkAndUpdateQueue(int currentTimeInMillis) { + if (requestTimestampQueue.isEmpty) return; + int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + while (calculatedTime >= timeFrameInMillis) { + requestTimestampQueue.removeFirst(); + if (requestTimestampQueue.isEmpty) break; + calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + } + } +} + +class InboundIdleChecker { + AtSecondaryContext secondaryContext; + InboundConnection connection; + InboundConnectionPool? owningPool; + + InboundIdleChecker(this.secondaryContext, this.connection, this.owningPool) { + lowWaterMarkRatio = secondaryContext.inboundConnectionLowWaterMarkRatio; + progressivelyReduceAllowableInboundIdleTime = + secondaryContext.progressivelyReduceAllowableInboundIdleTime; + + // As number of connections increases then the "allowable" idle time + // reduces from the 'max' towards the 'min' value. + unauthenticatedMaxAllowableIdleTimeMillis = + secondaryContext.unauthenticatedInboundIdleTimeMillis; + unauthenticatedMinAllowableIdleTimeMillis = + secondaryContext.unauthenticatedMinAllowableIdleTimeMillis; + + authenticatedMaxAllowableIdleTimeMillis = + secondaryContext.authenticatedInboundIdleTimeMillis; + authenticatedMinAllowableIdleTimeMillis = + secondaryContext.authenticatedMinAllowableIdleTimeMillis; + } + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int unauthenticatedMaxAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int unauthenticatedMinAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int authenticatedMaxAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int authenticatedMinAllowableIdleTimeMillis; + + late double lowWaterMarkRatio; + late bool progressivelyReduceAllowableInboundIdleTime; + + int calcAllowableIdleTime(double idleTimeReductionFactor, + int minAllowableIdleTimeMillis, int maxAllowableIdleTimeMillis) => + (((maxAllowableIdleTimeMillis - minAllowableIdleTimeMillis) * + idleTimeReductionFactor) + + minAllowableIdleTimeMillis) + .floor(); + + /// Get the idle time of the inbound connection since last write operation + int _getIdleTimeMillis() { + var lastAccessedTime = connection.metaData.lastAccessed; + // if lastAccessedTime is not set, use created time + lastAccessedTime ??= connection.metaData.created; + var currentTime = DateTime.now().toUtc(); + return currentTime.difference(lastAccessedTime!).inMilliseconds; + } + + /// Returns true if the client's idle time is greater than configured idle time. + /// false otherwise + bool _idleForLongerThanMax() { + var idleTimeMillis = _getIdleTimeMillis(); + if (connection.metaData.isAuthenticated || + connection.metaData.isPolAuthenticated) { + return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis; + } else { + return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis; + } + } + + bool isInValid() { + // If we don't know our owning pool, OR we've disabled the new logic, just use old logic + if (owningPool == null || + progressivelyReduceAllowableInboundIdleTime == false) { + var retVal = _idleForLongerThanMax(); + return retVal; + } + + // We do know our owning pool, so we'll use fancier logic. + // Unauthenticated connections should be reaped increasingly aggressively as we approach max connections + // Authenticated connections should also be reaped as we approach max connections, but a lot less aggressively + // Ultimately, the caller (e.g. [InboundConnectionManager] decides **whether** to reap or not. + int? poolMaxConnections = owningPool!.getCapacity(); + int lowWaterMark = (poolMaxConnections! * lowWaterMarkRatio).floor(); + int numConnectionsOverLwm = + max(owningPool!.getCurrentSize() - lowWaterMark, 0); + + // We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively. + double idleTimeReductionFactor = + 1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark)); + if (!connection.metaData.isAuthenticated && + !connection.metaData.isPolAuthenticated) { + // For **unauthenticated** connections, we deem invalid if idle time is greater than + // ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime + // + // i.e. as the current number of connections grows past low-water-mark, the tolerated idle time reduces + // Given: Max connections of 50, lwm of 25, max idle time of 605 seconds, min idle time of 5 seconds + // When: current == 25, idle time allowable = (605-5) * (1 - 0/25) + 5 i.e. 600 * 1.0 + 5 i.e. 605 + // When: current == 40, idle time allowable = (605-5) * (1 - 15/25) + 5 i.e. 600 * 0.4 + 5 i.e. 245 + // When: current == 49, idle time allowable = (605-5) * (1 - 24/25) + 5 i.e. 600 * 0.04 + 5 i.e. 24 + 5 i.e. 29 + // When: current == 50, idle time allowable = (605-5) * (1 - 25/25) + 5 i.e. 600 * 0.0 + 5 i.e. 0 + 5 i.e. 5 + // + // Given: Max connections of 50, lwm of 10, max idle time of 605 seconds, min idle time of 5 seconds + // When: current == 10, idle time allowable = (605-5) * (1 - (10-10)/(50-10)) + 5 i.e. 600 * (1 - 0/40) + 5 i.e. 605 + // When: current == 20, idle time allowable = (605-5) * (1 - (20-10)/(50-10)) + 5 i.e. 600 * (1 - 10/40) + 5 i.e. 455 + // When: current == 30, idle time allowable = (605-5) * (1 - (30-10)/(50-10)) + 5 i.e. 600 * (1 - 20/40) + 5 i.e. 305 + // When: current == 40, idle time allowable = (605-5) * (1 - (40-10)/(50-10)) + 5 i.e. 600 * (1 - 30/40) + 5 i.e. 155 + // When: current == 49, idle time allowable = (605-5) * (1 - (49-10)/(50-10)) + 5 i.e. 600 * (1 - 39/40) + 5 i.e. 600 * .025 + 5 i.e. 20 + // When: current == 50, idle time allowable = (605-5) * (1 - (50-10)/(50-10)) + 5 i.e. 600 * (1 - 40/40) + 5 i.e. 600 * 0 + 5 i.e. 5 + int allowableIdleTime = calcAllowableIdleTime( + idleTimeReductionFactor, + unauthenticatedMinAllowableIdleTimeMillis, + unauthenticatedMaxAllowableIdleTimeMillis); + var actualIdleTime = _getIdleTimeMillis(); + var retVal = actualIdleTime > allowableIdleTime; + return retVal; + } else { + // For authenticated connections + // TODO (1) if the connection has a request in progress, we should never mark it as invalid + // (2) otherwise, we will mark as invalid using same algorithm as above, but using authenticatedMinAllowableIdleTimeMillis + int allowableIdleTime = calcAllowableIdleTime( + idleTimeReductionFactor, + authenticatedMinAllowableIdleTimeMillis, + authenticatedMaxAllowableIdleTimeMillis); + var actualIdleTime = _getIdleTimeMillis(); + var retVal = actualIdleTime > allowableIdleTime; + return retVal; + } + } +} + class ConnectionUtil { /// Returns the number of active monitor connections. static int getMonitorConnectionSize() { diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart index a6750bd7b..6f6eaacd8 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart @@ -1,17 +1,15 @@ -import 'dart:collection'; import 'dart:io'; -import 'dart:math'; import 'package:at_secondary/src/connection/base_connection.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; -import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/server_context.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/logging_util.dart'; import 'package:at_server_spec/at_server_spec.dart'; +import 'connection_util.dart'; import 'dummy_inbound_connection.dart'; class InboundConnectionImpl extends BaseSocketConnection @@ -25,35 +23,8 @@ class InboundConnectionImpl extends BaseSocketConnection InboundConnectionPool? owningPool; - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int unauthenticatedMaxAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int unauthenticatedMinAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int authenticatedMaxAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int authenticatedMinAllowableIdleTimeMillis; - - late double lowWaterMarkRatio; - late bool progressivelyReduceAllowableInboundIdleTime; - - /// The maximum number of requests allowed within the specified time frame. - @override - late int maxRequestsPerTimeFrame; - - /// The duration of the time frame within which requests are limited. - @override - late int timeFrameInMillis; - - /// A list of timestamps representing the times when requests were made. - late final Queue requestTimestampQueue; + late InboundRateLimiter rateLimiter; + late InboundIdleChecker idleChecker; InboundConnectionImpl(T socket, String? sessionId, {this.owningPool}) : super(socket) { @@ -67,25 +38,8 @@ class InboundConnectionImpl extends BaseSocketConnection // In test harnesses, secondary context may not yet have been set, in which case create a default AtSecondaryContext instance secondaryContext ??= AtSecondaryContext(); - lowWaterMarkRatio = secondaryContext.inboundConnectionLowWaterMarkRatio; - progressivelyReduceAllowableInboundIdleTime = - secondaryContext.progressivelyReduceAllowableInboundIdleTime; - - // As number of connections increases then the "allowable" idle time - // reduces from the 'max' towards the 'min' value. - unauthenticatedMaxAllowableIdleTimeMillis = - secondaryContext.unauthenticatedInboundIdleTimeMillis; - unauthenticatedMinAllowableIdleTimeMillis = - secondaryContext.unauthenticatedMinAllowableIdleTimeMillis; - - authenticatedMaxAllowableIdleTimeMillis = - secondaryContext.authenticatedInboundIdleTimeMillis; - authenticatedMinAllowableIdleTimeMillis = - secondaryContext.authenticatedMinAllowableIdleTimeMillis; - - maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; - timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; - requestTimestampQueue = Queue(); + idleChecker = InboundIdleChecker(secondaryContext, this, owningPool); + rateLimiter = InboundRateLimiter(); logger.info(logger.getAtConnectionLogMessage( metaData, @@ -128,89 +82,7 @@ class InboundConnectionImpl extends BaseSocketConnection return true; } - // If we don't know our owning pool, OR we've disabled the new logic, just use old logic - if (owningPool == null || - progressivelyReduceAllowableInboundIdleTime == false) { - var retVal = _idleForLongerThanMax(); - return retVal; - } - - // We do know our owning pool, so we'll use fancier logic. - // Unauthenticated connections should be reaped increasingly aggressively as we approach max connections - // Authenticated connections should also be reaped as we approach max connections, but a lot less aggressively - // Ultimately, the caller (e.g. [InboundConnectionManager] decides **whether** to reap or not. - int? poolMaxConnections = owningPool!.getCapacity(); - int lowWaterMark = (poolMaxConnections! * lowWaterMarkRatio).floor(); - int numConnectionsOverLwm = - max(owningPool!.getCurrentSize() - lowWaterMark, 0); - - // We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively. - double idleTimeReductionFactor = - 1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark)); - if (!metaData.isAuthenticated && !metaData.isPolAuthenticated) { - // For **unauthenticated** connections, we deem invalid if idle time is greater than - // ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime - // - // i.e. as the current number of connections grows past low-water-mark, the tolerated idle time reduces - // Given: Max connections of 50, lwm of 25, max idle time of 605 seconds, min idle time of 5 seconds - // When: current == 25, idle time allowable = (605-5) * (1 - 0/25) + 5 i.e. 600 * 1.0 + 5 i.e. 605 - // When: current == 40, idle time allowable = (605-5) * (1 - 15/25) + 5 i.e. 600 * 0.4 + 5 i.e. 245 - // When: current == 49, idle time allowable = (605-5) * (1 - 24/25) + 5 i.e. 600 * 0.04 + 5 i.e. 24 + 5 i.e. 29 - // When: current == 50, idle time allowable = (605-5) * (1 - 25/25) + 5 i.e. 600 * 0.0 + 5 i.e. 0 + 5 i.e. 5 - // - // Given: Max connections of 50, lwm of 10, max idle time of 605 seconds, min idle time of 5 seconds - // When: current == 10, idle time allowable = (605-5) * (1 - (10-10)/(50-10)) + 5 i.e. 600 * (1 - 0/40) + 5 i.e. 605 - // When: current == 20, idle time allowable = (605-5) * (1 - (20-10)/(50-10)) + 5 i.e. 600 * (1 - 10/40) + 5 i.e. 455 - // When: current == 30, idle time allowable = (605-5) * (1 - (30-10)/(50-10)) + 5 i.e. 600 * (1 - 20/40) + 5 i.e. 305 - // When: current == 40, idle time allowable = (605-5) * (1 - (40-10)/(50-10)) + 5 i.e. 600 * (1 - 30/40) + 5 i.e. 155 - // When: current == 49, idle time allowable = (605-5) * (1 - (49-10)/(50-10)) + 5 i.e. 600 * (1 - 39/40) + 5 i.e. 600 * .025 + 5 i.e. 20 - // When: current == 50, idle time allowable = (605-5) * (1 - (50-10)/(50-10)) + 5 i.e. 600 * (1 - 40/40) + 5 i.e. 600 * 0 + 5 i.e. 5 - int allowableIdleTime = calcAllowableIdleTime( - idleTimeReductionFactor, - unauthenticatedMinAllowableIdleTimeMillis, - unauthenticatedMaxAllowableIdleTimeMillis); - var actualIdleTime = _getIdleTimeMillis(); - var retVal = actualIdleTime > allowableIdleTime; - return retVal; - } else { - // For authenticated connections - // TODO (1) if the connection has a request in progress, we should never mark it as invalid - // (2) otherwise, we will mark as invalid using same algorithm as above, but using authenticatedMinAllowableIdleTimeMillis - int allowableIdleTime = calcAllowableIdleTime( - idleTimeReductionFactor, - authenticatedMinAllowableIdleTimeMillis, - authenticatedMaxAllowableIdleTimeMillis); - var actualIdleTime = _getIdleTimeMillis(); - var retVal = actualIdleTime > allowableIdleTime; - return retVal; - } - } - - int calcAllowableIdleTime(double idleTimeReductionFactor, - int minAllowableIdleTimeMillis, int maxAllowableIdleTimeMillis) => - (((maxAllowableIdleTimeMillis - minAllowableIdleTimeMillis) * - idleTimeReductionFactor) + - minAllowableIdleTimeMillis) - .floor(); - - /// Get the idle time of the inbound connection since last write operation - int _getIdleTimeMillis() { - var lastAccessedTime = metaData.lastAccessed; - // if lastAccessedTime is not set, use created time - lastAccessedTime ??= metaData.created; - var currentTime = DateTime.now().toUtc(); - return currentTime.difference(lastAccessedTime!).inMilliseconds; - } - - /// Returns true if the client's idle time is greater than configured idle time. - /// false otherwise - bool _idleForLongerThanMax() { - var idleTimeMillis = _getIdleTimeMillis(); - if (metaData.isAuthenticated || metaData.isPolAuthenticated) { - return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis; - } else { - return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis; - } + return idleChecker.isInValid(); } @override @@ -259,29 +131,19 @@ class InboundConnectionImpl extends BaseSocketConnection } @override - bool isRequestAllowed() { - int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch; - _checkAndUpdateQueue(currentTimeInMills); - if (requestTimestampQueue.length < maxRequestsPerTimeFrame) { - requestTimestampQueue.addLast(currentTimeInMills); - return true; - } - return false; - } + int get maxRequestsPerTimeFrame => rateLimiter.maxRequestsPerTimeFrame; - /// Checks and updates the request timestamp queue based on the current time. - /// - /// This method removes timestamps from the queue that are older than the specified - /// time window. - /// - /// [currentTimeInMillis] is the current time in milliseconds since epoch. - void _checkAndUpdateQueue(int currentTimeInMillis) { - if (requestTimestampQueue.isEmpty) return; - int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); - while (calculatedTime >= timeFrameInMillis) { - requestTimestampQueue.removeFirst(); - if (requestTimestampQueue.isEmpty) break; - calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); - } + @override + set maxRequestsPerTimeFrame(int i) => rateLimiter.maxRequestsPerTimeFrame = i; + + @override + int get timeFrameInMillis => rateLimiter.timeFrameInMillis; + + @override + set timeFrameInMillis(int i) => rateLimiter.timeFrameInMillis = i; + + @override + bool isRequestAllowed() { + return rateLimiter.isRequestAllowed(); } } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart index 6a9e2dee9..2ba6d4b57 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart @@ -1,6 +1,7 @@ import 'dart:io'; import 'package:at_secondary/src/connection/connection_factory.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_web_socket_connection.dart'; import 'package:at_server_spec/at_server_spec.dart'; import 'package:uuid/uuid.dart'; import 'package:at_commons/at_commons.dart'; @@ -23,7 +24,7 @@ class InboundConnectionManager implements AtConnectionFactory { return _singleton; } - /// Creates and adds [InboundConnection] to the pool + /// Creates and adds an [InboundConnectionImpl] to the pool /// If the pool is not initialized, initializes the pool with [defaultPoolSize] /// @param socket - client socket /// @param sessionId - current sessionId @@ -41,7 +42,29 @@ class InboundConnectionManager implements AtConnectionFactory { var atConnection = InboundConnectionImpl(socket, sessionId, owningPool: _pool); _pool.add(atConnection); - true; + + return atConnection; + } + + /// Creates and adds an [InboundWebSocketConnection] to the pool + /// If the pool is not initialized, initializes the pool with [defaultPoolSize] + /// @param socket - client socket + /// @param sessionId - current sessionId + /// Throws a [InboundConnectionLimitException] if pool doesn't have capacity + @override + InboundConnection createWebSocketConnection(WebSocket ws, + {String? sessionId}) { + if (!_isInitialized) { + init(defaultPoolSize); + } + if (!hasCapacity()) { + throw InboundConnectionLimitException( + 'max limit reached on inbound pool'); + } + sessionId ??= '_${Uuid().v4()}'; + var atConnection = InboundWebSocketConnection(ws, sessionId, _pool); + _pool.add(atConnection); + return atConnection; } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart index f42a649d6..8c728a0ef 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart @@ -42,7 +42,19 @@ class InboundMessageListener { /// Handles messages on the inbound client's connection and calls the verb executor /// Closes the inbound connection in case of any error. - Future _messageHandler(data) async { + Future _messageHandler(streamData) async { + logger.finest('_messageHandler received ${streamData.runtimeType}' + ' : $streamData '); + List data; + if (streamData is List) { + data = streamData; + } else if (streamData is String) { + data = utf8.encode(streamData); + } else { + logger.severe('Un-handled data type: ${streamData.runtimeType}'); + await _finishedHandler(); + return; + } //ignore the data read if the connection is stale or closed if (connection.metaData.isStale || connection.metaData.isClosed) { //clear buffer as data is redundant diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart new file mode 100644 index 000000000..964955c61 --- /dev/null +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart @@ -0,0 +1,141 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; +import 'package:at_secondary/src/server/at_secondary_impl.dart'; +import 'package:at_secondary/src/server/server_context.dart'; +import 'package:at_secondary/src/utils/logging_util.dart'; +import 'package:at_server_spec/at_server_spec.dart'; +import 'package:at_utils/at_utils.dart'; + +import 'connection_util.dart'; + +class InboundWebSocketConnection implements InboundConnection { + WebSocket ws; + + AtSignLogger logger = AtSignLogger('InboundWebSocketConnection'); + + @override + late InboundConnectionMetadata metaData; + + @override + bool? isMonitor = false; + + /// This contains the value of the atsign initiated the connection + @override + String? initiatedBy; + + InboundConnectionPool? owningPool; + + late InboundRateLimiter rateLimiter; + late InboundIdleChecker idleChecker; + + InboundWebSocketConnection(this.ws, String? sessionId, this.owningPool) { + metaData = InboundConnectionMetadata() + ..sessionID = sessionId + ..created = DateTime.now().toUtc() + ..isCreated = true; + + AtSecondaryContext? secondaryContext = + AtSecondaryServerImpl.getInstance().serverContext; + // In test harnesses, secondary context may not yet have been set, in which case create a default AtSecondaryContext instance + secondaryContext ??= AtSecondaryContext(); + + idleChecker = InboundIdleChecker(secondaryContext, this, owningPool); + rateLimiter = InboundRateLimiter(); + + logger.info( + logger.getAtConnectionLogMessage(metaData, 'New WebSocket ($this)')); + + ws.done.then((doneValue) { + logger.info('ws.done called. Calling this.close()'); + close(); + }, onError: (error, stackTrace) { + logger.info('ws.done.onError called with $error. Calling this.close()'); + close(); + }); + } + + /// Returns true if the web sockets are identical + @override + bool equals(InboundConnection connection) { + if (connection is! InboundWebSocketConnection) { + return false; + } + + return ws == connection.ws; + } + + /// Returning true indicates to the caller that this connection **can** be closed if needed + @override + bool isInValid() { + if (metaData.isClosed || metaData.isStale) { + return true; + } + + return idleChecker.isInValid(); + } + + @override + void acceptRequests(Function(String, InboundConnection) callback, + Function(List, InboundConnection) streamCallBack) { + var listener = InboundMessageListener(this); + listener.listen(callback, streamCallBack); + } + + bool? isStream; + + @override + Future close() async { + // Some defensive code just in case we accidentally call close multiple times + if (metaData.isClosed) { + logger.info('already closed; returning'); + return; + } + + metaData.isClosed = true; + + try { + logger.info(logger.getAtConnectionLogMessage( + metaData, 'closing WebSocket (readyState ${ws.readyState})')); + try { + await ws.close(); + } catch (e) { + logger.severe('ws.close() exception: $e'); + } + logger.info(logger.getAtConnectionLogMessage( + metaData, 'Closed WebSocket (readyState ${ws.readyState})')); + } catch (_) { + // Ignore exception on a connection close + metaData.isStale = true; + } + } + + @override + Future write(String data) async { + ws.add(data); + logger.info(logger.getAtConnectionLogMessage(metaData, 'SENT: $data')); + } + + @override + int get maxRequestsPerTimeFrame => rateLimiter.maxRequestsPerTimeFrame; + + @override + set maxRequestsPerTimeFrame(int i) => rateLimiter.maxRequestsPerTimeFrame = i; + + @override + int get timeFrameInMillis => rateLimiter.timeFrameInMillis; + + @override + set timeFrameInMillis(int i) => rateLimiter.timeFrameInMillis = i; + + @override + bool isRequestAllowed() { + return rateLimiter.isRequestAllowed(); + } + + @override + get underlying => ws; +} diff --git a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart index 1dd49986a..607c149a5 100644 --- a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart +++ b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart @@ -25,7 +25,7 @@ class GlobalExceptionHandler { /// params: AtException, AtConnection Future handle(Exception exception, {AtConnection? atConnection, - Socket? clientSocket, + dynamic clientSocket, StackTrace? stackTrace}) async { if (exception is InvalidAtSignException || exception is BufferOverFlowException || @@ -49,9 +49,16 @@ class GlobalExceptionHandler { await _sendResponseForException(exception, atConnection); _closeConnection(atConnection); } else if (exception is InboundConnectionLimitException) { - // This requires different handling which is in _handleInboundLimit - logger.info(exception.toString()); - await _handleInboundLimit(exception, clientSocket!); + if (clientSocket == null) { + logger.severe('handling InboundConnectionLimitException,' + ' but clientSocket parameter was null'); + } else { + logger.info(exception.toString()); + var errorCode = getErrorCode(exception); + var errorDescription = getErrorDescription(errorCode); + clientSocket.add('error:$errorCode-$errorDescription\n'.codeUnits); + await clientSocket.close(); + } } else if (exception is ServerIsPausedException) { // This is thrown when a new verb request comes in and the server is paused (likely // pending restart) @@ -84,14 +91,6 @@ class GlobalExceptionHandler { } } - Future _handleInboundLimit( - AtException exception, Socket clientSocket) async { - var errorCode = getErrorCode(exception); - var errorDescription = getErrorDescription(errorCode); - clientSocket.write('error:$errorCode-$errorDescription\n'); - await clientSocket.close(); - } - /// Method to close connection. /// params: AtConnection /// This will close the connection and remove it from pool diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index f58b4f9c1..436f0644e 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -35,6 +35,8 @@ import 'package:crypton/crypton.dart'; import 'package:meta/meta.dart'; import 'package:uuid/uuid.dart'; +import 'pseudo_server_socket.dart'; + /// [AtSecondaryServerImpl] is a singleton class which implements [AtSecondaryServer] class AtSecondaryServerImpl implements AtSecondaryServer { static final bool? useTLS = AtSecondaryConfig.useTLS; @@ -302,6 +304,9 @@ class AtSecondaryServerImpl implements AtSecondaryServer { // clean up malformed keys from keystore await removeMalformedKeys(); + if (!useTLS!) { + throw AtServerException('Only TLS is supported; useTLS must be true'); + } try { _isRunning = true; if (useTLS!) { @@ -441,31 +446,97 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } } + webSocketListener(WebSocket ws) async { + InboundConnection? connection; + try { + var inBoundConnectionManager = InboundConnectionManager.getInstance(); + connection = inBoundConnectionManager.createWebSocketConnection(ws, + sessionId: '_${Uuid().v4()}'); + connection.acceptRequests(_executeVerbCallBack, _streamCallBack); + await connection.write('@'); + } on InboundConnectionLimitException catch (e) { + await GlobalExceptionHandler.getInstance() + .handle(e, atConnection: connection, clientSocket: ws); + } + } + /// Listens on the secondary server socket and creates an inbound connection to server socket from client socket /// Throws [AtConnection] if unable to create a connection /// Throws [SocketException] for exceptions on socket /// Throws [Exception] for any other exceptions. /// @param - ServerSocket - void _listen(var serverSocket) { - logger.info('serverSocket _listen : ${serverSocket.runtimeType}'); + void _listen(final serverSocket) { + // ALPN support. + // First, make a PseudoServerSocket to which we will pass sockets which + // have a selectedProtocol which is neither null nor 'atProtocol/1.0'. + // See later in this method for where we pass sockets received on the real + // serverSocket to the pseudoServerSocket. + final pseudoServerSocket = PseudoServerSocket(serverSocket); + // Second, make an HttpServer which is handling sockets which are passed + // to the pseudoServerSocket + HttpServer httpServer = HttpServer.listenOn(pseudoServerSocket); + httpServer.listen((HttpRequest req) { + if (req.uri.path == '/ws') { + // Upgrade an HttpRequest to a WebSocket connection. + logger.info('Upgraded to WebSocket connection'); + WebSocketTransformer.upgrade(req) + .then((WebSocket ws) => webSocketListener(ws)); + } else { + logger.info('Got Http Request: ${req.method} ${req.uri}'); + if (req.method.toUpperCase() != 'GET') { + req.response.statusCode = HttpStatus.badRequest; + req.response.close(); + } else { + // TODO URL decoding, need to handle emojis for example + var lookupKey = req.uri.path.substring(1); + if (!lookupKey.startsWith('public:')) { + lookupKey = 'public:$lookupKey'; + } + if (!lookupKey.endsWith(currentAtSign)) { + lookupKey = '$lookupKey$currentAtSign'; + } + logger.finer('Key to look up: $lookupKey'); + secondaryKeyStore.get(lookupKey)!.then((AtData? value) { + req.response.writeln('data:${value?.data}'); + req.response.close(); + }, onError: (error) { + req.response.writeln('error:no such key $lookupKey'); + req.response.close(); + }); + } + } + }); + + logger.finer('serverSocket _listen : ${serverSocket.runtimeType}'); serverSocket.listen(((clientSocket) async { - var sessionID = '_${Uuid().v4()}'; - InboundConnection? connection; - try { - logger.finer( - 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); - var inBoundConnectionManager = InboundConnectionManager.getInstance(); - connection = inBoundConnectionManager - .createSocketConnection(clientSocket, sessionId: sessionID); - connection.acceptRequests(_executeVerbCallBack, _streamCallBack); - await connection.write('@'); - } on InboundConnectionLimitException catch (e) { - await GlobalExceptionHandler.getInstance() - .handle(e, atConnection: connection, clientSocket: clientSocket); + logger.info( + 'New client socket: selectedProtocol ${clientSocket.selectedProtocol}'); + if (clientSocket.selectedProtocol == 'atProtocol/1.0' || + clientSocket.selectedProtocol == null) { + InboundConnection? connection; + try { + logger.info( + 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); + var inBoundConnectionManager = InboundConnectionManager.getInstance(); + connection = inBoundConnectionManager.createSocketConnection( + clientSocket, + sessionId: '_${Uuid().v4()}'); + connection.acceptRequests(_executeVerbCallBack, _streamCallBack); + await connection.write('@'); + } on InboundConnectionLimitException catch (e) { + await GlobalExceptionHandler.getInstance() + .handle(e, atConnection: connection, clientSocket: clientSocket); + } + } else { + // ALPN support + // selectedProtocol is neither null nor 'atProtocol/1.0' + // TODO check specifically for http/1.1 + logger.info('Transferring socket to HttpServer for handling'); + pseudoServerSocket.add(clientSocket); } }), onError: (error) { - // We've got no action to take here, let's just log a message - logger.info("ServerSocket.listen called onError with '$error'"); + // We've got no action to take here, let's just log a warning + logger.warning("ServerSocket.listen called onError with '$error'"); }); } @@ -486,6 +557,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer { secCon.setTrustedCertificates( serverContext!.securityContext!.trustedCertificatePath()); certsAvailable = true; + // secCon.setAlpnProtocols(['atp/1.0', 'h2', 'http/1.1'], true); + secCon.setAlpnProtocols(['atProtocol/1.0', 'http/1.1'], true); } on FileSystemException catch (e) { retryCount++; logger.info('${e.message}:${e.path}'); diff --git a/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart b/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart new file mode 100644 index 000000000..d1f292cd6 --- /dev/null +++ b/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart @@ -0,0 +1,221 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:at_utils/at_utils.dart'; + +class PseudoServerSocket implements ServerSocket { + final AtSignLogger logger = AtSignLogger(' AtServerSocket '); + final SecureServerSocket _serverSocket; + final StreamController sc = + StreamController.broadcast(sync: true); + + PseudoServerSocket(this._serverSocket); + + @override + int get port => _serverSocket.port; + + @override + InternetAddress get address => _serverSocket.address; + + @override + Future close() async { + await sc.close(); + return this; + } + + add(Socket socket) { + logger.info('add was called with socket: $socket'); + sc.add(socket); + } + + // Can ignore everything from this point on, it's just the implementation + // of the Stream interface. + // + // All calls to the Stream methods are implemented by delegating + // the calls to the StreamController's stream + + @override + Future any(bool Function(Socket element) test) { + return sc.stream.any(test); + } + + @override + Stream asBroadcastStream( + {void Function(StreamSubscription subscription)? onListen, + void Function(StreamSubscription subscription)? onCancel}) { + return sc.stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); + } + + @override + Stream asyncExpand(Stream? Function(Socket event) convert) { + return sc.stream.asyncExpand(convert); + } + + @override + Stream asyncMap(FutureOr Function(Socket event) convert) { + return sc.stream.asyncMap(convert); + } + + @override + Stream cast() { + return sc.stream.cast(); + } + + @override + Future contains(Object? needle) { + return sc.stream.contains(needle); + } + + @override + Stream distinct( + [bool Function(Socket previous, Socket next)? equals]) { + return sc.stream.distinct(equals); + } + + @override + Future drain([E? futureValue]) { + return sc.stream.drain(futureValue); + } + + @override + Future elementAt(int index) { + return sc.stream.elementAt(index); + } + + @override + Future every(bool Function(Socket element) test) { + return sc.stream.every(test); + } + + @override + Stream expand(Iterable Function(Socket element) convert) { + return sc.stream.expand(convert); + } + + @override + Future get first => sc.stream.first; + + @override + Future firstWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.firstWhere(test, orElse: orElse); + } + + @override + Future fold( + S initialValue, S Function(S previous, Socket element) combine) { + return sc.stream.fold(initialValue, combine); + } + + @override + Future forEach(void Function(Socket element) action) { + return sc.stream.forEach(action); + } + + @override + Stream handleError(Function onError, + {bool Function(dynamic error)? test}) { + return sc.stream.handleError(onError, test: test); + } + + @override + bool get isBroadcast => sc.stream.isBroadcast; + + @override + Future get isEmpty => sc.stream.isEmpty; + + @override + Future join([String separator = ""]) { + return sc.stream.join(separator); + } + + @override + Future get last => sc.stream.last; + + @override + Future lastWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.lastWhere(test, orElse: orElse); + } + + @override + Future get length => sc.stream.length; + + @override + StreamSubscription listen(void Function(Socket event)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + return sc.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + Stream map(S Function(Socket event) convert) { + return sc.stream.map(convert); + } + + @override + Future pipe(StreamConsumer streamConsumer) { + return sc.stream.pipe(streamConsumer); + } + + @override + Future reduce( + Socket Function(Socket previous, Socket element) combine) { + return sc.stream.reduce(combine); + } + + @override + Future get single => sc.stream.single; + + @override + Future singleWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.singleWhere(test, orElse: orElse); + } + + @override + Stream skip(int count) { + return sc.stream.skip(count); + } + + @override + Stream skipWhile(bool Function(Socket element) test) { + return sc.stream.skipWhile(test); + } + + @override + Stream take(int count) { + return sc.stream.take(count); + } + + @override + Stream takeWhile(bool Function(Socket element) test) { + return sc.stream.takeWhile(test); + } + + @override + Stream timeout(Duration timeLimit, + {void Function(EventSink sink)? onTimeout}) { + return sc.stream.timeout(timeLimit, onTimeout: onTimeout); + } + + @override + Future> toList() { + return sc.stream.toList(); + } + + @override + Future> toSet() { + return sc.stream.toSet(); + } + + @override + Stream transform(StreamTransformer streamTransformer) { + return sc.stream.transform(streamTransformer); + } + + @override + Stream where(bool Function(Socket event) test) { + return sc.stream.where(test); + } +} diff --git a/packages/at_secondary_server/lib/src/utils/logging_util.dart b/packages/at_secondary_server/lib/src/utils/logging_util.dart index ed03ec445..5c1193d44 100644 --- a/packages/at_secondary_server/lib/src/utils/logging_util.dart +++ b/packages/at_secondary_server/lib/src/utils/logging_util.dart @@ -13,7 +13,7 @@ extension AtConnectionMetadataLogging on AtSignLogger { if (atConnectionMetaData.sessionID != null) { stringBuffer.write('${atConnectionMetaData.sessionID?.hashCode}|'); } - stringBuffer.write('$logMsg|'); + stringBuffer.write(logMsg); return stringBuffer.toString(); } diff --git a/packages/at_secondary_server/pubspec.yaml b/packages/at_secondary_server/pubspec.yaml index 50b377658..b84fd9513 100644 --- a/packages/at_secondary_server/pubspec.yaml +++ b/packages/at_secondary_server/pubspec.yaml @@ -1,6 +1,6 @@ name: at_secondary description: Implementation of secondary server. -version: 3.1.1 +version: 3.2.0 repository: https://github.com/atsign-foundation/at_server homepage: https://www.example.com publish_to: none diff --git a/packages/at_secondary_server/test/local_lookup_verb_test.dart b/packages/at_secondary_server/test/local_lookup_verb_test.dart index fcc96c44e..a9841df11 100644 --- a/packages/at_secondary_server/test/local_lookup_verb_test.dart +++ b/packages/at_secondary_server/test/local_lookup_verb_test.dart @@ -332,15 +332,18 @@ void main() { await secondaryKeyStore.put( keyName, AtData()..data = jsonEncode(enrollJson)); // Update a key with buzz namespace - String updateCommand = 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; - HashMap updateVerbParams = getVerbParam(VerbSyntax.update, updateCommand); + String updateCommand = + 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; + HashMap updateVerbParams = + getVerbParam(VerbSyntax.update, updateCommand); UpdateVerbHandler updateVerbHandler = UpdateVerbHandler( secondaryKeyStore, statsNotificationService, notificationManager); await updateVerbHandler.processVerb( response, updateVerbParams, inboundConnection); expect(response.data, isNotNull); // Local Lookup a key with at_contact.buzz namespace - String llookupCommand = 'llookup:atconnections.bob.alice.at_contact.buzz$alice'; + String llookupCommand = + 'llookup:atconnections.bob.alice.at_contact.buzz$alice'; HashMap llookupVerbParams = getVerbParam(VerbSyntax.llookup, llookupCommand); LocalLookupVerbHandler localLookupVerbHandler = diff --git a/packages/at_server_spec/lib/at_server_spec.dart b/packages/at_server_spec/lib/at_server_spec.dart index 46d1fbb7b..73f0f635c 100644 --- a/packages/at_server_spec/lib/at_server_spec.dart +++ b/packages/at_server_spec/lib/at_server_spec.dart @@ -7,3 +7,5 @@ export 'package:at_server_spec/src/connection/at_connection.dart'; export 'package:at_server_spec/src/connection/inbound_connection.dart'; export 'package:at_server_spec/src/verb/update_meta.dart'; export 'package:at_server_spec/src/verb/verb.dart'; +export 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart'; + diff --git a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml index 55983c78b..68da164b8 100644 --- a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml +++ b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml @@ -18,4 +18,4 @@ second_atsign_server: # # The port to connect to second atServer second_atsign_port: ATSIGN_2_PORT # # The url to connect to second atServer - second_atsign_url: ATSIGN_2_HOST + second_atsign_url: ATSIGN_2_HOST \ No newline at end of file diff --git a/tests/at_end2end_test/config/config12.yaml b/tests/at_end2end_test/config/config12.yaml index b37bb3b51..213e86c5d 100644 --- a/tests/at_end2end_test/config/config12.yaml +++ b/tests/at_end2end_test/config/config12.yaml @@ -8,14 +8,18 @@ root_server: # cicd atsign details first_atsign_server: first_atsign_name: '@cicd1' - # # The port to connect to first atsign server + # The port to connect to first atsign server first_atsign_port: 6464 - # # The url to connect to first atsign server + # The url to connect to first atsign server first_atsign_url: cicd1.atsign.wtf + # The type of connection (socket or websocket) + first_atsign_connection_type: websocket second_atsign_server: second_atsign_name: '@cicd2' - # # The port to connect to second atsign server + # The port to connect to second atsign server second_atsign_port: 6464 - # # The url to connect to second atsign server + # The url to connect to second atsign server second_atsign_url: cicd2.atsign.wtf + # The type of connection (socket or websocket) + second_atsign_connection_type: websocket diff --git a/tests/at_end2end_test/test/cached_key_test.dart b/tests/at_end2end_test/test/cached_key_test.dart index cbf6e3a93..bed884eab 100644 --- a/tests/at_end2end_test/test/cached_key_test.dart +++ b/tests/at_end2end_test/test/cached_key_test.dart @@ -6,10 +6,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; int lastValue = DateTime.now().millisecondsSinceEpoch; diff --git a/tests/at_end2end_test/test/commons.dart b/tests/at_end2end_test/test/commons.dart deleted file mode 100644 index 9905d9a23..000000000 --- a/tests/at_end2end_test/test/commons.dart +++ /dev/null @@ -1,119 +0,0 @@ -import 'dart:collection'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:test/test.dart'; - -import 'pkam_utils.dart'; - -var _queue = Queue(); -var maxRetryCount = 10; -var retryCount = 1; - -///Socket Connection -Future socket_connection(host, port) async { - var context = SecurityContext(); - print(Directory.current); - context.setTrustedCertificates('lib/secondary/base/certs/cacert.pem'); - context.usePrivateKey('lib/secondary/base/certs/privkey.pem'); - context.useCertificateChain('lib/secondary/base/certs/fullchain.pem'); - return await SecureSocket.connect(host, port, context: context); -} - -void clear() { - _queue.clear(); - print('queue cleared'); -} - -///Secure Socket Connection -Future secure_socket_connection(var host, var port) async { - var socket; - while (retryCount < maxRetryCount) { - try { - socket = await SecureSocket.connect(host, port); - if (socket != null) { - break; - } - } on Exception { - print('retrying "$host:$port" for connection.. $retryCount'); - await Future.delayed(Duration(seconds: 5)); - retryCount++; - } - } - return socket; -} - -/// Socket Listener -void socket_listener(Socket socket) { - socket.listen(_messageHandler); -} - -/// Socket write -Future socket_writer(Socket socket, String msg) async { - print('command sent: $msg'); - msg = msg + '\n'; - socket.write(msg); -} - -///The prepare function takes a socket and atsign as input params and runs a from verb and pkam verb on the atsign param. -Future prepare(Socket socket, String atsign) async { - // FROM VERB - await socket_writer(socket, 'from:$atsign'); - var response = await read(); - print('From verb response $response'); - response = response.replaceAll('data:', ''); - var pkam_digest = generatePKAMDigest(atsign, response); - // var cram = getDigest(atsign, response); - - // PKAM VERB - await socket_writer(socket, 'pkam:$pkam_digest'); - response = await read(); - print('pkam verb response $response'); - expect(response, 'data:success\n'); - - //CRAM VERB - // await socket_writer(socket, 'cram:$cram'); - // response = await read(); - // print('cram verb response $response'); - // expect(response, 'data:success\n'); -} - -void _messageHandler(data) { - if (data.length == 1 && data.first == 64) { - return; - } - //ignore prompt(@ or @@) after '\n'. byte code for \n is 10 - if (data.last == 64 && data.contains(10)) { - data = data.sublist(0, data.lastIndexOf(10) + 1); - _queue.add(utf8.decode(data)); - } else if (data.length > 1 && data.first == 64 && data.last == 64) { - // pol responses do not end with '\n'. Add \n for buffer completion - _queue.add(utf8.decode(data)); - } else { - _queue.add(utf8.decode(data)); - } -} - -Future read({int maxWaitMilliSeconds = 5000}) async { - var result; - //wait maxWaitMilliSeconds seconds for response from remote socket - var loopCount = (maxWaitMilliSeconds / 50).round(); - for (var i = 0; i < loopCount; i++) { - await Future.delayed(Duration(milliseconds: 1000)); - var queueLength = _queue.length; - if (queueLength > 0) { - result = _queue.removeFirst(); - // result from another secondary is either data or a @@ denoting complete - // of the handshake - if (result.startsWith('data:') || - (result.startsWith('error:')) || - (result.startsWith('@') && result.endsWith('@'))) { - return result; - } else { - //log any other response and ignore - result = ''; - } - } - } - return result; -} diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index 506327d67..c55853815 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -1,3 +1,5 @@ +import 'dart:math'; + import 'package:at_commons/at_commons.dart'; import 'package:at_end2end_test/conf/config_util.dart'; import 'pkam_utils.dart'; @@ -5,10 +7,15 @@ import 'pkam_utils.dart'; import 'dart:collection'; import 'dart:convert'; import 'dart:io'; +// ignore: depend_on_referenced_packages +import 'package:at_utils/at_logger.dart'; const int maxRetryCount = 10; +AtSignLogger logger = AtSignLogger('e2e_test_utils'); + /// Contains all [_AtSignConfig] instances we know about so we can avoid loads of boilerplate elsewhere +// ignore: library_private_types_in_public_api LinkedHashMap atSignConfigMap = LinkedHashMap(); /// Return a List of atSigns known to these e2e test utils. Ordering is the order of insertion in [_loadYaml] which is @@ -21,14 +28,26 @@ List knownAtSigns() { /// Utility method which will return a socket handler. Gets config from [atSignConfigMap] which in turn calls /// [_loadTheYaml] if it hasn't yet been loaded. /// Can evolve this to use a pooling approach if/when it becomes necessary. -Future getSocketHandler(atSign) async { +Future getSocketHandler(atSign) async { _loadTheYaml(); _AtSignConfig? asc = atSignConfigMap[atSign]; if (asc == null) { throw _NoSuchAtSignException('$atSign not configured'); } - var handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + + // ignore: prefer_typing_uninitialized_variables + // ignore: prefer_typing_uninitialized_variables + var handler; + switch (asc.connectionType!) { + case _ConnectionTypeEnum.socket: + handler = SimpleOutboundSocketConnection(asc.host, asc.port, atSign); + break; + case _ConnectionTypeEnum.websocket: + handler = SimpleOutboundWebsocketConnection(asc.host, asc.port, atSign); + break; + } + await handler.connect(); handler.startListening(); await handler.sendFromAndPkam(); @@ -37,26 +56,137 @@ Future getSocketHandler(atSign) async { } /// A simple wrapper around a socket for @ protocol communication. -class SimpleOutboundSocketHandler { +/// A simple wrapper around a socket for @ protocol communication. +abstract class SimpleOutboundConnection { late Queue _queue; final _buffer = ByteBuffer(capacity: 10240000); - // ignore: prefer_typing_uninitialized_variables String host; int port; String atSign; - SecureSocket? socket; - - /// Try to open a socket - SimpleOutboundSocketHandler._(this.host, this.port, this.atSign) { + SimpleOutboundConnection(this.host, this.port, this.atSign) { _queue = Queue(); } + void close(); + Future connect(); + void startListening(); + Future writeCommand(String command, {bool log = true}); + Future sendFromAndPkam(); + + Future clear() async { + _queue.clear(); + } + + Future read( + {bool log = true, + int timeoutMillis = 4000, + bool throwTimeoutException = true}) async { + String result; + // Wait this many milliseconds between checks on the queue + var loopDelay = 250; + bool first = true; + var loopCount = (timeoutMillis / loopDelay).round(); + for (var i = 0; i < loopCount; i++) { + if (!first) { + await Future.delayed(Duration(milliseconds: loopDelay)); + } + first = false; + var queueLength = _queue.length; + if (queueLength > 0) { + result = _queue.removeFirst(); + if (log) { + print("Response: $result"); + } + return result; + } + } + // No response - either throw a timeout exception or return the canned readTimedOutMessage + if (throwTimeoutException) { + throw AtTimeoutException( + "No response from $host:$port ($atSign) after ${timeoutMillis / 1000} seconds"); + } else { + print("read(): No response after $timeoutMillis milliseconds"); + return readTimedOutMessage; + } + } + + final int newLineCodeUnit = 10; + final int atCharCodeUnit = 64; + + /// Handles responses from the remote secondary, adds to [_queue] for processing in [read] method + /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data + /// Handles responses from the remote secondary for both socket and WebSocket connections. + /// Adds responses to [_queue] for processing in the [read] method. + /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data. + Future _messageHandler(dynamic data) async { + if (data is String) { + if (data == '@' || data.isEmpty) { + return; + } + int lastIndexOfNewLineCharacter = data.lastIndexOf('\n'); + data = data.substring(0, lastIndexOfNewLineCharacter); + _queue.add(data); + } else if (data is List) { + // Handle raw socket data + _checkBufferOverFlow(data); + + // Loop through the data and process it + for (int element = 0; element < data.length; element++) { + if (data[element] == newLineCodeUnit) { + String result = utf8.decode(_buffer.getData().toList()); + result = _stripPrompt(result); + _buffer.clear(); + _queue.add(result); + } else { + _buffer.addByte(data[element]); + } + } + } else { + throw UnsupportedError( + 'Unsupported data type received: ${data.runtimeType}'); + } + } + + void _checkBufferOverFlow(data) { + if (_buffer.isOverFlow(data)) { + int bufferLength = (_buffer.length() + data.length) as int; + _buffer.clear(); + throw BufferOverFlowException( + 'data length exceeded the buffer limit. Data length : $bufferLength and Buffer capacity ${_buffer.capacity}'); + } + } + + String _stripPrompt(String result) { + var colonIndex = result.indexOf(':'); + if (colonIndex == -1) { + return result; + } + var responsePrefix = result.substring(0, colonIndex); + var response = result.substring(colonIndex); + if (responsePrefix.contains('@')) { + responsePrefix = + responsePrefix.substring(responsePrefix.lastIndexOf('@') + 1); + } + return '$responsePrefix$response'; + } + + static String readTimedOutMessage = 'E2E_SIMPLE_SOCKET_HANDLER_TIMED_OUT'; +} + +class SimpleOutboundSocketConnection extends SimpleOutboundConnection { + SecureSocket? socket; + + SimpleOutboundSocketConnection(String host, int port, String atSign) + : super(host, port, atSign); + + @override void close() { - print("Closing SimpleOutboundSocketHandler for $atSign ($host:$port)"); - socket!.destroy(); + print("Closing SimpleOutboundSocketConnection for $atSign ($host:$port)"); + socket?.destroy(); } + @override Future connect() async { int retryCount = 1; while (retryCount < maxRetryCount) { @@ -71,140 +201,134 @@ class SimpleOutboundSocketHandler { retryCount++; } } - throw Exception("Failed to connect to $host:$port after $retryCount attempts"); + throw Exception( + "Failed to connect to $host:$port after $retryCount attempts"); } + @override void startListening() { - socket!.listen(_messageHandler); + socket?.listen(_messageHandler); } - /// Socket write + @override Future writeCommand(String command, {bool log = true}) async { if (log) { print('command sent: $command'); } - if (! command.endsWith('\n')) { - command = command + '\n'; + if (!command.endsWith('\n')) { + command = '$command\n'; } - socket!.write(command); + socket?.write(command); } - /// Runs a from verb and pkam verb on the atsign param. + @override Future sendFromAndPkam() async { - // FROM VERB - // Setting clientVersion to 3.0.38 to support JSON encoding of error responses await writeCommand('from:$atSign:clientConfig:{"version":"3.0.38"}'); - var response = await read(timeoutMillis:5000); + var response = await read(timeoutMillis: 5000); response = response.replaceAll('data:', ''); var pkamDigest = generatePKAMDigest(atSign, response); - // PKAM VERB - print ("Sending pkam: command"); - await writeCommand('pkam:$pkamDigest', log:false); - response = await read(timeoutMillis:5000); + await writeCommand('pkam:$pkamDigest', log: false); + response = await read(timeoutMillis: 5000); print('pkam verb response $response'); assert(response.contains('data:success')); } +} - Future clear() async { - // queue.clear(); - } +class SimpleOutboundWebsocketConnection extends SimpleOutboundConnection { + WebSocket? websocket; - final int newLineCodeUnit = 10; - final int atCharCodeUnit = 64; + SimpleOutboundWebsocketConnection(String host, int port, String atSign) + : super(host, port, atSign); - /// Handles responses from the remote secondary, adds to [_queue] for processing in [read] method - /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data - Future _messageHandler(data) async { - // check buffer overflow - _checkBufferOverFlow(data); - - // Loop from last index to until the end of data. - // If a new line character is found, then it is end - // of server response. process the data. - // Else add the byte to buffer. - for (int element = 0; element < data.length; element++) { - // If it's a '\n' then complete data has been received. process it. - if (data[element] == newLineCodeUnit) { - String result = utf8.decode(_buffer.getData().toList()); - result = _stripPrompt(result); - _buffer.clear(); - _queue.add(result); - } else { - _buffer.addByte(data[element]); - } - } + @override + void close() { + print( + "Closing SimpleOutboundWebsocketConnection for $atSign ($host:$port)"); + websocket?.close(); } - _checkBufferOverFlow(data) { - if (_buffer.isOverFlow(data)) { - int bufferLength = (_buffer.length() + data.length) as int; - _buffer.clear(); - throw BufferOverFlowException( - 'data length exceeded the buffer limit. Data length : $bufferLength and Buffer capacity ${_buffer.capacity}'); + @override + Future connect() async { + try { + Random random = Random(); + String key = + base64.encode(List.generate(8, (_) => random.nextInt(256))); + + SecurityContext context = SecurityContext.defaultContext; + context.setAlpnProtocols(['http/1.1'], false); + HttpClient client = HttpClient(context: context); + + Uri uri = Uri.parse("https://$host:$port/ws"); + HttpClientRequest request = await client.getUrl(uri); + request.headers.add('Connection', 'upgrade'); + request.headers.add('Upgrade', 'websocket'); + request.headers.add('sec-websocket-version', '13'); + request.headers.add('sec-websocket-key', key); + + HttpClientResponse response = await request.close(); + Socket socket = await response.detachSocket(); + + websocket = WebSocket.fromUpgradedSocket( + socket, + serverSide: false, + ); + + print('WebSocket connection established for $atSign ($host:$port)'); + } catch (e) { + throw AtException( + 'Failed to establish WebSocket connection: ${e.toString()}'); } } - String _stripPrompt(String result) { - var colonIndex = result.indexOf(':'); - if (colonIndex == -1) { - return result; + @override + void startListening() { + websocket?.listen(_messageHandler); + } + + @override + Future writeCommand(String command, {bool log = true}) async { + if (log) { + print('command sent: $command'); } - var responsePrefix = result.substring(0, colonIndex); - var response = result.substring(colonIndex); - if (responsePrefix.contains('@')) { - responsePrefix = - responsePrefix.substring(responsePrefix.lastIndexOf('@') + 1); + if (!command.endsWith('\n')) { + command = '$command\n'; } - return '$responsePrefix$response'; + websocket?.add(command); } - /// A message which is returned from [read] if throwTimeoutException is set to false - static String readTimedOutMessage = 'E2E_SIMPLE_SOCKET_HANDLER_TIMED_OUT'; - - Future read({bool log = true, int timeoutMillis = 4000, bool throwTimeoutException = true}) async { - String result; - - // Wait this many milliseconds between checks on the queue - var loopDelay=250; + @override + Future sendFromAndPkam() async { + await writeCommand('from:$atSign:clientConfig:{"version":"3.0.38"}'); + var response = await read(timeoutMillis: 5000); + response = response.replaceAll('data:', ''); + var pkamDigest = generatePKAMDigest(atSign, response); - bool first = true; - // Check every loopDelay milliseconds until we get a response or timeoutMillis have passed. - var loopCount = (timeoutMillis / loopDelay).round(); - for (var i = 0; i < loopCount; i++) { - if (!first) { - await Future.delayed(Duration(milliseconds: loopDelay)); - } - first = false; - var queueLength = _queue.length; - if (queueLength > 0) { - result = _queue.removeFirst(); - if (log) { - print("Response: $result"); - } - // Got a response, let's return it - return result; - } - } - // No response - either throw a timeout exception or return the canned readTimedOutMessage - if (throwTimeoutException) { - throw AtTimeoutException ("No response from $host:$port ($atSign) after ${timeoutMillis/1000} seconds"); - } else { - print ("read(): No response after $timeoutMillis milliseconds"); - return readTimedOutMessage; - } + await writeCommand('pkam:$pkamDigest', log: false); + response = await read(timeoutMillis: 5000); + print('pkam verb response $response'); + assert(response.contains('data:success')); } } + +enum _ConnectionTypeEnum { + socket, + websocket, +} + /// Simple data-holding class which adds its instances into [atSignConfigMap] class _AtSignConfig { String atSign; String host; int port; + _ConnectionTypeEnum? connectionType; /// Creates and adds to [atSignConfigMap] or throws [_AtSignAlreadyAddedException] if we've already got it. - _AtSignConfig(this.atSign, this.host, this.port) { + _AtSignConfig(this.atSign, this.host, this.port, this.connectionType) { + connectionType ??= _ConnectionTypeEnum.socket; if (atSignConfigMap.containsKey(atSign)) { - throw _AtSignAlreadyAddedException("AtSignConfig for $atSign has already been created"); + throw _AtSignAlreadyAddedException( + "AtSignConfig for $atSign has already been created"); } atSignConfigMap[atSign] = this; } @@ -229,15 +353,33 @@ void _loadTheYaml() { _yamlLoaded = true; + String? connTypeStr1; + _ConnectionTypeEnum? connType1; + connTypeStr1 = ConfigUtil.getYaml()!['first_atsign_server'] + ['first_atsign_connection_type']; + if (connTypeStr1 != null) { + connType1 = _ConnectionTypeEnum.values.byName(connTypeStr1); + } _AtSignConfig( - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port']); - + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port'], + connType1, + ); + + String? connTypeStr2; + _ConnectionTypeEnum? connType2; + connTypeStr2 = ConfigUtil.getYaml()!['second_atsign_server'] + ['second_atsign_connection_type']; + if (connTypeStr2 != null) { + connType2 = _ConnectionTypeEnum.values.byName(connTypeStr2); + } _AtSignConfig( - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port']); + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port'], + connType2, + ); /// TODO Ideally instead of the current config.yaml we'd have yaml like this /// at_sign_configs: @@ -250,7 +392,7 @@ void _loadTheYaml() { /// ... etc } -extension Utils on SimpleOutboundSocketHandler { +extension Utils on SimpleOutboundConnection { Future getVersion() async { await writeCommand('info\n'); var version = await read(); diff --git a/tests/at_end2end_test/test/lookup_verb_test.dart b/tests/at_end2end_test/test/lookup_verb_test.dart index 14dc155d1..4d29acd81 100644 --- a/tests/at_end2end_test/test/lookup_verb_test.dart +++ b/tests/at_end2end_test/test/lookup_verb_test.dart @@ -9,10 +9,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; setUpAll(() async { List atSigns = e2e.knownAtSigns(); diff --git a/tests/at_end2end_test/test/notify_verb_test.dart b/tests/at_end2end_test/test/notify_verb_test.dart index c901f233f..3556cfadd 100644 --- a/tests/at_end2end_test/test/notify_verb_test.dart +++ b/tests/at_end2end_test/test/notify_verb_test.dart @@ -8,10 +8,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(30); @@ -46,7 +46,7 @@ void main() { // atsign2 create connection, issue monitor command for that namespace // verify atsign2 receives all 5 notifications immediately test('monitor receives multiple pending notifications immediately', () async { - e2e.SimpleOutboundSocketHandler notifySH, monitorSH; + e2e.SimpleOutboundConnection notifySH, monitorSH; notifySH = await e2e.getSocketHandler(atSign_1); monitorSH = await e2e.getSocketHandler(atSign_2); @@ -845,7 +845,7 @@ void main() { // get notify status Future getNotifyStatus( - e2e.SimpleOutboundSocketHandler sh, String notificationId, + e2e.SimpleOutboundConnection sh, String notificationId, {List? returnWhenStatusIn, int timeOutMillis = 5000}) async { returnWhenStatusIn ??= ['expired']; print( @@ -871,7 +871,7 @@ Future getNotifyStatus( log: true, timeoutMillis: loopDelay, throwTimeoutException: false); readTimedOut = - (response == e2e.SimpleOutboundSocketHandler.readTimedOutMessage); + (response == e2e.SimpleOutboundConnection.readTimedOutMessage); if (response.startsWith('data:')) { String status = response.replaceFirst('data:', '').replaceAll('\n', ''); @@ -888,7 +888,7 @@ Future getNotifyStatus( } Future retryCommandUntilMatchOrTimeout( - e2e.SimpleOutboundSocketHandler sh, + e2e.SimpleOutboundConnection sh, String command, String shouldContain, int timeoutMillis) async { @@ -909,7 +909,7 @@ Future retryCommandUntilMatchOrTimeout( log: false, timeoutMillis: loopDelay, throwTimeoutException: false); readTimedOut = - (response == e2e.SimpleOutboundSocketHandler.readTimedOutMessage); + (response == e2e.SimpleOutboundConnection.readTimedOutMessage); if (readTimedOut) { continue; } diff --git a/tests/at_end2end_test/test/plookup_verb_test.dart b/tests/at_end2end_test/test/plookup_verb_test.dart index 39993af88..1880de6c7 100644 --- a/tests/at_end2end_test/test/plookup_verb_test.dart +++ b/tests/at_end2end_test/test/plookup_verb_test.dart @@ -7,10 +7,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; setUpAll(() async { List atSigns = e2e.knownAtSigns(); diff --git a/tests/at_end2end_test/test/stats_verb_test.dart b/tests/at_end2end_test/test/stats_verb_test.dart index 7fcf5bf3b..1efcd2a41 100644 --- a/tests/at_end2end_test/test/stats_verb_test.dart +++ b/tests/at_end2end_test/test/stats_verb_test.dart @@ -8,10 +8,10 @@ import 'notify_verb_test.dart' as notification; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(20); @@ -170,7 +170,7 @@ void main() { }); } -Future notificationStats(e2e.SimpleOutboundSocketHandler sh) async { +Future notificationStats(e2e.SimpleOutboundConnection sh) async { await sh.writeCommand('stats:11'); var statsResponse = await sh.read(); print('stats verb response : $statsResponse'); diff --git a/tests/at_end2end_test/test/update_verb_test.dart b/tests/at_end2end_test/test/update_verb_test.dart index 45ad8898d..8612a54e3 100644 --- a/tests/at_end2end_test/test/update_verb_test.dart +++ b/tests/at_end2end_test/test/update_verb_test.dart @@ -3,15 +3,14 @@ import 'dart:math'; import 'package:test/test.dart'; -import 'commons.dart'; import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(20); @@ -77,6 +76,8 @@ void main() { expect(response, contains('data:$value')); //LOOKUP VERB in the other secondary + var maxRetryCount = 10; + var retryCount = 1; while (true) { await sh2.writeCommand('llookup:cached:$atSign_2:youtube_id$atSign_1'); response = await sh2.read(); diff --git a/tools/run_locally/scripts/macos/at_server b/tools/run_locally/scripts/macos/at_server index f7c31edc7..d4764083a 100755 --- a/tools/run_locally/scripts/macos/at_server +++ b/tools/run_locally/scripts/macos/at_server @@ -62,7 +62,7 @@ export accessLogPath="$storageDir/accessLog" export notificationStoragePath="$storageDir/notificationLog.v1" export inbound_max_limit=200 -export logLevel="WARNING" +export logLevel="INFO" export testingMode="true"