From 38692f14a1b6e57eeebad79a163cdb9c6ccf0aef Mon Sep 17 00:00:00 2001 From: gkc Date: Fri, 8 Mar 2024 12:29:37 +0000 Subject: [PATCH 01/22] feat: interim commit --- .../lib/src/server/at_secondary_impl.dart | 114 ++++++--- .../lib/src/server/bootstrapper.dart | 12 +- .../lib/src/server/pseudo_server_socket.dart | 221 ++++++++++++++++++ 3 files changed, 309 insertions(+), 38 deletions(-) create mode 100644 packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index e3913d628..f3ec641ae 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -34,13 +34,15 @@ import 'package:crypton/crypton.dart'; import 'package:uuid/uuid.dart'; import 'package:meta/meta.dart'; +import 'pseudo_server_socket.dart'; + /// [AtSecondaryServerImpl] is a singleton class which implements [AtSecondaryServer] class AtSecondaryServerImpl implements AtSecondaryServer { static final bool? useTLS = AtSecondaryConfig.useTLS; static final AtSecondaryServerImpl _singleton = - AtSecondaryServerImpl._internal(); + AtSecondaryServerImpl._internal(); static final inboundConnectionFactory = - InboundConnectionManager.getInstance(); + InboundConnectionManager.getInstance(); static final String? storagePath = AtSecondaryConfig.storagePath; static final String? commitLogPath = AtSecondaryConfig.commitLogPath; static final String? accessLogPath = AtSecondaryConfig.accessLogPath; @@ -192,9 +194,9 @@ class AtSecondaryServerImpl implements AtSecondaryServer { AtNotificationKeystore.getInstance(), secondaryPersistenceStore); atNotificationCompactionConfig = AtCompactionConfig() ..compactionPercentage = - AtSecondaryConfig.notificationKeyStoreCompactionPercentage! + AtSecondaryConfig.notificationKeyStoreCompactionPercentage! ..compactionFrequencyInMins = - AtSecondaryConfig.notificationKeyStoreCompactionFrequencyMins!; + AtSecondaryConfig.notificationKeyStoreCompactionFrequencyMins!; await notificationKeyStoreCompactionJobInstance .scheduleCompactionJob(atNotificationCompactionConfig); @@ -297,13 +299,12 @@ class AtSecondaryServerImpl implements AtSecondaryServer { // clean up malformed keys from keystore await removeMalformedKeys(); + if (!useTLS!) { + throw AtServerException('Only TLS is supported; useTLS must be true'); + } try { _isRunning = true; - if (useTLS!) { - await _startSecuredServer(); - } else { - await _startUnSecuredServer(); - } + await _startSecuredServer(); } on Exception catch (e, stacktrace) { _isRunning = false; logger.severe('AtSecondaryServer().start exception: ${e.toString()}'); @@ -368,7 +369,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { logger.finest( 'Subscribing to dynamic changes made to notificationKeystoreCompactionFreq'); AtSecondaryConfig.subscribe( - ModifiableConfigs.notificationKeyStoreCompactionFrequencyMins) + ModifiableConfigs.notificationKeyStoreCompactionFrequencyMins) ?.listen((newFrequency) async { await restartCompaction( notificationKeyStoreCompactionJobInstance, @@ -381,7 +382,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { logger.finest( 'Subscribing to dynamic changes made to accessLogCompactionFreq'); AtSecondaryConfig.subscribe( - ModifiableConfigs.accessLogCompactionFrequencyMins) + ModifiableConfigs.accessLogCompactionFrequencyMins) ?.listen((newFrequency) async { await restartCompaction(accessLogCompactionJobInstance, atAccessLogCompactionConfig, newFrequency, _accessLog); @@ -391,7 +392,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { logger.finest( 'Subscribing to dynamic changes made to commitLogCompactionFreq'); AtSecondaryConfig.subscribe( - ModifiableConfigs.commitLogCompactionFrequencyMins) + ModifiableConfigs.commitLogCompactionFrequencyMins) ?.listen((newFrequency) async { await restartCompaction(commitLogCompactionJobInstance, atCommitLogCompactionConfig, newFrequency, _commitLog); @@ -436,27 +437,76 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } } + void onWebSocketData(data) { + logger.info('WebSocket received: $data'); + } + /// Listens on the secondary server socket and creates an inbound connection to server socket from client socket /// Throws [AtConnection] if unable to create a connection /// Throws [SocketException] for exceptions on socket /// Throws [Exception] for any other exceptions. /// @param - ServerSocket - void _listen(var serverSocket) { + void _listen(final serverSocket) { + final wssb = PseudoServerSocket(serverSocket); + HttpServer httpServer = HttpServer.listenOn(wssb); + httpServer.listen((HttpRequest req) { + if (req.uri.path == '/ws') { + // Upgrade an HttpRequest to a WebSocket connection. + logger.info('Upgraded to WebSocket connection'); + WebSocketTransformer.upgrade(req) + .then((WebSocket socket) => socket.listen(onWebSocketData)); + } else { + logger.info('Got Http Request: ${req.method} ${req.uri}'); + if (req.method.toUpperCase() != 'GET') { + req.response.statusCode = HttpStatus.badRequest; + req.response.close(); + } else { + var lookupKey = req.uri.path.substring(1); + if (!lookupKey.startsWith('public:')) { + lookupKey = 'public:$lookupKey'; + } + if (!lookupKey.endsWith(currentAtSign)) { + lookupKey = '$lookupKey$currentAtSign'; + } + logger.finer('Key to look up: $lookupKey'); + secondaryKeyStore.get(lookupKey)!.then((AtData? value) { + req.response.writeln('Hello there, http client!\n\n' + 'The value stored for ${req.uri} ($lookupKey) is: \n' + '\t data: ${value?.data}\n\n' + '\t metadata: ${value?.metaData}\n'); + req.response.close(); + }, onError: (error) { + req.response.writeln('Hello there, http client!\n\n' + 'No value available for ${req.uri} ($lookupKey)\n'); + req.response.close(); + }); + } + } + }); + logger.finer('serverSocket _listen : ${serverSocket.runtimeType}'); serverSocket.listen(((clientSocket) { var sessionID = '_${Uuid().v4()}'; - InboundConnection? connection; - try { - logger.finer( - 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); - var inBoundConnectionManager = InboundConnectionManager.getInstance(); - connection = inBoundConnectionManager - .createSocketConnection(clientSocket, sessionId: sessionID); - connection.acceptRequests(_executeVerbCallBack, _streamCallBack); - connection.write('@'); - } on InboundConnectionLimitException catch (e) { - GlobalExceptionHandler.getInstance() - .handle(e, atConnection: connection, clientSocket: clientSocket); + logger.info( + 'New client socket: selectedProtocol ${clientSocket.selectedProtocol}'); + if (clientSocket.selectedProtocol == 'atProtocol/1.0' || + clientSocket.selectedProtocol == null) { + InboundConnection? connection; + try { + logger.finer( + 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); + var inBoundConnectionManager = InboundConnectionManager.getInstance(); + connection = inBoundConnectionManager + .createSocketConnection(clientSocket, sessionId: sessionID); + connection.acceptRequests(_executeVerbCallBack, _streamCallBack); + connection.write('@'); + } on InboundConnectionLimitException catch (e) { + GlobalExceptionHandler.getInstance() + .handle(e, atConnection: connection, clientSocket: clientSocket); + } + } else { + logger.info('Transferring socket to HttpServer for handling'); + wssb.add(clientSocket); } }), onError: (error) { // We've got no action to take here, let's just log a warning @@ -481,6 +531,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer { secCon.setTrustedCertificates( serverContext!.securityContext!.trustedCertificatePath()); certsAvailable = true; + // secCon.setAlpnProtocols(['atp/1.0', 'h2', 'http/1.1'], true); + secCon.setAlpnProtocols(['atProtocol/1.0', 'http/1.1'], true); } on FileSystemException catch (e) { retryCount++; logger.info('${e.message}:${e.path}'); @@ -501,14 +553,6 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } } - /// Starts the secondary server in un-secure mode and calls the listen method of server socket. - Future _startUnSecuredServer() async { - _serverSocket = - await ServerSocket.bind(InternetAddress.anyIPv4, serverContext!.port); - logger.info('Unsecure Socket open'); - _listen(_serverSocket); - } - ///Accepts the command and the inbound connection and invokes a call to execute method. ///@param - command : Command to process ///@param - connection : The inbound connection to secondary server from client @@ -636,8 +680,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer { // Initialize Secondary Storage var secondaryPersistenceStore = - SecondaryPersistenceStoreFactory.getInstance() - .getSecondaryPersistenceStore(serverContext!.currentAtSign)!; + SecondaryPersistenceStoreFactory.getInstance() + .getSecondaryPersistenceStore(serverContext!.currentAtSign)!; var manager = secondaryPersistenceStore.getHivePersistenceManager()!; await manager.init(storagePath!); // expiringRunFreqMins default is 10 mins. Randomly run the task every 8-15 mins. diff --git a/packages/at_secondary_server/lib/src/server/bootstrapper.dart b/packages/at_secondary_server/lib/src/server/bootstrapper.dart index 2ee50524c..37eef177d 100644 --- a/packages/at_secondary_server/lib/src/server/bootstrapper.dart +++ b/packages/at_secondary_server/lib/src/server/bootstrapper.dart @@ -54,9 +54,15 @@ class SecondaryServerBootStrapper { //prevents secondary from terminating due to uncaught non-fatal errors unawaited(runZonedGuarded(() async { await secondaryServerInstance.start(); - }, (error, stackTrace) { - logger.severe('Uncaught error: $error \n StackTrace: $stackTrace'); - handleTerminateSignal(ProcessSignal.sigstop); + }, (Object error, StackTrace stackTrace) { + if (error is SocketException) { + logger.warning('runZonedGuarded caught a SocketException ($error)'); + logger.warning('we will not terminate the server'); + } else { + logger.shout('runZonedGuarded caught unexpected exception:' + ' $error \n StackTrace: $stackTrace'); + handleTerminateSignal(ProcessSignal.sigstop); + } })); ProcessSignal.sigterm.watch().listen(handleTerminateSignal); ProcessSignal.sigint.watch().listen(handleTerminateSignal); diff --git a/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart b/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart new file mode 100644 index 000000000..d1f292cd6 --- /dev/null +++ b/packages/at_secondary_server/lib/src/server/pseudo_server_socket.dart @@ -0,0 +1,221 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:at_utils/at_utils.dart'; + +class PseudoServerSocket implements ServerSocket { + final AtSignLogger logger = AtSignLogger(' AtServerSocket '); + final SecureServerSocket _serverSocket; + final StreamController sc = + StreamController.broadcast(sync: true); + + PseudoServerSocket(this._serverSocket); + + @override + int get port => _serverSocket.port; + + @override + InternetAddress get address => _serverSocket.address; + + @override + Future close() async { + await sc.close(); + return this; + } + + add(Socket socket) { + logger.info('add was called with socket: $socket'); + sc.add(socket); + } + + // Can ignore everything from this point on, it's just the implementation + // of the Stream interface. + // + // All calls to the Stream methods are implemented by delegating + // the calls to the StreamController's stream + + @override + Future any(bool Function(Socket element) test) { + return sc.stream.any(test); + } + + @override + Stream asBroadcastStream( + {void Function(StreamSubscription subscription)? onListen, + void Function(StreamSubscription subscription)? onCancel}) { + return sc.stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); + } + + @override + Stream asyncExpand(Stream? Function(Socket event) convert) { + return sc.stream.asyncExpand(convert); + } + + @override + Stream asyncMap(FutureOr Function(Socket event) convert) { + return sc.stream.asyncMap(convert); + } + + @override + Stream cast() { + return sc.stream.cast(); + } + + @override + Future contains(Object? needle) { + return sc.stream.contains(needle); + } + + @override + Stream distinct( + [bool Function(Socket previous, Socket next)? equals]) { + return sc.stream.distinct(equals); + } + + @override + Future drain([E? futureValue]) { + return sc.stream.drain(futureValue); + } + + @override + Future elementAt(int index) { + return sc.stream.elementAt(index); + } + + @override + Future every(bool Function(Socket element) test) { + return sc.stream.every(test); + } + + @override + Stream expand(Iterable Function(Socket element) convert) { + return sc.stream.expand(convert); + } + + @override + Future get first => sc.stream.first; + + @override + Future firstWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.firstWhere(test, orElse: orElse); + } + + @override + Future fold( + S initialValue, S Function(S previous, Socket element) combine) { + return sc.stream.fold(initialValue, combine); + } + + @override + Future forEach(void Function(Socket element) action) { + return sc.stream.forEach(action); + } + + @override + Stream handleError(Function onError, + {bool Function(dynamic error)? test}) { + return sc.stream.handleError(onError, test: test); + } + + @override + bool get isBroadcast => sc.stream.isBroadcast; + + @override + Future get isEmpty => sc.stream.isEmpty; + + @override + Future join([String separator = ""]) { + return sc.stream.join(separator); + } + + @override + Future get last => sc.stream.last; + + @override + Future lastWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.lastWhere(test, orElse: orElse); + } + + @override + Future get length => sc.stream.length; + + @override + StreamSubscription listen(void Function(Socket event)? onData, + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + return sc.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } + + @override + Stream map(S Function(Socket event) convert) { + return sc.stream.map(convert); + } + + @override + Future pipe(StreamConsumer streamConsumer) { + return sc.stream.pipe(streamConsumer); + } + + @override + Future reduce( + Socket Function(Socket previous, Socket element) combine) { + return sc.stream.reduce(combine); + } + + @override + Future get single => sc.stream.single; + + @override + Future singleWhere(bool Function(Socket element) test, + {Socket Function()? orElse}) { + return sc.stream.singleWhere(test, orElse: orElse); + } + + @override + Stream skip(int count) { + return sc.stream.skip(count); + } + + @override + Stream skipWhile(bool Function(Socket element) test) { + return sc.stream.skipWhile(test); + } + + @override + Stream take(int count) { + return sc.stream.take(count); + } + + @override + Stream takeWhile(bool Function(Socket element) test) { + return sc.stream.takeWhile(test); + } + + @override + Stream timeout(Duration timeLimit, + {void Function(EventSink sink)? onTimeout}) { + return sc.stream.timeout(timeLimit, onTimeout: onTimeout); + } + + @override + Future> toList() { + return sc.stream.toList(); + } + + @override + Future> toSet() { + return sc.stream.toSet(); + } + + @override + Stream transform(StreamTransformer streamTransformer) { + return sc.stream.transform(streamTransformer); + } + + @override + Stream where(bool Function(Socket event) test) { + return sc.stream.where(test); + } +} From 92785215f44289136f797af17444558c9cb11d17 Mon Sep 17 00:00:00 2001 From: gkc Date: Mon, 22 Jul 2024 11:09:18 +0100 Subject: [PATCH 02/22] chore: fix merge issue --- .../at_secondary_server/lib/src/server/at_secondary_impl.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index 46a63b1c8..0392afa34 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -489,7 +489,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { }); logger.finer('serverSocket _listen : ${serverSocket.runtimeType}'); - serverSocket.listen(((clientSocket) { + serverSocket.listen(((clientSocket) async { var sessionID = '_${Uuid().v4()}'; logger.info( 'New client socket: selectedProtocol ${clientSocket.selectedProtocol}'); From fd54baf64ab41cab397f56e3fb638458fbd2d1c5 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 25 Jul 2024 12:33:42 +0100 Subject: [PATCH 03/22] chore: to eliminate some lint messages following change of Dart minimum version to 3.0.0, update lots of various constructor signatures to use `super.` --- .../src/connection/outbound/outbound_connection.dart | 2 +- .../lib/src/server/at_secondary_config.dart | 2 +- .../src/verb/handler/abstract_update_verb_handler.dart | 8 +++----- .../lib/src/verb/handler/batch_verb_handler.dart | 4 +--- .../lib/src/verb/handler/change_verb_handler.dart | 4 +--- .../lib/src/verb/handler/config_verb_handler.dart | 2 +- .../lib/src/verb/handler/cram_verb_handler.dart | 2 +- .../lib/src/verb/handler/delete_verb_handler.dart | 5 +---- .../lib/src/verb/handler/enroll_verb_handler.dart | 10 +++++----- .../lib/src/verb/handler/from_verb_handler.dart | 2 +- .../lib/src/verb/handler/info_verb_handler.dart | 2 +- .../lib/src/verb/handler/keys_verb_handler.dart | 4 ++-- .../src/verb/handler/local_lookup_verb_handler.dart | 2 +- .../lib/src/verb/handler/lookup_verb_handler.dart | 3 +-- .../lib/src/verb/handler/monitor_verb_handler.dart | 2 +- .../lib/src/verb/handler/noop_verb_handler.dart | 4 ++-- .../lib/src/verb/handler/notify_all_verb_handler.dart | 2 +- .../src/verb/handler/notify_fetch_verb_handler.dart | 2 +- .../lib/src/verb/handler/notify_list_verb_handler.dart | 3 +-- .../src/verb/handler/notify_remove_verb_handler.dart | 2 +- .../src/verb/handler/notify_status_verb_handler.dart | 2 +- .../lib/src/verb/handler/notify_verb_handler.dart | 2 +- .../lib/src/verb/handler/otp_verb_handler.dart | 2 +- .../lib/src/verb/handler/pkam_verb_handler.dart | 3 +-- .../lib/src/verb/handler/pol_verb_handler.dart | 4 +--- .../src/verb/handler/proxy_lookup_verb_handler.dart | 3 +-- .../lib/src/verb/handler/scan_verb_handler.dart | 4 +--- .../lib/src/verb/handler/stats_verb_handler.dart | 3 +-- .../lib/src/verb/handler/stream_verb_handler.dart | 2 +- .../verb/handler/sync_progressive_verb_handler.dart | 2 +- .../lib/src/verb/handler/update_meta_verb_handler.dart | 10 +++++----- .../lib/src/verb/handler/update_verb_handler.dart | 10 +++++----- .../test/abstract_verb_handler_test.dart | 2 +- .../at_secondary_server/test/enroll_verb_test.dart | 2 +- .../test/inbound_connection_pool_test.dart | 5 ++--- 35 files changed, 52 insertions(+), 71 deletions(-) diff --git a/packages/at_secondary_server/lib/src/connection/outbound/outbound_connection.dart b/packages/at_secondary_server/lib/src/connection/outbound/outbound_connection.dart index a0596b0b4..48d19a86b 100644 --- a/packages/at_secondary_server/lib/src/connection/outbound/outbound_connection.dart +++ b/packages/at_secondary_server/lib/src/connection/outbound/outbound_connection.dart @@ -6,7 +6,7 @@ import 'package:at_server_spec/at_server_spec.dart'; // Represent an OutboundConnection to another atServer abstract class OutboundSocketConnection extends BaseSocketConnection { - OutboundSocketConnection(T socket) : super(socket); + OutboundSocketConnection(T super.socket); } /// Metadata information for [OutboundSocketConnection] diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_config.dart b/packages/at_secondary_server/lib/src/server/at_secondary_config.dart index 657947cef..ecafa6146 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_config.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_config.dart @@ -960,5 +960,5 @@ class ModifiableConfigurationEntry { } class ElementNotFoundException extends AtException { - ElementNotFoundException(message) : super(message); + ElementNotFoundException(super.message); } diff --git a/packages/at_secondary_server/lib/src/verb/handler/abstract_update_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/abstract_update_verb_handler.dart index 026377776..3810a1998 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/abstract_update_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/abstract_update_verb_handler.dart @@ -6,7 +6,6 @@ import 'package:at_commons/at_commons.dart'; import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/notification/notification_manager_impl.dart'; -import 'package:at_secondary/src/notification/stats_notification_service.dart'; import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/handler_util.dart'; @@ -22,10 +21,9 @@ abstract class AbstractUpdateVerbHandler extends ChangeVerbHandler { static const int maxKeyLengthWithoutCached = 248; AbstractUpdateVerbHandler( - SecondaryKeyStore keyStore, - StatsNotificationService statsNotificationService, - this.notificationManager) - : super(keyStore, statsNotificationService); + super.keyStore, + super.statsNotificationService, + this.notificationManager); //setter to set autoNotify value from dynamic server config "config:set". //only works when testingMode is set to true diff --git a/packages/at_secondary_server/lib/src/verb/handler/batch_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/batch_verb_handler.dart index cb0f50048..b41e4a0d6 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/batch_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/batch_verb_handler.dart @@ -5,7 +5,6 @@ import 'package:at_secondary/src/exception/global_exception_handler.dart'; import 'package:at_secondary/src/verb/handler/abstract_verb_handler.dart'; import 'package:at_secondary/src/verb/verb_enum.dart'; import 'package:at_server_spec/at_verb_spec.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_server_spec/at_server_spec.dart'; // BatchVerbHandler is used to process batch of commands @@ -13,8 +12,7 @@ class BatchVerbHandler extends AbstractVerbHandler { static Batch batch = Batch(); final VerbHandlerManager verbHandlerManager; - BatchVerbHandler(SecondaryKeyStore keyStore, this.verbHandlerManager) - : super(keyStore); + BatchVerbHandler(super.keyStore, this.verbHandlerManager); // Method to verify whether command is accepted or not // Input: command diff --git a/packages/at_secondary_server/lib/src/verb/handler/change_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/change_verb_handler.dart index f4edf5a24..8ffdf71fe 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/change_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/change_verb_handler.dart @@ -1,7 +1,6 @@ import 'dart:collection'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_spec/at_persistence_spec.dart'; import 'package:at_secondary/src/notification/stats_notification_service.dart'; import 'package:at_secondary/src/verb/handler/abstract_verb_handler.dart'; import 'package:at_server_spec/at_server_spec.dart'; @@ -11,8 +10,7 @@ import 'package:at_server_spec/at_server_spec.dart'; /// verbHandler to write the commitId to SDK. abstract class ChangeVerbHandler extends AbstractVerbHandler { final StatsNotificationService statsNotificationService; - ChangeVerbHandler(SecondaryKeyStore keyStore, this.statsNotificationService) - : super(keyStore); + ChangeVerbHandler(super.keyStore, this.statsNotificationService); Response? _responseInternal; diff --git a/packages/at_secondary_server/lib/src/verb/handler/config_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/config_verb_handler.dart index c3ebd411e..a34867361 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/config_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/config_verb_handler.dart @@ -27,7 +27,7 @@ import 'package:at_commons/at_commons.dart'; /// class ConfigVerbHandler extends AbstractVerbHandler { static Config config = Config(); - ConfigVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + ConfigVerbHandler(super.keyStore); late AtConfig atConfigInstance; late ModifiableConfigs? setConfigName; diff --git a/packages/at_secondary_server/lib/src/verb/handler/cram_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/cram_verb_handler.dart index d92ca316e..0e98ccd7f 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/cram_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/cram_verb_handler.dart @@ -13,7 +13,7 @@ import 'package:crypto/crypto.dart'; class CramVerbHandler extends AbstractVerbHandler { static Cram cram = Cram(); - CramVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + CramVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/delete_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/delete_verb_handler.dart index 0c37c52bb..412cdf15f 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/delete_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/delete_verb_handler.dart @@ -4,7 +4,6 @@ import 'package:at_commons/at_commons.dart'; import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/notification/notification_manager_impl.dart'; -import 'package:at_secondary/src/notification/stats_notification_service.dart'; import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/secondary_util.dart'; @@ -19,9 +18,7 @@ class DeleteVerbHandler extends ChangeVerbHandler { static bool _autoNotify = AtSecondaryConfig.autoNotify; Set? protectedKeys; - DeleteVerbHandler(SecondaryKeyStore keyStore, - StatsNotificationService statsNotificationService) - : super(keyStore, statsNotificationService); + DeleteVerbHandler(super.keyStore, super.statsNotificationService); //setter to set autoNotify value from dynamic server config "config:set". //only works when testingMode is set to true diff --git a/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart index 887ece456..6ce442a4b 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart @@ -39,7 +39,7 @@ class EnrollVerbHandler extends AbstractVerbHandler { seconds: AtSecondaryConfig.enrollmentResponseDelayIntervalInSeconds) .inMilliseconds; - EnrollVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + EnrollVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith('enroll:'); @@ -406,11 +406,11 @@ class EnrollVerbHandler extends AbstractVerbHandler { /// Encrypted keys will be used later on by the approving app to send the keys to a new enrolling app Future _storeEncryptionKeys( String newEnrollmentId, EnrollParams enrollParams, String atSign) async { - var privKeyJson = {}; - privKeyJson['value'] = enrollParams.encryptedDefaultEncryptionPrivateKey; + var privateKeyJson = {}; + privateKeyJson['value'] = enrollParams.encryptedDefaultEncryptionPrivateKey; await keyStore.put( '$newEnrollmentId.${AtConstants.defaultEncryptionPrivateKey}.$enrollManageNamespace$atSign', - AtData()..data = jsonEncode(privKeyJson), + AtData()..data = jsonEncode(privateKeyJson), skipCommit: true); var selfKeyJson = {}; selfKeyJson['value'] = enrollParams.encryptedDefaultSelfEncryptionKey; @@ -635,7 +635,7 @@ class EnrollVerbHandler extends AbstractVerbHandler { if (enrollParams.namespaces == null || enrollParams.namespaces!.isEmpty) { throw AtEnrollmentException( - 'atleast one namespace must be specified for new client enroll:request'); + 'At least one namespace must be specified for new client enroll:request'); } } diff --git a/packages/at_secondary_server/lib/src/verb/handler/from_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/from_verb_handler.dart index 292febcfb..3c82d82a7 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/from_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/from_verb_handler.dart @@ -23,7 +23,7 @@ class FromVerbHandler extends AbstractVerbHandler { static final bool? clientCertificateRequired = AtSecondaryConfig.clientCertificateRequired; - FromVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + FromVerbHandler(super.keyStore); late AtConfig atConfigInstance; diff --git a/packages/at_secondary_server/lib/src/verb/handler/info_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/info_verb_handler.dart index e848469b5..d7e8527cd 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/info_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/info_verb_handler.dart @@ -17,7 +17,7 @@ class InfoVerbHandler extends AbstractVerbHandler { static Info infoVerb = Info(); static int? approximateStartTimeMillis; - InfoVerbHandler(SecondaryKeyStore keyStore) : super(keyStore) { + InfoVerbHandler(super.keyStore) { approximateStartTimeMillis ??= DateTime.now().millisecondsSinceEpoch; } diff --git a/packages/at_secondary_server/lib/src/verb/handler/keys_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/keys_verb_handler.dart index 95023477e..097b03e6a 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/keys_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/keys_verb_handler.dart @@ -14,7 +14,7 @@ import 'package:at_server_spec/at_verb_spec.dart'; class KeysVerbHandler extends AbstractVerbHandler { static Keys keys = Keys(); - KeysVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + KeysVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith('keys:'); @@ -130,7 +130,7 @@ class KeysVerbHandler extends AbstractVerbHandler { /// If current enrollment has __manage access then return both __global and __manage keys with visibility [keyVisibility] /// Otherwise return only __global keys with visibility [keyVisibility] - /// Also return the encrypted default encrption private key and encrypted self encryption key for enrollmentId [enrollIdFromMetadata] + /// Also return the encrypted default encryption private key and encrypted self encryption key for enrollmentId [enrollIdFromMetadata] Future> _getFilteredKeys(String? keyVisibility, bool hasManageAccess, String enrollIdFromMetadata, String atSign) async { final result = keyVisibility != null && keyVisibility.isNotEmpty diff --git a/packages/at_secondary_server/lib/src/verb/handler/local_lookup_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/local_lookup_verb_handler.dart index f4882dbe1..69c9d7967 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/local_lookup_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/local_lookup_verb_handler.dart @@ -13,7 +13,7 @@ import 'package:at_utils/at_utils.dart'; class LocalLookupVerbHandler extends AbstractVerbHandler { static LocalLookup llookup = LocalLookup(); - LocalLookupVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + LocalLookupVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/lookup_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/lookup_verb_handler.dart index 01db1998d..c0275ac22 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/lookup_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/lookup_verb_handler.dart @@ -21,8 +21,7 @@ class LookupVerbHandler extends AbstractVerbHandler { final AtCacheManager cacheManager; LookupVerbHandler( - SecondaryKeyStore keyStore, this.outboundClientManager, this.cacheManager) - : super(keyStore); + super.keyStore, this.outboundClientManager, this.cacheManager); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/monitor_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/monitor_verb_handler.dart index 3f21973a9..9e547cdf0 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/monitor_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/monitor_verb_handler.dart @@ -16,7 +16,7 @@ class MonitorVerbHandler extends AbstractVerbHandler { late String regex; - MonitorVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + MonitorVerbHandler(super.keyStore); Notification notification = Notification.empty(); diff --git a/packages/at_secondary_server/lib/src/verb/handler/noop_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/noop_verb_handler.dart index 3d9d9a1c8..15c0fab31 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/noop_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/noop_verb_handler.dart @@ -3,11 +3,11 @@ import 'package:at_commons/at_commons.dart'; import 'package:at_server_spec/at_server_spec.dart'; import 'abstract_verb_handler.dart'; import 'package:at_server_spec/at_verb_spec.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; class NoOpVerbHandler extends AbstractVerbHandler { static NoOp noOpVerb = NoOp(); - NoOpVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + + NoOpVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith('noop:'); diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_all_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_all_verb_handler.dart index 7403be847..01a5b3f54 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_all_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_all_verb_handler.dart @@ -18,7 +18,7 @@ import 'abstract_verb_handler.dart'; class NotifyAllVerbHandler extends AbstractVerbHandler { static NotifyAll notifyAll = NotifyAll(); - NotifyAllVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + NotifyAllVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_fetch_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_fetch_verb_handler.dart index 16d5fd77f..0abed1bd2 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_fetch_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_fetch_verb_handler.dart @@ -13,7 +13,7 @@ import 'package:at_server_spec/at_verb_spec.dart'; class NotifyFetchVerbHandler extends AbstractVerbHandler { static NotifyFetch notifyFetch = NotifyFetch(); - NotifyFetchVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + NotifyFetchVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_list_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_list_verb_handler.dart index 968e389ac..e23fc110a 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_list_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_list_verb_handler.dart @@ -17,8 +17,7 @@ class NotifyListVerbHandler extends AbstractVerbHandler { static NotifyList notifyList = NotifyList(); final OutboundClientManager outboundClientManager; - NotifyListVerbHandler(SecondaryKeyStore keyStore, this.outboundClientManager) - : super(keyStore); + NotifyListVerbHandler(super.keyStore, this.outboundClientManager); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_remove_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_remove_verb_handler.dart index 2c074c202..ae0c52d87 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_remove_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_remove_verb_handler.dart @@ -11,7 +11,7 @@ import 'package:at_server_spec/at_verb_spec.dart'; class NotifyRemoveVerbHandler extends AbstractVerbHandler { static NotifyRemove notifyRemove = NotifyRemove(); - NotifyRemoveVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + NotifyRemoveVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith('notify:remove:'); diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_status_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_status_verb_handler.dart index ea4bf8b51..c383aa0ef 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_status_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_status_verb_handler.dart @@ -11,7 +11,7 @@ import 'package:at_server_spec/at_server_spec.dart'; class NotifyStatusVerbHandler extends AbstractVerbHandler { static NotifyStatus notifyStatus = NotifyStatus(); - NotifyStatusVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + NotifyStatusVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/notify_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/notify_verb_handler.dart index 05b7cffff..2f7e10075 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/notify_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/notify_verb_handler.dart @@ -23,7 +23,7 @@ class NotifyVerbHandler extends AbstractVerbHandler { static Notify notify = Notify(); final int _maxKeyLength = 255; - NotifyVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + NotifyVerbHandler(super.keyStore); AtNotificationBuilder atNotificationBuilder = AtNotificationBuilder(); diff --git a/packages/at_secondary_server/lib/src/verb/handler/otp_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/otp_verb_handler.dart index 68f6e35bc..8fceeaee3 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/otp_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/otp_verb_handler.dart @@ -19,7 +19,7 @@ class OtpVerbHandler extends AbstractVerbHandler { @visibleForTesting static const Duration defaultOtpExpiry = Duration(minutes: 5); - OtpVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + OtpVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith('otp:'); diff --git a/packages/at_secondary_server/lib/src/verb/handler/pkam_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/pkam_verb_handler.dart index b0b398c0f..8180c95dd 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/pkam_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/pkam_verb_handler.dart @@ -4,7 +4,6 @@ import 'dart:typed_data'; import 'package:at_chops/at_chops.dart'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/constants/enroll_constants.dart'; import 'package:at_secondary/src/enroll/enroll_datastore_value.dart'; @@ -23,7 +22,7 @@ class PkamVerbHandler extends AbstractVerbHandler { static const String _sha512 = 'sha512'; AtChops? atChops; - PkamVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + PkamVerbHandler(super.keyStore); @override bool accept(String command) => diff --git a/packages/at_secondary_server/lib/src/verb/handler/pol_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/pol_verb_handler.dart index c16a75ee2..a934f05c9 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/pol_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/pol_verb_handler.dart @@ -22,9 +22,7 @@ class PolVerbHandler extends AbstractVerbHandler { final AtCacheManager cacheManager; OutboundClient? _outboundClient; - PolVerbHandler( - SecondaryKeyStore keyStore, this.outboundClientManager, this.cacheManager) - : super(keyStore); + PolVerbHandler(super.keyStore, this.outboundClientManager, this.cacheManager); // Method to verify whether command is accepted or not // Input: command diff --git a/packages/at_secondary_server/lib/src/verb/handler/proxy_lookup_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/proxy_lookup_verb_handler.dart index ff49933db..c710d2c0c 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/proxy_lookup_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/proxy_lookup_verb_handler.dart @@ -20,8 +20,7 @@ class ProxyLookupVerbHandler extends AbstractVerbHandler { final AtCacheManager cacheManager; ProxyLookupVerbHandler( - SecondaryKeyStore keyStore, this.outboundClientManager, this.cacheManager) - : super(keyStore); + super.keyStore, this.outboundClientManager, this.cacheManager); // Method to verify whether command is accepted or not // Input: command diff --git a/packages/at_secondary_server/lib/src/verb/handler/scan_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/scan_verb_handler.dart index c6c2f8d1e..2afd4a367 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/scan_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/scan_verb_handler.dart @@ -2,7 +2,6 @@ import 'dart:collection'; import 'dart:convert'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/caching/cache_manager.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/connection/outbound/outbound_client_manager.dart'; @@ -23,8 +22,7 @@ class ScanVerbHandler extends AbstractVerbHandler { final AtCacheManager cacheManager; ScanVerbHandler( - SecondaryKeyStore keyStore, this.outboundClientManager, this.cacheManager) - : super(keyStore); + super.keyStore, this.outboundClientManager, this.cacheManager); /// Verifies whether command is accepted or not /// diff --git a/packages/at_secondary_server/lib/src/verb/handler/stats_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/stats_verb_handler.dart index 229c0fbb4..84166f7ac 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/stats_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/stats_verb_handler.dart @@ -4,7 +4,6 @@ import 'dart:collection'; import 'dart:convert'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/constants/enroll_constants.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; @@ -98,7 +97,7 @@ class StatsVerbHandler extends AbstractVerbHandler { dynamic _regex; - StatsVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + StatsVerbHandler(super.keyStore); // Method to verify whether command is accepted or not // Input: command diff --git a/packages/at_secondary_server/lib/src/verb/handler/stream_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/stream_verb_handler.dart index cc9641530..bc3bdfd80 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/stream_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/stream_verb_handler.dart @@ -14,7 +14,7 @@ class StreamVerbHandler extends AbstractVerbHandler { InboundConnection? atConnection; - StreamVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + StreamVerbHandler(super.keyStore); @override bool accept(String command) => command.startsWith(getName(VerbEnum.stream)); diff --git a/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart index 68e5d0521..c559f7a29 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart @@ -16,7 +16,7 @@ import 'package:meta/meta.dart'; class SyncProgressiveVerbHandler extends AbstractVerbHandler { static SyncFrom syncFrom = SyncFrom(); - SyncProgressiveVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + SyncProgressiveVerbHandler(super.keyStore); /// Represents the size of the sync buffer @visibleForTesting diff --git a/packages/at_secondary_server/lib/src/verb/handler/update_meta_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/update_meta_verb_handler.dart index 897dbc752..230f0df1f 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/update_meta_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/update_meta_verb_handler.dart @@ -1,17 +1,17 @@ import 'dart:collection'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; -import 'package:at_secondary/src/notification/stats_notification_service.dart'; import 'package:at_secondary/src/verb/handler/abstract_update_verb_handler.dart'; import 'package:at_server_spec/at_server_spec.dart'; class UpdateMetaVerbHandler extends AbstractUpdateVerbHandler { static UpdateMeta updateMeta = UpdateMeta(); - UpdateMetaVerbHandler(SecondaryKeyStore keyStore, - StatsNotificationService statsNotificationService, notificationManager) - : super(keyStore, statsNotificationService, notificationManager); + UpdateMetaVerbHandler( + super.keyStore, + super.statsNotificationService, + super.notificationManager, + ); @override bool accept(String command) => command.startsWith('update:meta:'); diff --git a/packages/at_secondary_server/lib/src/verb/handler/update_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/update_verb_handler.dart index f2997c7e2..6fef38961 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/update_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/update_verb_handler.dart @@ -1,8 +1,6 @@ import 'dart:collection'; import 'package:at_commons/at_commons.dart'; -import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; -import 'package:at_secondary/src/notification/stats_notification_service.dart'; import 'package:at_secondary/src/verb/handler/abstract_update_verb_handler.dart'; import 'package:at_server_spec/at_server_spec.dart'; import 'package:at_server_spec/at_verb_spec.dart'; @@ -13,9 +11,11 @@ import 'package:at_server_spec/at_verb_spec.dart'; class UpdateVerbHandler extends AbstractUpdateVerbHandler { static Update update = Update(); - UpdateVerbHandler(SecondaryKeyStore keyStore, - StatsNotificationService statsNotificationService, notificationManager) - : super(keyStore, statsNotificationService, notificationManager); + UpdateVerbHandler( + super.keyStore, + super.statsNotificationService, + super.notificationManager, + ); // Method to verify whether command is accepted or not // Input: command diff --git a/packages/at_secondary_server/test/abstract_verb_handler_test.dart b/packages/at_secondary_server/test/abstract_verb_handler_test.dart index 9a3bc0c9f..ed718928b 100644 --- a/packages/at_secondary_server/test/abstract_verb_handler_test.dart +++ b/packages/at_secondary_server/test/abstract_verb_handler_test.dart @@ -83,7 +83,7 @@ void main() { } class TestUpdateVerbHandler extends AbstractVerbHandler { - TestUpdateVerbHandler(SecondaryKeyStore keyStore) : super(keyStore); + TestUpdateVerbHandler(super.keyStore); @override bool accept(String command) { diff --git a/packages/at_secondary_server/test/enroll_verb_test.dart b/packages/at_secondary_server/test/enroll_verb_test.dart index 113410230..6ae786aaa 100644 --- a/packages/at_secondary_server/test/enroll_verb_test.dart +++ b/packages/at_secondary_server/test/enroll_verb_test.dart @@ -1413,7 +1413,7 @@ void main() { throwsA(predicate((e) => e is AtEnrollmentException && e.message == - 'atleast one namespace must be specified for new client enroll:request'))); + 'At least one namespace must be specified for new client enroll:request'))); }); test('A test to validate enrollmentId is mandatory for enroll:approve', () async { diff --git a/packages/at_secondary_server/test/inbound_connection_pool_test.dart b/packages/at_secondary_server/test/inbound_connection_pool_test.dart index e3297c37d..5b72b7c6a 100644 --- a/packages/at_secondary_server/test/inbound_connection_pool_test.dart +++ b/packages/at_secondary_server/test/inbound_connection_pool_test.dart @@ -289,9 +289,8 @@ int calcActualAllowableIdleTime( } class MockInboundConnectionImpl extends InboundConnectionImpl { - MockInboundConnectionImpl(Socket socket, String sessionId) - : super(socket, sessionId, - owningPool: InboundConnectionPool.getInstance()); + MockInboundConnectionImpl(super.socket, String super.sessionId) + : super(owningPool: InboundConnectionPool.getInstance()); @override Future close() async { From ce14b256cee43f5bf9c026bb6d4ddb27312a5989 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 25 Jul 2024 13:09:57 +0100 Subject: [PATCH 04/22] docs: add some code comments to explain how the ALPN support works --- .../lib/src/server/at_secondary_impl.dart | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index 0392afa34..d6bef755f 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -451,8 +451,15 @@ class AtSecondaryServerImpl implements AtSecondaryServer { /// Throws [Exception] for any other exceptions. /// @param - ServerSocket void _listen(final serverSocket) { - final wssb = PseudoServerSocket(serverSocket); - HttpServer httpServer = HttpServer.listenOn(wssb); + // ALPN support. + // First, make a PseudoServerSocket to which we will pass sockets which + // have a selectedProtocol which is neither null nor 'atProtocol/1.0'. + // See later in this method for where we pass sockets received on the real + // serverSocket to the pseudoServerSocket. + final pseudoServerSocket = PseudoServerSocket(serverSocket); + // Second, make an HttpServer which is handling sockets which are passed + // to the pseudoServerSocket + HttpServer httpServer = HttpServer.listenOn(pseudoServerSocket); httpServer.listen((HttpRequest req) { if (req.uri.path == '/ws') { // Upgrade an HttpRequest to a WebSocket connection. @@ -509,8 +516,10 @@ class AtSecondaryServerImpl implements AtSecondaryServer { .handle(e, atConnection: connection, clientSocket: clientSocket); } } else { + // ALPN support + // selectedProtocol is neither null nor 'atProtocol/1.0' logger.info('Transferring socket to HttpServer for handling'); - wssb.add(clientSocket); + pseudoServerSocket.add(clientSocket); } }), onError: (error) { // We've got no action to take here, let's just log a warning From 1e7e9aa38875131e992189ee3b28063635c7a80e Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 25 Jul 2024 13:57:58 +0100 Subject: [PATCH 05/22] make a webSocketListener function in at_secondary_impl; add a new method, createWebSocketConnection, to InboundConnectionManager; modify global exception handler so it can deal with WebSockets as well as Sockets for the InboundConnectionLimitException handling --- .../inbound/inbound_connection_manager.dart | 26 +++++++++++++++++-- .../exception/global_exception_handler.dart | 23 ++++++++-------- .../lib/src/server/at_secondary_impl.dart | 21 ++++++++++----- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart index 6a9e2dee9..15717493a 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart @@ -23,7 +23,7 @@ class InboundConnectionManager implements AtConnectionFactory { return _singleton; } - /// Creates and adds [InboundConnection] to the pool + /// Creates and adds an [InboundConnectionImpl] to the pool /// If the pool is not initialized, initializes the pool with [defaultPoolSize] /// @param socket - client socket /// @param sessionId - current sessionId @@ -41,10 +41,32 @@ class InboundConnectionManager implements AtConnectionFactory { var atConnection = InboundConnectionImpl(socket, sessionId, owningPool: _pool); _pool.add(atConnection); - true; + return atConnection; } + /// Creates and adds an [InboundWebSocketConnection] to the pool + /// If the pool is not initialized, initializes the pool with [defaultPoolSize] + /// @param socket - client socket + /// @param sessionId - current sessionId + /// Throws a [InboundConnectionLimitException] if pool doesn't have capacity + @override + InboundConnection createWebSocketConnection(WebSocket socket, {String? sessionId}) { + if (!_isInitialized) { + init(defaultPoolSize); + } + if (!hasCapacity()) { + throw InboundConnectionLimitException( + 'max limit reached on inbound pool'); + } + sessionId ??= '_${Uuid().v4()}'; + + throw UnimplementedError('not yet implemented'); + // _pool.add(atConnection); + // + // return atConnection; + } + bool hasCapacity() { _pool.clearInvalidConnections(); return _pool.hasCapacity(); diff --git a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart index 1dd49986a..607c149a5 100644 --- a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart +++ b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart @@ -25,7 +25,7 @@ class GlobalExceptionHandler { /// params: AtException, AtConnection Future handle(Exception exception, {AtConnection? atConnection, - Socket? clientSocket, + dynamic clientSocket, StackTrace? stackTrace}) async { if (exception is InvalidAtSignException || exception is BufferOverFlowException || @@ -49,9 +49,16 @@ class GlobalExceptionHandler { await _sendResponseForException(exception, atConnection); _closeConnection(atConnection); } else if (exception is InboundConnectionLimitException) { - // This requires different handling which is in _handleInboundLimit - logger.info(exception.toString()); - await _handleInboundLimit(exception, clientSocket!); + if (clientSocket == null) { + logger.severe('handling InboundConnectionLimitException,' + ' but clientSocket parameter was null'); + } else { + logger.info(exception.toString()); + var errorCode = getErrorCode(exception); + var errorDescription = getErrorDescription(errorCode); + clientSocket.add('error:$errorCode-$errorDescription\n'.codeUnits); + await clientSocket.close(); + } } else if (exception is ServerIsPausedException) { // This is thrown when a new verb request comes in and the server is paused (likely // pending restart) @@ -84,14 +91,6 @@ class GlobalExceptionHandler { } } - Future _handleInboundLimit( - AtException exception, Socket clientSocket) async { - var errorCode = getErrorCode(exception); - var errorDescription = getErrorDescription(errorCode); - clientSocket.write('error:$errorCode-$errorDescription\n'); - await clientSocket.close(); - } - /// Method to close connection. /// params: AtConnection /// This will close the connection and remove it from pool diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index d6bef755f..e9b9836dd 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -441,10 +441,20 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } } - void onWebSocketData(data) { - logger.info('WebSocket received: $data'); - } + webSocketListener(WebSocket ws) async { + InboundConnection? connection; + try { + var inBoundConnectionManager = InboundConnectionManager.getInstance(); + connection = inBoundConnectionManager.createWebSocketConnection(ws, + sessionId: '_${Uuid().v4()}'); + connection.acceptRequests(_executeVerbCallBack, _streamCallBack); + await connection.write('@'); + } on InboundConnectionLimitException catch (e) { + await GlobalExceptionHandler.getInstance() + .handle(e, atConnection: connection, clientSocket: ws); + } + } /// Listens on the secondary server socket and creates an inbound connection to server socket from client socket /// Throws [AtConnection] if unable to create a connection /// Throws [SocketException] for exceptions on socket @@ -465,7 +475,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { // Upgrade an HttpRequest to a WebSocket connection. logger.info('Upgraded to WebSocket connection'); WebSocketTransformer.upgrade(req) - .then((WebSocket socket) => socket.listen(onWebSocketData)); + .then((WebSocket ws) => webSocketListener(ws)); } else { logger.info('Got Http Request: ${req.method} ${req.uri}'); if (req.method.toUpperCase() != 'GET') { @@ -497,7 +507,6 @@ class AtSecondaryServerImpl implements AtSecondaryServer { logger.finer('serverSocket _listen : ${serverSocket.runtimeType}'); serverSocket.listen(((clientSocket) async { - var sessionID = '_${Uuid().v4()}'; logger.info( 'New client socket: selectedProtocol ${clientSocket.selectedProtocol}'); if (clientSocket.selectedProtocol == 'atProtocol/1.0' || @@ -508,7 +517,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); var inBoundConnectionManager = InboundConnectionManager.getInstance(); connection = inBoundConnectionManager - .createSocketConnection(clientSocket, sessionId: sessionID); + .createSocketConnection(clientSocket, sessionId: '_${Uuid().v4()}'); connection.acceptRequests(_executeVerbCallBack, _streamCallBack); await connection.write('@'); } on InboundConnectionLimitException catch (e) { From a8bc5744f21a168052a9b6b04e8c50e3514d6963 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 5 Sep 2024 11:19:14 +0100 Subject: [PATCH 06/22] feat: at_secondary_server: add createWebSocketConnection to connection_factory --- .../lib/src/connection/connection_factory.dart | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/at_secondary_server/lib/src/connection/connection_factory.dart b/packages/at_secondary_server/lib/src/connection/connection_factory.dart index 486b1580b..7be2bc55d 100644 --- a/packages/at_secondary_server/lib/src/connection/connection_factory.dart +++ b/packages/at_secondary_server/lib/src/connection/connection_factory.dart @@ -2,6 +2,7 @@ import 'dart:io'; import 'package:at_server_spec/at_server_spec.dart'; -abstract class AtConnectionFactory { - T createSocketConnection(Socket socket, {String? sessionId}); +abstract class AtConnectionFactory { + InboundConnection createSocketConnection(Socket socket, {String? sessionId}); + InboundConnection createWebSocketConnection(WebSocket socket, {String? sessionId}); } From 507aad9debd9b314c53de6573d0718e8661b2bd9 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 18 Sep 2024 14:20:11 +0100 Subject: [PATCH 07/22] refactor: allow creation of InboundWebSocketConnection with minimal duplicated code --- .../src/connection/connection_factory.dart | 3 +- .../connection/inbound/connection_util.dart | 187 ++++++++++++++++++ .../inbound/inbound_connection_impl.dart | 176 ++--------------- .../inbound/inbound_connection_manager.dart | 3 +- .../inbound_web_socket_connection.dart | 139 +++++++++++++ .../lib/src/server/at_secondary_impl.dart | 7 +- .../test/local_lookup_verb_test.dart | 9 +- .../test/update_verb_test.dart | 8 +- .../at_server_spec/lib/at_server_spec.dart | 2 + 9 files changed, 365 insertions(+), 169 deletions(-) create mode 100644 packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart diff --git a/packages/at_secondary_server/lib/src/connection/connection_factory.dart b/packages/at_secondary_server/lib/src/connection/connection_factory.dart index 7be2bc55d..dd55f1aa7 100644 --- a/packages/at_secondary_server/lib/src/connection/connection_factory.dart +++ b/packages/at_secondary_server/lib/src/connection/connection_factory.dart @@ -4,5 +4,6 @@ import 'package:at_server_spec/at_server_spec.dart'; abstract class AtConnectionFactory { InboundConnection createSocketConnection(Socket socket, {String? sessionId}); - InboundConnection createWebSocketConnection(WebSocket socket, {String? sessionId}); + InboundConnection createWebSocketConnection(WebSocket socket, + {String? sessionId}); } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart b/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart index d57e4b048..20c4b187b 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/connection_util.dart @@ -1,5 +1,192 @@ +import 'dart:collection'; +import 'dart:math'; + +import 'package:at_secondary/src/server/at_secondary_config.dart'; +import 'package:at_secondary/src/server/server_context.dart'; +import 'package:at_server_spec/at_server_spec.dart'; + import 'inbound_connection_pool.dart'; +// ignore: implementation_imports +import 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart'; + +class InboundRateLimiter implements AtRateLimiter { + /// The maximum number of requests allowed within the specified time frame. + @override + late int maxRequestsPerTimeFrame; + + /// The duration of the time frame within which requests are limited. + @override + late int timeFrameInMillis; + + /// A list of timestamps representing the times when requests were made. + late final Queue requestTimestampQueue; + + InboundRateLimiter() { + maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; + timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; + requestTimestampQueue = Queue(); + } + + @override + bool isRequestAllowed() { + int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch; + _checkAndUpdateQueue(currentTimeInMills); + if (requestTimestampQueue.length < maxRequestsPerTimeFrame) { + requestTimestampQueue.addLast(currentTimeInMills); + return true; + } + return false; + } + + /// Checks and updates the request timestamp queue based on the current time. + /// + /// This method removes timestamps from the queue that are older than the specified + /// time window. + /// + /// [currentTimeInMillis] is the current time in milliseconds since epoch. + void _checkAndUpdateQueue(int currentTimeInMillis) { + if (requestTimestampQueue.isEmpty) return; + int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + while (calculatedTime >= timeFrameInMillis) { + requestTimestampQueue.removeFirst(); + if (requestTimestampQueue.isEmpty) break; + calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + } + } +} + +class InboundIdleChecker { + AtSecondaryContext secondaryContext; + InboundConnection connection; + InboundConnectionPool? owningPool; + + InboundIdleChecker(this.secondaryContext, this.connection, this.owningPool) { + lowWaterMarkRatio = secondaryContext.inboundConnectionLowWaterMarkRatio; + progressivelyReduceAllowableInboundIdleTime = + secondaryContext.progressivelyReduceAllowableInboundIdleTime; + + // As number of connections increases then the "allowable" idle time + // reduces from the 'max' towards the 'min' value. + unauthenticatedMaxAllowableIdleTimeMillis = + secondaryContext.unauthenticatedInboundIdleTimeMillis; + unauthenticatedMinAllowableIdleTimeMillis = + secondaryContext.unauthenticatedMinAllowableIdleTimeMillis; + + authenticatedMaxAllowableIdleTimeMillis = + secondaryContext.authenticatedInboundIdleTimeMillis; + authenticatedMinAllowableIdleTimeMillis = + secondaryContext.authenticatedMinAllowableIdleTimeMillis; + } + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int unauthenticatedMaxAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int unauthenticatedMinAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int authenticatedMaxAllowableIdleTimeMillis; + + /// As number of connections increases then the "allowable" idle time + /// reduces from the 'max' towards the 'min' value. + late int authenticatedMinAllowableIdleTimeMillis; + + late double lowWaterMarkRatio; + late bool progressivelyReduceAllowableInboundIdleTime; + + int calcAllowableIdleTime(double idleTimeReductionFactor, + int minAllowableIdleTimeMillis, int maxAllowableIdleTimeMillis) => + (((maxAllowableIdleTimeMillis - minAllowableIdleTimeMillis) * + idleTimeReductionFactor) + + minAllowableIdleTimeMillis) + .floor(); + + /// Get the idle time of the inbound connection since last write operation + int _getIdleTimeMillis() { + var lastAccessedTime = connection.metaData.lastAccessed; + // if lastAccessedTime is not set, use created time + lastAccessedTime ??= connection.metaData.created; + var currentTime = DateTime.now().toUtc(); + return currentTime.difference(lastAccessedTime!).inMilliseconds; + } + + /// Returns true if the client's idle time is greater than configured idle time. + /// false otherwise + bool _idleForLongerThanMax() { + var idleTimeMillis = _getIdleTimeMillis(); + if (connection.metaData.isAuthenticated || + connection.metaData.isPolAuthenticated) { + return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis; + } else { + return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis; + } + } + + bool isInValid() { + // If we don't know our owning pool, OR we've disabled the new logic, just use old logic + if (owningPool == null || + progressivelyReduceAllowableInboundIdleTime == false) { + var retVal = _idleForLongerThanMax(); + return retVal; + } + + // We do know our owning pool, so we'll use fancier logic. + // Unauthenticated connections should be reaped increasingly aggressively as we approach max connections + // Authenticated connections should also be reaped as we approach max connections, but a lot less aggressively + // Ultimately, the caller (e.g. [InboundConnectionManager] decides **whether** to reap or not. + int? poolMaxConnections = owningPool!.getCapacity(); + int lowWaterMark = (poolMaxConnections! * lowWaterMarkRatio).floor(); + int numConnectionsOverLwm = + max(owningPool!.getCurrentSize() - lowWaterMark, 0); + + // We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively. + double idleTimeReductionFactor = + 1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark)); + if (!connection.metaData.isAuthenticated && + !connection.metaData.isPolAuthenticated) { + // For **unauthenticated** connections, we deem invalid if idle time is greater than + // ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime + // + // i.e. as the current number of connections grows past low-water-mark, the tolerated idle time reduces + // Given: Max connections of 50, lwm of 25, max idle time of 605 seconds, min idle time of 5 seconds + // When: current == 25, idle time allowable = (605-5) * (1 - 0/25) + 5 i.e. 600 * 1.0 + 5 i.e. 605 + // When: current == 40, idle time allowable = (605-5) * (1 - 15/25) + 5 i.e. 600 * 0.4 + 5 i.e. 245 + // When: current == 49, idle time allowable = (605-5) * (1 - 24/25) + 5 i.e. 600 * 0.04 + 5 i.e. 24 + 5 i.e. 29 + // When: current == 50, idle time allowable = (605-5) * (1 - 25/25) + 5 i.e. 600 * 0.0 + 5 i.e. 0 + 5 i.e. 5 + // + // Given: Max connections of 50, lwm of 10, max idle time of 605 seconds, min idle time of 5 seconds + // When: current == 10, idle time allowable = (605-5) * (1 - (10-10)/(50-10)) + 5 i.e. 600 * (1 - 0/40) + 5 i.e. 605 + // When: current == 20, idle time allowable = (605-5) * (1 - (20-10)/(50-10)) + 5 i.e. 600 * (1 - 10/40) + 5 i.e. 455 + // When: current == 30, idle time allowable = (605-5) * (1 - (30-10)/(50-10)) + 5 i.e. 600 * (1 - 20/40) + 5 i.e. 305 + // When: current == 40, idle time allowable = (605-5) * (1 - (40-10)/(50-10)) + 5 i.e. 600 * (1 - 30/40) + 5 i.e. 155 + // When: current == 49, idle time allowable = (605-5) * (1 - (49-10)/(50-10)) + 5 i.e. 600 * (1 - 39/40) + 5 i.e. 600 * .025 + 5 i.e. 20 + // When: current == 50, idle time allowable = (605-5) * (1 - (50-10)/(50-10)) + 5 i.e. 600 * (1 - 40/40) + 5 i.e. 600 * 0 + 5 i.e. 5 + int allowableIdleTime = calcAllowableIdleTime( + idleTimeReductionFactor, + unauthenticatedMinAllowableIdleTimeMillis, + unauthenticatedMaxAllowableIdleTimeMillis); + var actualIdleTime = _getIdleTimeMillis(); + var retVal = actualIdleTime > allowableIdleTime; + return retVal; + } else { + // For authenticated connections + // TODO (1) if the connection has a request in progress, we should never mark it as invalid + // (2) otherwise, we will mark as invalid using same algorithm as above, but using authenticatedMinAllowableIdleTimeMillis + int allowableIdleTime = calcAllowableIdleTime( + idleTimeReductionFactor, + authenticatedMinAllowableIdleTimeMillis, + authenticatedMaxAllowableIdleTimeMillis); + var actualIdleTime = _getIdleTimeMillis(); + var retVal = actualIdleTime > allowableIdleTime; + return retVal; + } + } +} + class ConnectionUtil { /// Returns the number of active monitor connections. static int getMonitorConnectionSize() { diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart index a6750bd7b..6f6eaacd8 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart @@ -1,17 +1,15 @@ -import 'dart:collection'; import 'dart:io'; -import 'dart:math'; import 'package:at_secondary/src/connection/base_connection.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; -import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/server_context.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/logging_util.dart'; import 'package:at_server_spec/at_server_spec.dart'; +import 'connection_util.dart'; import 'dummy_inbound_connection.dart'; class InboundConnectionImpl extends BaseSocketConnection @@ -25,35 +23,8 @@ class InboundConnectionImpl extends BaseSocketConnection InboundConnectionPool? owningPool; - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int unauthenticatedMaxAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int unauthenticatedMinAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int authenticatedMaxAllowableIdleTimeMillis; - - /// As number of connections increases then the "allowable" idle time - /// reduces from the 'max' towards the 'min' value. - late int authenticatedMinAllowableIdleTimeMillis; - - late double lowWaterMarkRatio; - late bool progressivelyReduceAllowableInboundIdleTime; - - /// The maximum number of requests allowed within the specified time frame. - @override - late int maxRequestsPerTimeFrame; - - /// The duration of the time frame within which requests are limited. - @override - late int timeFrameInMillis; - - /// A list of timestamps representing the times when requests were made. - late final Queue requestTimestampQueue; + late InboundRateLimiter rateLimiter; + late InboundIdleChecker idleChecker; InboundConnectionImpl(T socket, String? sessionId, {this.owningPool}) : super(socket) { @@ -67,25 +38,8 @@ class InboundConnectionImpl extends BaseSocketConnection // In test harnesses, secondary context may not yet have been set, in which case create a default AtSecondaryContext instance secondaryContext ??= AtSecondaryContext(); - lowWaterMarkRatio = secondaryContext.inboundConnectionLowWaterMarkRatio; - progressivelyReduceAllowableInboundIdleTime = - secondaryContext.progressivelyReduceAllowableInboundIdleTime; - - // As number of connections increases then the "allowable" idle time - // reduces from the 'max' towards the 'min' value. - unauthenticatedMaxAllowableIdleTimeMillis = - secondaryContext.unauthenticatedInboundIdleTimeMillis; - unauthenticatedMinAllowableIdleTimeMillis = - secondaryContext.unauthenticatedMinAllowableIdleTimeMillis; - - authenticatedMaxAllowableIdleTimeMillis = - secondaryContext.authenticatedInboundIdleTimeMillis; - authenticatedMinAllowableIdleTimeMillis = - secondaryContext.authenticatedMinAllowableIdleTimeMillis; - - maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; - timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; - requestTimestampQueue = Queue(); + idleChecker = InboundIdleChecker(secondaryContext, this, owningPool); + rateLimiter = InboundRateLimiter(); logger.info(logger.getAtConnectionLogMessage( metaData, @@ -128,89 +82,7 @@ class InboundConnectionImpl extends BaseSocketConnection return true; } - // If we don't know our owning pool, OR we've disabled the new logic, just use old logic - if (owningPool == null || - progressivelyReduceAllowableInboundIdleTime == false) { - var retVal = _idleForLongerThanMax(); - return retVal; - } - - // We do know our owning pool, so we'll use fancier logic. - // Unauthenticated connections should be reaped increasingly aggressively as we approach max connections - // Authenticated connections should also be reaped as we approach max connections, but a lot less aggressively - // Ultimately, the caller (e.g. [InboundConnectionManager] decides **whether** to reap or not. - int? poolMaxConnections = owningPool!.getCapacity(); - int lowWaterMark = (poolMaxConnections! * lowWaterMarkRatio).floor(); - int numConnectionsOverLwm = - max(owningPool!.getCurrentSize() - lowWaterMark, 0); - - // We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively. - double idleTimeReductionFactor = - 1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark)); - if (!metaData.isAuthenticated && !metaData.isPolAuthenticated) { - // For **unauthenticated** connections, we deem invalid if idle time is greater than - // ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime - // - // i.e. as the current number of connections grows past low-water-mark, the tolerated idle time reduces - // Given: Max connections of 50, lwm of 25, max idle time of 605 seconds, min idle time of 5 seconds - // When: current == 25, idle time allowable = (605-5) * (1 - 0/25) + 5 i.e. 600 * 1.0 + 5 i.e. 605 - // When: current == 40, idle time allowable = (605-5) * (1 - 15/25) + 5 i.e. 600 * 0.4 + 5 i.e. 245 - // When: current == 49, idle time allowable = (605-5) * (1 - 24/25) + 5 i.e. 600 * 0.04 + 5 i.e. 24 + 5 i.e. 29 - // When: current == 50, idle time allowable = (605-5) * (1 - 25/25) + 5 i.e. 600 * 0.0 + 5 i.e. 0 + 5 i.e. 5 - // - // Given: Max connections of 50, lwm of 10, max idle time of 605 seconds, min idle time of 5 seconds - // When: current == 10, idle time allowable = (605-5) * (1 - (10-10)/(50-10)) + 5 i.e. 600 * (1 - 0/40) + 5 i.e. 605 - // When: current == 20, idle time allowable = (605-5) * (1 - (20-10)/(50-10)) + 5 i.e. 600 * (1 - 10/40) + 5 i.e. 455 - // When: current == 30, idle time allowable = (605-5) * (1 - (30-10)/(50-10)) + 5 i.e. 600 * (1 - 20/40) + 5 i.e. 305 - // When: current == 40, idle time allowable = (605-5) * (1 - (40-10)/(50-10)) + 5 i.e. 600 * (1 - 30/40) + 5 i.e. 155 - // When: current == 49, idle time allowable = (605-5) * (1 - (49-10)/(50-10)) + 5 i.e. 600 * (1 - 39/40) + 5 i.e. 600 * .025 + 5 i.e. 20 - // When: current == 50, idle time allowable = (605-5) * (1 - (50-10)/(50-10)) + 5 i.e. 600 * (1 - 40/40) + 5 i.e. 600 * 0 + 5 i.e. 5 - int allowableIdleTime = calcAllowableIdleTime( - idleTimeReductionFactor, - unauthenticatedMinAllowableIdleTimeMillis, - unauthenticatedMaxAllowableIdleTimeMillis); - var actualIdleTime = _getIdleTimeMillis(); - var retVal = actualIdleTime > allowableIdleTime; - return retVal; - } else { - // For authenticated connections - // TODO (1) if the connection has a request in progress, we should never mark it as invalid - // (2) otherwise, we will mark as invalid using same algorithm as above, but using authenticatedMinAllowableIdleTimeMillis - int allowableIdleTime = calcAllowableIdleTime( - idleTimeReductionFactor, - authenticatedMinAllowableIdleTimeMillis, - authenticatedMaxAllowableIdleTimeMillis); - var actualIdleTime = _getIdleTimeMillis(); - var retVal = actualIdleTime > allowableIdleTime; - return retVal; - } - } - - int calcAllowableIdleTime(double idleTimeReductionFactor, - int minAllowableIdleTimeMillis, int maxAllowableIdleTimeMillis) => - (((maxAllowableIdleTimeMillis - minAllowableIdleTimeMillis) * - idleTimeReductionFactor) + - minAllowableIdleTimeMillis) - .floor(); - - /// Get the idle time of the inbound connection since last write operation - int _getIdleTimeMillis() { - var lastAccessedTime = metaData.lastAccessed; - // if lastAccessedTime is not set, use created time - lastAccessedTime ??= metaData.created; - var currentTime = DateTime.now().toUtc(); - return currentTime.difference(lastAccessedTime!).inMilliseconds; - } - - /// Returns true if the client's idle time is greater than configured idle time. - /// false otherwise - bool _idleForLongerThanMax() { - var idleTimeMillis = _getIdleTimeMillis(); - if (metaData.isAuthenticated || metaData.isPolAuthenticated) { - return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis; - } else { - return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis; - } + return idleChecker.isInValid(); } @override @@ -259,29 +131,19 @@ class InboundConnectionImpl extends BaseSocketConnection } @override - bool isRequestAllowed() { - int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch; - _checkAndUpdateQueue(currentTimeInMills); - if (requestTimestampQueue.length < maxRequestsPerTimeFrame) { - requestTimestampQueue.addLast(currentTimeInMills); - return true; - } - return false; - } + int get maxRequestsPerTimeFrame => rateLimiter.maxRequestsPerTimeFrame; - /// Checks and updates the request timestamp queue based on the current time. - /// - /// This method removes timestamps from the queue that are older than the specified - /// time window. - /// - /// [currentTimeInMillis] is the current time in milliseconds since epoch. - void _checkAndUpdateQueue(int currentTimeInMillis) { - if (requestTimestampQueue.isEmpty) return; - int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); - while (calculatedTime >= timeFrameInMillis) { - requestTimestampQueue.removeFirst(); - if (requestTimestampQueue.isEmpty) break; - calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); - } + @override + set maxRequestsPerTimeFrame(int i) => rateLimiter.maxRequestsPerTimeFrame = i; + + @override + int get timeFrameInMillis => rateLimiter.timeFrameInMillis; + + @override + set timeFrameInMillis(int i) => rateLimiter.timeFrameInMillis = i; + + @override + bool isRequestAllowed() { + return rateLimiter.isRequestAllowed(); } } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart index 15717493a..b1c267ec1 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart @@ -51,7 +51,8 @@ class InboundConnectionManager implements AtConnectionFactory { /// @param sessionId - current sessionId /// Throws a [InboundConnectionLimitException] if pool doesn't have capacity @override - InboundConnection createWebSocketConnection(WebSocket socket, {String? sessionId}) { + InboundConnection createWebSocketConnection(WebSocket socket, + {String? sessionId}) { if (!_isInitialized) { init(defaultPoolSize); } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart new file mode 100644 index 000000000..3d1d4de1c --- /dev/null +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart @@ -0,0 +1,139 @@ +import 'dart:io'; + +import 'package:at_secondary/src/connection/base_connection.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; +import 'package:at_secondary/src/server/at_secondary_impl.dart'; +import 'package:at_secondary/src/server/server_context.dart'; +import 'package:at_secondary/src/utils/logging_util.dart'; +import 'package:at_server_spec/at_server_spec.dart'; +import 'package:at_utils/at_utils.dart'; + +import 'connection_util.dart'; + +class InboundWebSocketConnection implements InboundConnection { + WebSocket ws; + + AtSignLogger logger = AtSignLogger('InboundWebSocketConnection'); + + @override + late InboundConnectionMetadata metaData; + + @override + bool? isMonitor = false; + + /// This contains the value of the atsign initiated the connection + @override + String? initiatedBy; + + InboundConnectionPool? owningPool; + + late InboundRateLimiter rateLimiter; + late InboundIdleChecker idleChecker; + + InboundWebSocketConnection(this.ws, String? sessionId, {this.owningPool}) { + metaData = InboundConnectionMetadata() + ..sessionID = sessionId + ..created = DateTime.now().toUtc() + ..isCreated = true; + + AtSecondaryContext? secondaryContext = + AtSecondaryServerImpl.getInstance().serverContext; + // In test harnesses, secondary context may not yet have been set, in which case create a default AtSecondaryContext instance + secondaryContext ??= AtSecondaryContext(); + + idleChecker = InboundIdleChecker(secondaryContext, this, owningPool); + rateLimiter = InboundRateLimiter(); + + logger.info(logger.getAtConnectionLogMessage( + metaData, + 'New connection (' + 'this side: ${underlying.address}:${underlying.port}' + ' remote side: ${underlying.remoteAddress}:${underlying.remotePort}' + ')')); + + ws.done.then((doneValue) { + logger.info('ws.done called. Calling this.close()'); + close(); + }, onError: (error, stackTrace) { + logger.info('ws.done.onError called with $error. Calling this.close()'); + close(); + }); + } + + /// Returns true if the web sockets are identical + @override + bool equals(InboundConnection connection) { + if (connection is! InboundWebSocketConnection) { + return false; + } + + return ws == connection.ws; + } + + /// Returning true indicates to the caller that this connection **can** be closed if needed + @override + bool isInValid() { + if (metaData.isClosed || metaData.isStale) { + return true; + } + + return idleChecker.isInValid(); + } + + @override + void acceptRequests(Function(String, InboundConnection) callback, + Function(List, InboundConnection) streamCallBack) { + var listener = InboundMessageListener(this); + listener.listen(callback, streamCallBack); + } + + bool? isStream; + + @override + Future close() async { + // Some defensive code just in case we accidentally call close multiple times + if (metaData.isClosed) { + return; + } + + try { + logger.info(logger.getAtConnectionLogMessage( + metaData, 'destroying WebSocket $this')); + await ws.close(); + } catch (_) { + // Ignore exception on a connection close + metaData.isStale = true; + } finally { + metaData.isClosed = true; + } + } + + @override + Future write(String data) async { + ws.add(data); + logger.info(logger.getAtConnectionLogMessage( + metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}')); + } + + @override + int get maxRequestsPerTimeFrame => rateLimiter.maxRequestsPerTimeFrame; + + @override + set maxRequestsPerTimeFrame(int i) => rateLimiter.maxRequestsPerTimeFrame = i; + + @override + int get timeFrameInMillis => rateLimiter.timeFrameInMillis; + + @override + set timeFrameInMillis(int i) => rateLimiter.timeFrameInMillis = i; + + @override + bool isRequestAllowed() { + return rateLimiter.isRequestAllowed(); + } + + @override + get underlying => ws; +} diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index e9b9836dd..bc49695c0 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -453,8 +453,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer { await GlobalExceptionHandler.getInstance() .handle(e, atConnection: connection, clientSocket: ws); } - } + /// Listens on the secondary server socket and creates an inbound connection to server socket from client socket /// Throws [AtConnection] if unable to create a connection /// Throws [SocketException] for exceptions on socket @@ -516,8 +516,9 @@ class AtSecondaryServerImpl implements AtSecondaryServer { logger.info( 'In _listen - clientSocket.peerCertificate : ${clientSocket.peerCertificate}'); var inBoundConnectionManager = InboundConnectionManager.getInstance(); - connection = inBoundConnectionManager - .createSocketConnection(clientSocket, sessionId: '_${Uuid().v4()}'); + connection = inBoundConnectionManager.createSocketConnection( + clientSocket, + sessionId: '_${Uuid().v4()}'); connection.acceptRequests(_executeVerbCallBack, _streamCallBack); await connection.write('@'); } on InboundConnectionLimitException catch (e) { diff --git a/packages/at_secondary_server/test/local_lookup_verb_test.dart b/packages/at_secondary_server/test/local_lookup_verb_test.dart index fcc96c44e..a9841df11 100644 --- a/packages/at_secondary_server/test/local_lookup_verb_test.dart +++ b/packages/at_secondary_server/test/local_lookup_verb_test.dart @@ -332,15 +332,18 @@ void main() { await secondaryKeyStore.put( keyName, AtData()..data = jsonEncode(enrollJson)); // Update a key with buzz namespace - String updateCommand = 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; - HashMap updateVerbParams = getVerbParam(VerbSyntax.update, updateCommand); + String updateCommand = + 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; + HashMap updateVerbParams = + getVerbParam(VerbSyntax.update, updateCommand); UpdateVerbHandler updateVerbHandler = UpdateVerbHandler( secondaryKeyStore, statsNotificationService, notificationManager); await updateVerbHandler.processVerb( response, updateVerbParams, inboundConnection); expect(response.data, isNotNull); // Local Lookup a key with at_contact.buzz namespace - String llookupCommand = 'llookup:atconnections.bob.alice.at_contact.buzz$alice'; + String llookupCommand = + 'llookup:atconnections.bob.alice.at_contact.buzz$alice'; HashMap llookupVerbParams = getVerbParam(VerbSyntax.llookup, llookupCommand); LocalLookupVerbHandler localLookupVerbHandler = diff --git a/packages/at_secondary_server/test/update_verb_test.dart b/packages/at_secondary_server/test/update_verb_test.dart index a8c2fb6b2..e1858d688 100644 --- a/packages/at_secondary_server/test/update_verb_test.dart +++ b/packages/at_secondary_server/test/update_verb_test.dart @@ -1328,7 +1328,8 @@ void main() { var keyName = '$enrollmentId.new.enrollments.__manage@alice'; await secondaryKeyStore.put( keyName, AtData()..data = jsonEncode(enrollJson)); - String updateCommand = 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; + String updateCommand = + 'update:atconnections.bob.alice.at_contact.buzz$alice bob'; HashMap updateVerbParams = getVerbParam(VerbSyntax.update, updateCommand); UpdateVerbHandler updateVerbHandler = UpdateVerbHandler( @@ -1339,7 +1340,7 @@ void main() { expect(response.isError, false); }); - test( + test( 'A test to verify write access is allowed to a key with a at_contact.buzz namespace for an enrollment with at_contact.buzz namespace access', () async { inboundConnection.metadata.isAuthenticated = @@ -1389,8 +1390,7 @@ void main() { var keyName = '$enrollmentId.new.enrollments.__manage@alice'; await secondaryKeyStore.put( keyName, AtData()..data = jsonEncode(enrollJson)); - String updateCommand = - 'update:atconnections.bob.alice.buzz$alice bob'; + String updateCommand = 'update:atconnections.bob.alice.buzz$alice bob'; HashMap updateVerbParams = getVerbParam(VerbSyntax.update, updateCommand); UpdateVerbHandler updateVerbHandler = UpdateVerbHandler( diff --git a/packages/at_server_spec/lib/at_server_spec.dart b/packages/at_server_spec/lib/at_server_spec.dart index 46d1fbb7b..73f0f635c 100644 --- a/packages/at_server_spec/lib/at_server_spec.dart +++ b/packages/at_server_spec/lib/at_server_spec.dart @@ -7,3 +7,5 @@ export 'package:at_server_spec/src/connection/at_connection.dart'; export 'package:at_server_spec/src/connection/inbound_connection.dart'; export 'package:at_server_spec/src/verb/update_meta.dart'; export 'package:at_server_spec/src/verb/verb.dart'; +export 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart'; + From fe52c06781b9ffd816f212bcf3219a3965004d87 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 18 Sep 2024 14:22:54 +0100 Subject: [PATCH 08/22] feat: implement createWebSocketConnection in InboundConnectionManager --- .../connection/inbound/inbound_connection_manager.dart | 10 +++++----- .../inbound/inbound_web_socket_connection.dart | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart index b1c267ec1..2ba6d4b57 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_manager.dart @@ -1,6 +1,7 @@ import 'dart:io'; import 'package:at_secondary/src/connection/connection_factory.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart'; +import 'package:at_secondary/src/connection/inbound/inbound_web_socket_connection.dart'; import 'package:at_server_spec/at_server_spec.dart'; import 'package:uuid/uuid.dart'; import 'package:at_commons/at_commons.dart'; @@ -51,7 +52,7 @@ class InboundConnectionManager implements AtConnectionFactory { /// @param sessionId - current sessionId /// Throws a [InboundConnectionLimitException] if pool doesn't have capacity @override - InboundConnection createWebSocketConnection(WebSocket socket, + InboundConnection createWebSocketConnection(WebSocket ws, {String? sessionId}) { if (!_isInitialized) { init(defaultPoolSize); @@ -61,11 +62,10 @@ class InboundConnectionManager implements AtConnectionFactory { 'max limit reached on inbound pool'); } sessionId ??= '_${Uuid().v4()}'; + var atConnection = InboundWebSocketConnection(ws, sessionId, _pool); + _pool.add(atConnection); - throw UnimplementedError('not yet implemented'); - // _pool.add(atConnection); - // - // return atConnection; + return atConnection; } bool hasCapacity() { diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart index 3d1d4de1c..7cc9ef304 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart @@ -32,7 +32,7 @@ class InboundWebSocketConnection implements InboundConnection { late InboundRateLimiter rateLimiter; late InboundIdleChecker idleChecker; - InboundWebSocketConnection(this.ws, String? sessionId, {this.owningPool}) { + InboundWebSocketConnection(this.ws, String? sessionId, this.owningPool) { metaData = InboundConnectionMetadata() ..sessionID = sessionId ..created = DateTime.now().toUtc() From ad62ece25436e1d2459779b124bac771d4d8653b Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 18 Sep 2024 15:37:04 +0100 Subject: [PATCH 09/22] feat: atServer support for atProtocol over websockets : MVP complete --- .../inbound/inbound_message_listener.dart | 14 +++++++++- .../inbound_web_socket_connection.dart | 27 ++++++++++--------- .../lib/src/utils/logging_util.dart | 2 +- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart index f42a649d6..6beb04b76 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart @@ -42,7 +42,19 @@ class InboundMessageListener { /// Handles messages on the inbound client's connection and calls the verb executor /// Closes the inbound connection in case of any error. - Future _messageHandler(data) async { + Future _messageHandler(streamData) async { + logger.finest('_messageHandler received ${streamData.runtimeType}' + ' : $streamData '); + List data; + if (streamData is List) { + data = streamData; + } else if (streamData is String) { + data = streamData.codeUnits; + } else { + logger.severe('Un-handled data type: ${streamData.runtimeType}'); + await _finishedHandler(); + return; + } //ignore the data read if the connection is stale or closed if (connection.metaData.isStale || connection.metaData.isClosed) { //clear buffer as data is redundant diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart index 7cc9ef304..184e9908b 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:io'; import 'package:at_secondary/src/connection/base_connection.dart'; @@ -46,12 +47,8 @@ class InboundWebSocketConnection implements InboundConnection { idleChecker = InboundIdleChecker(secondaryContext, this, owningPool); rateLimiter = InboundRateLimiter(); - logger.info(logger.getAtConnectionLogMessage( - metaData, - 'New connection (' - 'this side: ${underlying.address}:${underlying.port}' - ' remote side: ${underlying.remoteAddress}:${underlying.remotePort}' - ')')); + logger.info( + logger.getAtConnectionLogMessage(metaData, 'New WebSocket ($this)')); ws.done.then((doneValue) { logger.info('ws.done called. Calling this.close()'); @@ -95,26 +92,32 @@ class InboundWebSocketConnection implements InboundConnection { Future close() async { // Some defensive code just in case we accidentally call close multiple times if (metaData.isClosed) { + logger.info('already closed; returning'); return; } + metaData.isClosed = true; + try { logger.info(logger.getAtConnectionLogMessage( - metaData, 'destroying WebSocket $this')); - await ws.close(); + metaData, 'closing WebSocket (readyState ${ws.readyState})')); + try { + await ws.close(); + } catch (e) { + logger.severe('ws.close() exception: $e'); + } + logger.info(logger.getAtConnectionLogMessage( + metaData, 'Closed WebSocket (readyState ${ws.readyState})')); } catch (_) { // Ignore exception on a connection close metaData.isStale = true; - } finally { - metaData.isClosed = true; } } @override Future write(String data) async { ws.add(data); - logger.info(logger.getAtConnectionLogMessage( - metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}')); + logger.info(logger.getAtConnectionLogMessage(metaData, 'SENT: $data')); } @override diff --git a/packages/at_secondary_server/lib/src/utils/logging_util.dart b/packages/at_secondary_server/lib/src/utils/logging_util.dart index ed03ec445..5c1193d44 100644 --- a/packages/at_secondary_server/lib/src/utils/logging_util.dart +++ b/packages/at_secondary_server/lib/src/utils/logging_util.dart @@ -13,7 +13,7 @@ extension AtConnectionMetadataLogging on AtSignLogger { if (atConnectionMetaData.sessionID != null) { stringBuffer.write('${atConnectionMetaData.sessionID?.hashCode}|'); } - stringBuffer.write('$logMsg|'); + stringBuffer.write(logMsg); return stringBuffer.toString(); } From 4ffb9bad2b265807846761ec0f2e6aec26b1ca09 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 9 Oct 2024 09:27:34 +0100 Subject: [PATCH 10/22] fix: logging cleanup for http get handling --- .../lib/src/server/at_secondary_impl.dart | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index bc49695c0..1987bb3b7 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -482,6 +482,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { req.response.statusCode = HttpStatus.badRequest; req.response.close(); } else { + // TODO URL decoding, need to handle emojis for example var lookupKey = req.uri.path.substring(1); if (!lookupKey.startsWith('public:')) { lookupKey = 'public:$lookupKey'; @@ -491,14 +492,10 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } logger.finer('Key to look up: $lookupKey'); secondaryKeyStore.get(lookupKey)!.then((AtData? value) { - req.response.writeln('Hello there, http client!\n\n' - 'The value stored for ${req.uri} ($lookupKey) is: \n' - '\t data: ${value?.data}\n\n' - '\t metadata: ${value?.metaData}\n'); + req.response.writeln('data:${value?.data}'); req.response.close(); }, onError: (error) { - req.response.writeln('Hello there, http client!\n\n' - 'No value available for ${req.uri} ($lookupKey)\n'); + req.response.writeln('error:no such key $lookupKey'); req.response.close(); }); } From 54a3229d8afecb3b5266190f42a98f2cb3a6c04c Mon Sep 17 00:00:00 2001 From: gkc Date: Tue, 22 Oct 2024 11:05:09 +0100 Subject: [PATCH 11/22] chore: add a TODO --- .../at_secondary_server/lib/src/server/at_secondary_impl.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index 1987bb3b7..38cc29a2d 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -525,6 +525,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer { } else { // ALPN support // selectedProtocol is neither null nor 'atProtocol/1.0' + // TODO check specifically for http/1.1 logger.info('Transferring socket to HttpServer for handling'); pseudoServerSocket.add(clientSocket); } From 3c65ad9a67a0b93eeef868d03115ef7a8cae4d93 Mon Sep 17 00:00:00 2001 From: purnimavenkatasubbu Date: Wed, 11 Dec 2024 22:01:16 +0530 Subject: [PATCH 12/22] updated changelog and pubspec --- packages/at_secondary_server/CHANGELOG.md | 2 ++ packages/at_secondary_server/pubspec.yaml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/at_secondary_server/CHANGELOG.md b/packages/at_secondary_server/CHANGELOG.md index 2b155b113..a604c2c13 100644 --- a/packages/at_secondary_server/CHANGELOG.md +++ b/packages/at_secondary_server/CHANGELOG.md @@ -1,3 +1,5 @@ +# 3.1.2 +- feat: Added WebSocket support for inbound connections # 3.1.1 - fix: Store "publicKeyHash" value in the keystore # 3.1.0 diff --git a/packages/at_secondary_server/pubspec.yaml b/packages/at_secondary_server/pubspec.yaml index 1aa03443d..7c3012a00 100644 --- a/packages/at_secondary_server/pubspec.yaml +++ b/packages/at_secondary_server/pubspec.yaml @@ -1,6 +1,6 @@ name: at_secondary description: Implementation of secondary server. -version: 3.1.1 +version: 3.1.2 repository: https://github.com/atsign-foundation/at_server homepage: https://www.example.com publish_to: none From 6deb580efd885043946872a3304ea3e883c59aab Mon Sep 17 00:00:00 2001 From: purnimavenkatasubbu Date: Wed, 11 Dec 2024 22:08:04 +0530 Subject: [PATCH 13/22] fix analyzer issue --- .../src/connection/inbound/inbound_web_socket_connection.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart index 184e9908b..964955c61 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_web_socket_connection.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'dart:io'; -import 'package:at_secondary/src/connection/base_connection.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; From 3ae6a4ffa14da84d1cfef536fc6dbabe11cd0b22 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 09:33:35 +0000 Subject: [PATCH 14/22] chore: Added TODOs to add end-to-end tests for the websocket handling --- .github/workflows/at_server.yaml | 6 +++-- .../config/config-e2e_test_runtime.yaml | 2 ++ .../at_end2end_test/test/e2e_test_utils.dart | 25 ++++++++++++++++--- tools/run_locally/scripts/macos/at_server | 2 +- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/.github/workflows/at_server.yaml b/.github/workflows/at_server.yaml index 3659c5cf2..09310d715 100644 --- a/.github/workflows/at_server.yaml +++ b/.github/workflows/at_server.yaml @@ -569,12 +569,14 @@ jobs: sed -i "s/ATSIGN_1_NAME/@$AT_SIGN_1/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_PORT/$AT_SIGN_1_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_HOST/$AT_SIGN_1_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml +# TODO: add something like `sed -i "s/ATSIGN_1_CONNECTION_TYPE/$AT_SIGN_1_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml` sed -i "s/ATSIGN_2_NAME/@$AT_SIGN_2/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_PORT/$AT_SIGN_2_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml - sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml + sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml +# TODO: add something like `sed -i "s/ATSIGN_2_CONNECTION_TYPE/$AT_SIGN_2_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml` mv tests/at_end2end_test/config/config-e2e_test_runtime.yaml tests/at_end2end_test/config/config.yaml cat tests/at_end2end_test/config/config.yaml - echo "Connection successfull" + echo "Connection successful" break else echo "Connection error on attempt ${try}" diff --git a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml index 55983c78b..a00f5d045 100644 --- a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml +++ b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml @@ -12,6 +12,7 @@ first_atsign_server: first_atsign_port: ATSIGN_1_PORT # # The url to connect to first atServer first_atsign_url: ATSIGN_1_HOST + # TODO add something like `first_atsign_connection_type: ATSIGN_1_CONNECTION_TYPE` second_atsign_server: second_atsign_name: 'ATSIGN_2_NAME' @@ -19,3 +20,4 @@ second_atsign_server: second_atsign_port: ATSIGN_2_PORT # # The url to connect to second atServer second_atsign_url: ATSIGN_2_HOST + # TODO add something like `second_atsign_connection_type: ATSIGN_2_CONNECTION_TYPE` diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index 506327d67..f21f2c9c5 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -28,7 +28,11 @@ Future getSocketHandler(atSign) async { if (asc == null) { throw _NoSuchAtSignException('$atSign not configured'); } + // TODO switch on _AtSignConfig.connectionType and create a + // TODO SimpleOutboundSocketConnection or SimpleOutboundWebsocketConnection + // TODO as required var handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + await handler.connect(); handler.startListening(); await handler.sendFromAndPkam(); @@ -38,6 +42,9 @@ Future getSocketHandler(atSign) async { /// A simple wrapper around a socket for @ protocol communication. class SimpleOutboundSocketHandler { + // TODO Turn this into an abstract base class, e.g. SimpleOutboundConnection + // TODO with two concrete subclasses, e.g. SimpleOutboundSocketConnection + // TODO and SimpleOutboundWebsocketConnection late Queue _queue; final _buffer = ByteBuffer(capacity: 10240000); @@ -195,14 +202,22 @@ class SimpleOutboundSocketHandler { } } } + +enum _ConnectionTypeEnum { + socket, + websocket, +} /// Simple data-holding class which adds its instances into [atSignConfigMap] class _AtSignConfig { String atSign; String host; int port; + _ConnectionTypeEnum connectionType; + // TODO Add connectionType /// Creates and adds to [atSignConfigMap] or throws [_AtSignAlreadyAddedException] if we've already got it. - _AtSignConfig(this.atSign, this.host, this.port) { + _AtSignConfig(this.atSign, this.host, this.port, this.connectionType) { + this.connectionType ??= _ConnectionTypeEnum.socket; if (atSignConfigMap.containsKey(atSign)) { throw _AtSignAlreadyAddedException("AtSignConfig for $atSign has already been created"); } @@ -232,12 +247,16 @@ void _loadTheYaml() { _AtSignConfig( ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port']); + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port'], + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_connection_type'], + ); _AtSignConfig( ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port']); + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port'], + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_connection_type'], + ); /// TODO Ideally instead of the current config.yaml we'd have yaml like this /// at_sign_configs: diff --git a/tools/run_locally/scripts/macos/at_server b/tools/run_locally/scripts/macos/at_server index f7c31edc7..d4764083a 100755 --- a/tools/run_locally/scripts/macos/at_server +++ b/tools/run_locally/scripts/macos/at_server @@ -62,7 +62,7 @@ export accessLogPath="$storageDir/accessLog" export notificationStoragePath="$storageDir/notificationLog.v1" export inbound_max_limit=200 -export logLevel="WARNING" +export logLevel="INFO" export testingMode="true" From 84419de5fc4ca74e3e771c9bf58bee6c8d6f6fba Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 09:34:34 +0000 Subject: [PATCH 15/22] chore: removed a TODO which had been implemented --- tests/at_end2end_test/test/e2e_test_utils.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index f21f2c9c5..9862a13c6 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -213,7 +213,6 @@ class _AtSignConfig { String host; int port; _ConnectionTypeEnum connectionType; - // TODO Add connectionType /// Creates and adds to [atSignConfigMap] or throws [_AtSignAlreadyAddedException] if we've already got it. _AtSignConfig(this.atSign, this.host, this.port, this.connectionType) { From 810578ea540146185ef491e8b2dc5722ccf6499c Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 09:58:14 +0000 Subject: [PATCH 16/22] test: removed unused `at_end2end_test/test/commons.dart` chore: removed a TODO --- tests/at_end2end_test/test/commons.dart | 119 ------------------ .../at_end2end_test/test/e2e_test_utils.dart | 19 ++- .../test/update_verb_test.dart | 3 +- 3 files changed, 15 insertions(+), 126 deletions(-) delete mode 100644 tests/at_end2end_test/test/commons.dart diff --git a/tests/at_end2end_test/test/commons.dart b/tests/at_end2end_test/test/commons.dart deleted file mode 100644 index 9905d9a23..000000000 --- a/tests/at_end2end_test/test/commons.dart +++ /dev/null @@ -1,119 +0,0 @@ -import 'dart:collection'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:test/test.dart'; - -import 'pkam_utils.dart'; - -var _queue = Queue(); -var maxRetryCount = 10; -var retryCount = 1; - -///Socket Connection -Future socket_connection(host, port) async { - var context = SecurityContext(); - print(Directory.current); - context.setTrustedCertificates('lib/secondary/base/certs/cacert.pem'); - context.usePrivateKey('lib/secondary/base/certs/privkey.pem'); - context.useCertificateChain('lib/secondary/base/certs/fullchain.pem'); - return await SecureSocket.connect(host, port, context: context); -} - -void clear() { - _queue.clear(); - print('queue cleared'); -} - -///Secure Socket Connection -Future secure_socket_connection(var host, var port) async { - var socket; - while (retryCount < maxRetryCount) { - try { - socket = await SecureSocket.connect(host, port); - if (socket != null) { - break; - } - } on Exception { - print('retrying "$host:$port" for connection.. $retryCount'); - await Future.delayed(Duration(seconds: 5)); - retryCount++; - } - } - return socket; -} - -/// Socket Listener -void socket_listener(Socket socket) { - socket.listen(_messageHandler); -} - -/// Socket write -Future socket_writer(Socket socket, String msg) async { - print('command sent: $msg'); - msg = msg + '\n'; - socket.write(msg); -} - -///The prepare function takes a socket and atsign as input params and runs a from verb and pkam verb on the atsign param. -Future prepare(Socket socket, String atsign) async { - // FROM VERB - await socket_writer(socket, 'from:$atsign'); - var response = await read(); - print('From verb response $response'); - response = response.replaceAll('data:', ''); - var pkam_digest = generatePKAMDigest(atsign, response); - // var cram = getDigest(atsign, response); - - // PKAM VERB - await socket_writer(socket, 'pkam:$pkam_digest'); - response = await read(); - print('pkam verb response $response'); - expect(response, 'data:success\n'); - - //CRAM VERB - // await socket_writer(socket, 'cram:$cram'); - // response = await read(); - // print('cram verb response $response'); - // expect(response, 'data:success\n'); -} - -void _messageHandler(data) { - if (data.length == 1 && data.first == 64) { - return; - } - //ignore prompt(@ or @@) after '\n'. byte code for \n is 10 - if (data.last == 64 && data.contains(10)) { - data = data.sublist(0, data.lastIndexOf(10) + 1); - _queue.add(utf8.decode(data)); - } else if (data.length > 1 && data.first == 64 && data.last == 64) { - // pol responses do not end with '\n'. Add \n for buffer completion - _queue.add(utf8.decode(data)); - } else { - _queue.add(utf8.decode(data)); - } -} - -Future read({int maxWaitMilliSeconds = 5000}) async { - var result; - //wait maxWaitMilliSeconds seconds for response from remote socket - var loopCount = (maxWaitMilliSeconds / 50).round(); - for (var i = 0; i < loopCount; i++) { - await Future.delayed(Duration(milliseconds: 1000)); - var queueLength = _queue.length; - if (queueLength > 0) { - result = _queue.removeFirst(); - // result from another secondary is either data or a @@ denoting complete - // of the handshake - if (result.startsWith('data:') || - (result.startsWith('error:')) || - (result.startsWith('@') && result.endsWith('@'))) { - return result; - } else { - //log any other response and ignore - result = ''; - } - } - } - return result; -} diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index 9862a13c6..e5bce6041 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -28,10 +28,17 @@ Future getSocketHandler(atSign) async { if (asc == null) { throw _NoSuchAtSignException('$atSign not configured'); } - // TODO switch on _AtSignConfig.connectionType and create a - // TODO SimpleOutboundSocketConnection or SimpleOutboundWebsocketConnection - // TODO as required - var handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + + // ignore: prefer_typing_uninitialized_variables + var handler; + switch(asc.connectionType!) { + case _ConnectionTypeEnum.socket: + handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + break; + case _ConnectionTypeEnum.websocket: + // TODO e.g. handler = SimpleOutboundWebsocketConnection._(asc.host, asc.port, atSign); + throw UnimplementedError('e2e_test_utils cannot yet create a websocket connection'); + } await handler.connect(); handler.startListening(); @@ -212,11 +219,11 @@ class _AtSignConfig { String atSign; String host; int port; - _ConnectionTypeEnum connectionType; + _ConnectionTypeEnum? connectionType; /// Creates and adds to [atSignConfigMap] or throws [_AtSignAlreadyAddedException] if we've already got it. _AtSignConfig(this.atSign, this.host, this.port, this.connectionType) { - this.connectionType ??= _ConnectionTypeEnum.socket; + connectionType ??= _ConnectionTypeEnum.socket; if (atSignConfigMap.containsKey(atSign)) { throw _AtSignAlreadyAddedException("AtSignConfig for $atSign has already been created"); } diff --git a/tests/at_end2end_test/test/update_verb_test.dart b/tests/at_end2end_test/test/update_verb_test.dart index 45ad8898d..a4ae2b448 100644 --- a/tests/at_end2end_test/test/update_verb_test.dart +++ b/tests/at_end2end_test/test/update_verb_test.dart @@ -3,7 +3,6 @@ import 'dart:math'; import 'package:test/test.dart'; -import 'commons.dart'; import 'e2e_test_utils.dart' as e2e; void main() { @@ -77,6 +76,8 @@ void main() { expect(response, contains('data:$value')); //LOOKUP VERB in the other secondary + var maxRetryCount = 10; + var retryCount = 1; while (true) { await sh2.writeCommand('llookup:cached:$atSign_2:youtube_id$atSign_1'); response = await sh2.read(); From 5b4e6b0cff80d1b0e39915668a5b1973bb30a15c Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 10:04:55 +0000 Subject: [PATCH 17/22] chore: syntax error? --- .github/workflows/at_server.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/at_server.yaml b/.github/workflows/at_server.yaml index 09310d715..1f034cb0c 100644 --- a/.github/workflows/at_server.yaml +++ b/.github/workflows/at_server.yaml @@ -569,11 +569,11 @@ jobs: sed -i "s/ATSIGN_1_NAME/@$AT_SIGN_1/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_PORT/$AT_SIGN_1_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_HOST/$AT_SIGN_1_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml -# TODO: add something like `sed -i "s/ATSIGN_1_CONNECTION_TYPE/$AT_SIGN_1_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml` + # TODO: add something like sed -i "s/ATSIGN_1_CONNECTION_TYPE/$AT_SIGN_1_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_NAME/@$AT_SIGN_2/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_PORT/$AT_SIGN_2_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml -# TODO: add something like `sed -i "s/ATSIGN_2_CONNECTION_TYPE/$AT_SIGN_2_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml` + # TODO: add something like sed -i "s/ATSIGN_2_CONNECTION_TYPE/$AT_SIGN_2_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml mv tests/at_end2end_test/config/config-e2e_test_runtime.yaml tests/at_end2end_test/config/config.yaml cat tests/at_end2end_test/config/config.yaml echo "Connection successful" From 7a62b0e4b113acdd8997d500247921a89ffbd657 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 15:20:32 +0000 Subject: [PATCH 18/22] test: removed unnecessary TODOS; updated config12.yaml adding connection type `websocket` --- .github/workflows/at_server.yaml | 4 +--- tests/at_end2end_test/config/config12.yaml | 12 ++++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/.github/workflows/at_server.yaml b/.github/workflows/at_server.yaml index 1f034cb0c..951c24e96 100644 --- a/.github/workflows/at_server.yaml +++ b/.github/workflows/at_server.yaml @@ -569,11 +569,9 @@ jobs: sed -i "s/ATSIGN_1_NAME/@$AT_SIGN_1/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_PORT/$AT_SIGN_1_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_1_HOST/$AT_SIGN_1_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml - # TODO: add something like sed -i "s/ATSIGN_1_CONNECTION_TYPE/$AT_SIGN_1_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_NAME/@$AT_SIGN_2/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml sed -i "s/ATSIGN_2_PORT/$AT_SIGN_2_PORT/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml - sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml - # TODO: add something like sed -i "s/ATSIGN_2_CONNECTION_TYPE/$AT_SIGN_2_CONNECTION_TYPE/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml + sed -i "s/ATSIGN_2_HOST/$AT_SIGN_2_HOST/g" tests/at_end2end_test/config/config-e2e_test_runtime.yaml mv tests/at_end2end_test/config/config-e2e_test_runtime.yaml tests/at_end2end_test/config/config.yaml cat tests/at_end2end_test/config/config.yaml echo "Connection successful" diff --git a/tests/at_end2end_test/config/config12.yaml b/tests/at_end2end_test/config/config12.yaml index b37bb3b51..213e86c5d 100644 --- a/tests/at_end2end_test/config/config12.yaml +++ b/tests/at_end2end_test/config/config12.yaml @@ -8,14 +8,18 @@ root_server: # cicd atsign details first_atsign_server: first_atsign_name: '@cicd1' - # # The port to connect to first atsign server + # The port to connect to first atsign server first_atsign_port: 6464 - # # The url to connect to first atsign server + # The url to connect to first atsign server first_atsign_url: cicd1.atsign.wtf + # The type of connection (socket or websocket) + first_atsign_connection_type: websocket second_atsign_server: second_atsign_name: '@cicd2' - # # The port to connect to second atsign server + # The port to connect to second atsign server second_atsign_port: 6464 - # # The url to connect to second atsign server + # The url to connect to second atsign server second_atsign_url: cicd2.atsign.wtf + # The type of connection (socket or websocket) + second_atsign_connection_type: websocket From 445bb02c716eaaa03daef0925c9bf7e25ac53c97 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 15:33:01 +0000 Subject: [PATCH 19/22] test: fixed e2e_test_utils for new enum --- tests/at_end2end_test/test/e2e_test_utils.dart | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index e5bce6041..67e66f200 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -250,18 +250,32 @@ void _loadTheYaml() { _yamlLoaded = true; + String? connTypeStr1; + _ConnectionTypeEnum? connType1; + connTypeStr1 = ConfigUtil.getYaml()!['first_atsign_server'] + ['first_atsign_connection_type']; + if (connTypeStr1 != null) { + connType1 = _ConnectionTypeEnum.values.byName(connTypeStr1); + } _AtSignConfig( ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_connection_type'], + connType1, ); + String? connTypeStr2; + _ConnectionTypeEnum? connType2; + connTypeStr2 = ConfigUtil.getYaml()!['second_atsign_server'] + ['second_atsign_connection_type']; + if (connTypeStr2 != null) { + connType2 = _ConnectionTypeEnum.values.byName(connTypeStr2); + } _AtSignConfig( ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_connection_type'], + connType2, ); /// TODO Ideally instead of the current config.yaml we'd have yaml like this From 6aca4bdc87538bda4e645a163b6dd2fbc1e1202b Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 12 Dec 2024 15:35:36 +0000 Subject: [PATCH 20/22] test: fixed e2e_test_utils for new enum --- tests/at_end2end_test/test/e2e_test_utils.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index 67e66f200..6a23190ce 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -37,7 +37,8 @@ Future getSocketHandler(atSign) async { break; case _ConnectionTypeEnum.websocket: // TODO e.g. handler = SimpleOutboundWebsocketConnection._(asc.host, asc.port, atSign); - throw UnimplementedError('e2e_test_utils cannot yet create a websocket connection'); + handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + break; } await handler.connect(); From 39600c98a0c71b7731080e8e49a9b4c1779b6b74 Mon Sep 17 00:00:00 2001 From: purnimavenkatasubbu Date: Tue, 17 Dec 2024 23:28:44 +0530 Subject: [PATCH 21/22] addressed todos for e2e tests --- packages/at_secondary_server/CHANGELOG.md | 2 +- .../inbound/inbound_message_listener.dart | 2 +- packages/at_secondary_server/pubspec.yaml | 2 +- .../config/config-e2e_test_runtime.yaml | 6 +- .../at_end2end_test/test/cached_key_test.dart | 4 +- .../at_end2end_test/test/e2e_test_utils.dart | 332 ++++++++++++------ .../test/lookup_verb_test.dart | 4 +- .../test/notify_verb_test.dart | 14 +- .../test/plookup_verb_test.dart | 4 +- .../at_end2end_test/test/stats_verb_test.dart | 6 +- .../test/update_verb_test.dart | 4 +- 11 files changed, 242 insertions(+), 138 deletions(-) diff --git a/packages/at_secondary_server/CHANGELOG.md b/packages/at_secondary_server/CHANGELOG.md index 2099cc83d..7e703bd3a 100644 --- a/packages/at_secondary_server/CHANGELOG.md +++ b/packages/at_secondary_server/CHANGELOG.md @@ -1,4 +1,4 @@ -# 3.1.2 +# 3.2.0 - feat: Added WebSocket support for inbound connections # 3.1.1 - fix: Store "publicKeyHash" value in the keystore diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart index 6beb04b76..8c728a0ef 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_message_listener.dart @@ -49,7 +49,7 @@ class InboundMessageListener { if (streamData is List) { data = streamData; } else if (streamData is String) { - data = streamData.codeUnits; + data = utf8.encode(streamData); } else { logger.severe('Un-handled data type: ${streamData.runtimeType}'); await _finishedHandler(); diff --git a/packages/at_secondary_server/pubspec.yaml b/packages/at_secondary_server/pubspec.yaml index 91945018d..b84fd9513 100644 --- a/packages/at_secondary_server/pubspec.yaml +++ b/packages/at_secondary_server/pubspec.yaml @@ -1,6 +1,6 @@ name: at_secondary description: Implementation of secondary server. -version: 3.1.2 +version: 3.2.0 repository: https://github.com/atsign-foundation/at_server homepage: https://www.example.com publish_to: none diff --git a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml index a00f5d045..8569ec7d6 100644 --- a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml +++ b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml @@ -12,7 +12,8 @@ first_atsign_server: first_atsign_port: ATSIGN_1_PORT # # The url to connect to first atServer first_atsign_url: ATSIGN_1_HOST - # TODO add something like `first_atsign_connection_type: ATSIGN_1_CONNECTION_TYPE` + # # The connection type of first atServer + first_atsign_connection_type: ATSIGN_1_CONNECTION_TYPE second_atsign_server: second_atsign_name: 'ATSIGN_2_NAME' @@ -20,4 +21,5 @@ second_atsign_server: second_atsign_port: ATSIGN_2_PORT # # The url to connect to second atServer second_atsign_url: ATSIGN_2_HOST - # TODO add something like `second_atsign_connection_type: ATSIGN_2_CONNECTION_TYPE` + # # The connection type of second atServer + first_atsign_connection_type: ATSIGN_2_CONNECTION_TYPE diff --git a/tests/at_end2end_test/test/cached_key_test.dart b/tests/at_end2end_test/test/cached_key_test.dart index cbf6e3a93..bed884eab 100644 --- a/tests/at_end2end_test/test/cached_key_test.dart +++ b/tests/at_end2end_test/test/cached_key_test.dart @@ -6,10 +6,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; int lastValue = DateTime.now().millisecondsSinceEpoch; diff --git a/tests/at_end2end_test/test/e2e_test_utils.dart b/tests/at_end2end_test/test/e2e_test_utils.dart index 6a23190ce..c55853815 100644 --- a/tests/at_end2end_test/test/e2e_test_utils.dart +++ b/tests/at_end2end_test/test/e2e_test_utils.dart @@ -1,3 +1,5 @@ +import 'dart:math'; + import 'package:at_commons/at_commons.dart'; import 'package:at_end2end_test/conf/config_util.dart'; import 'pkam_utils.dart'; @@ -5,10 +7,15 @@ import 'pkam_utils.dart'; import 'dart:collection'; import 'dart:convert'; import 'dart:io'; +// ignore: depend_on_referenced_packages +import 'package:at_utils/at_logger.dart'; const int maxRetryCount = 10; +AtSignLogger logger = AtSignLogger('e2e_test_utils'); + /// Contains all [_AtSignConfig] instances we know about so we can avoid loads of boilerplate elsewhere +// ignore: library_private_types_in_public_api LinkedHashMap atSignConfigMap = LinkedHashMap(); /// Return a List of atSigns known to these e2e test utils. Ordering is the order of insertion in [_loadYaml] which is @@ -21,7 +28,7 @@ List knownAtSigns() { /// Utility method which will return a socket handler. Gets config from [atSignConfigMap] which in turn calls /// [_loadTheYaml] if it hasn't yet been loaded. /// Can evolve this to use a pooling approach if/when it becomes necessary. -Future getSocketHandler(atSign) async { +Future getSocketHandler(atSign) async { _loadTheYaml(); _AtSignConfig? asc = atSignConfigMap[atSign]; @@ -29,15 +36,15 @@ Future getSocketHandler(atSign) async { throw _NoSuchAtSignException('$atSign not configured'); } + // ignore: prefer_typing_uninitialized_variables // ignore: prefer_typing_uninitialized_variables var handler; - switch(asc.connectionType!) { + switch (asc.connectionType!) { case _ConnectionTypeEnum.socket: - handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + handler = SimpleOutboundSocketConnection(asc.host, asc.port, atSign); break; case _ConnectionTypeEnum.websocket: - // TODO e.g. handler = SimpleOutboundWebsocketConnection._(asc.host, asc.port, atSign); - handler = SimpleOutboundSocketHandler._(asc.host, asc.port, atSign); + handler = SimpleOutboundWebsocketConnection(asc.host, asc.port, atSign); break; } @@ -49,29 +56,137 @@ Future getSocketHandler(atSign) async { } /// A simple wrapper around a socket for @ protocol communication. -class SimpleOutboundSocketHandler { - // TODO Turn this into an abstract base class, e.g. SimpleOutboundConnection - // TODO with two concrete subclasses, e.g. SimpleOutboundSocketConnection - // TODO and SimpleOutboundWebsocketConnection +/// A simple wrapper around a socket for @ protocol communication. +abstract class SimpleOutboundConnection { late Queue _queue; final _buffer = ByteBuffer(capacity: 10240000); - // ignore: prefer_typing_uninitialized_variables String host; int port; String atSign; - SecureSocket? socket; - - /// Try to open a socket - SimpleOutboundSocketHandler._(this.host, this.port, this.atSign) { + SimpleOutboundConnection(this.host, this.port, this.atSign) { _queue = Queue(); } + void close(); + Future connect(); + void startListening(); + Future writeCommand(String command, {bool log = true}); + Future sendFromAndPkam(); + + Future clear() async { + _queue.clear(); + } + + Future read( + {bool log = true, + int timeoutMillis = 4000, + bool throwTimeoutException = true}) async { + String result; + // Wait this many milliseconds between checks on the queue + var loopDelay = 250; + bool first = true; + var loopCount = (timeoutMillis / loopDelay).round(); + for (var i = 0; i < loopCount; i++) { + if (!first) { + await Future.delayed(Duration(milliseconds: loopDelay)); + } + first = false; + var queueLength = _queue.length; + if (queueLength > 0) { + result = _queue.removeFirst(); + if (log) { + print("Response: $result"); + } + return result; + } + } + // No response - either throw a timeout exception or return the canned readTimedOutMessage + if (throwTimeoutException) { + throw AtTimeoutException( + "No response from $host:$port ($atSign) after ${timeoutMillis / 1000} seconds"); + } else { + print("read(): No response after $timeoutMillis milliseconds"); + return readTimedOutMessage; + } + } + + final int newLineCodeUnit = 10; + final int atCharCodeUnit = 64; + + /// Handles responses from the remote secondary, adds to [_queue] for processing in [read] method + /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data + /// Handles responses from the remote secondary for both socket and WebSocket connections. + /// Adds responses to [_queue] for processing in the [read] method. + /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data. + Future _messageHandler(dynamic data) async { + if (data is String) { + if (data == '@' || data.isEmpty) { + return; + } + int lastIndexOfNewLineCharacter = data.lastIndexOf('\n'); + data = data.substring(0, lastIndexOfNewLineCharacter); + _queue.add(data); + } else if (data is List) { + // Handle raw socket data + _checkBufferOverFlow(data); + + // Loop through the data and process it + for (int element = 0; element < data.length; element++) { + if (data[element] == newLineCodeUnit) { + String result = utf8.decode(_buffer.getData().toList()); + result = _stripPrompt(result); + _buffer.clear(); + _queue.add(result); + } else { + _buffer.addByte(data[element]); + } + } + } else { + throw UnsupportedError( + 'Unsupported data type received: ${data.runtimeType}'); + } + } + + void _checkBufferOverFlow(data) { + if (_buffer.isOverFlow(data)) { + int bufferLength = (_buffer.length() + data.length) as int; + _buffer.clear(); + throw BufferOverFlowException( + 'data length exceeded the buffer limit. Data length : $bufferLength and Buffer capacity ${_buffer.capacity}'); + } + } + + String _stripPrompt(String result) { + var colonIndex = result.indexOf(':'); + if (colonIndex == -1) { + return result; + } + var responsePrefix = result.substring(0, colonIndex); + var response = result.substring(colonIndex); + if (responsePrefix.contains('@')) { + responsePrefix = + responsePrefix.substring(responsePrefix.lastIndexOf('@') + 1); + } + return '$responsePrefix$response'; + } + + static String readTimedOutMessage = 'E2E_SIMPLE_SOCKET_HANDLER_TIMED_OUT'; +} + +class SimpleOutboundSocketConnection extends SimpleOutboundConnection { + SecureSocket? socket; + + SimpleOutboundSocketConnection(String host, int port, String atSign) + : super(host, port, atSign); + + @override void close() { - print("Closing SimpleOutboundSocketHandler for $atSign ($host:$port)"); - socket!.destroy(); + print("Closing SimpleOutboundSocketConnection for $atSign ($host:$port)"); + socket?.destroy(); } + @override Future connect() async { int retryCount = 1; while (retryCount < maxRetryCount) { @@ -86,128 +201,113 @@ class SimpleOutboundSocketHandler { retryCount++; } } - throw Exception("Failed to connect to $host:$port after $retryCount attempts"); + throw Exception( + "Failed to connect to $host:$port after $retryCount attempts"); } + @override void startListening() { - socket!.listen(_messageHandler); + socket?.listen(_messageHandler); } - /// Socket write + @override Future writeCommand(String command, {bool log = true}) async { if (log) { print('command sent: $command'); } - if (! command.endsWith('\n')) { - command = command + '\n'; + if (!command.endsWith('\n')) { + command = '$command\n'; } - socket!.write(command); + socket?.write(command); } - /// Runs a from verb and pkam verb on the atsign param. + @override Future sendFromAndPkam() async { - // FROM VERB - // Setting clientVersion to 3.0.38 to support JSON encoding of error responses await writeCommand('from:$atSign:clientConfig:{"version":"3.0.38"}'); - var response = await read(timeoutMillis:5000); + var response = await read(timeoutMillis: 5000); response = response.replaceAll('data:', ''); var pkamDigest = generatePKAMDigest(atSign, response); - // PKAM VERB - print ("Sending pkam: command"); - await writeCommand('pkam:$pkamDigest', log:false); - response = await read(timeoutMillis:5000); + await writeCommand('pkam:$pkamDigest', log: false); + response = await read(timeoutMillis: 5000); print('pkam verb response $response'); assert(response.contains('data:success')); } +} - Future clear() async { - // queue.clear(); - } +class SimpleOutboundWebsocketConnection extends SimpleOutboundConnection { + WebSocket? websocket; - final int newLineCodeUnit = 10; - final int atCharCodeUnit = 64; + SimpleOutboundWebsocketConnection(String host, int port, String atSign) + : super(host, port, atSign); - /// Handles responses from the remote secondary, adds to [_queue] for processing in [read] method - /// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data - Future _messageHandler(data) async { - // check buffer overflow - _checkBufferOverFlow(data); - - // Loop from last index to until the end of data. - // If a new line character is found, then it is end - // of server response. process the data. - // Else add the byte to buffer. - for (int element = 0; element < data.length; element++) { - // If it's a '\n' then complete data has been received. process it. - if (data[element] == newLineCodeUnit) { - String result = utf8.decode(_buffer.getData().toList()); - result = _stripPrompt(result); - _buffer.clear(); - _queue.add(result); - } else { - _buffer.addByte(data[element]); - } - } + @override + void close() { + print( + "Closing SimpleOutboundWebsocketConnection for $atSign ($host:$port)"); + websocket?.close(); } - _checkBufferOverFlow(data) { - if (_buffer.isOverFlow(data)) { - int bufferLength = (_buffer.length() + data.length) as int; - _buffer.clear(); - throw BufferOverFlowException( - 'data length exceeded the buffer limit. Data length : $bufferLength and Buffer capacity ${_buffer.capacity}'); + @override + Future connect() async { + try { + Random random = Random(); + String key = + base64.encode(List.generate(8, (_) => random.nextInt(256))); + + SecurityContext context = SecurityContext.defaultContext; + context.setAlpnProtocols(['http/1.1'], false); + HttpClient client = HttpClient(context: context); + + Uri uri = Uri.parse("https://$host:$port/ws"); + HttpClientRequest request = await client.getUrl(uri); + request.headers.add('Connection', 'upgrade'); + request.headers.add('Upgrade', 'websocket'); + request.headers.add('sec-websocket-version', '13'); + request.headers.add('sec-websocket-key', key); + + HttpClientResponse response = await request.close(); + Socket socket = await response.detachSocket(); + + websocket = WebSocket.fromUpgradedSocket( + socket, + serverSide: false, + ); + + print('WebSocket connection established for $atSign ($host:$port)'); + } catch (e) { + throw AtException( + 'Failed to establish WebSocket connection: ${e.toString()}'); } } - String _stripPrompt(String result) { - var colonIndex = result.indexOf(':'); - if (colonIndex == -1) { - return result; + @override + void startListening() { + websocket?.listen(_messageHandler); + } + + @override + Future writeCommand(String command, {bool log = true}) async { + if (log) { + print('command sent: $command'); } - var responsePrefix = result.substring(0, colonIndex); - var response = result.substring(colonIndex); - if (responsePrefix.contains('@')) { - responsePrefix = - responsePrefix.substring(responsePrefix.lastIndexOf('@') + 1); + if (!command.endsWith('\n')) { + command = '$command\n'; } - return '$responsePrefix$response'; + websocket?.add(command); } - /// A message which is returned from [read] if throwTimeoutException is set to false - static String readTimedOutMessage = 'E2E_SIMPLE_SOCKET_HANDLER_TIMED_OUT'; - - Future read({bool log = true, int timeoutMillis = 4000, bool throwTimeoutException = true}) async { - String result; - - // Wait this many milliseconds between checks on the queue - var loopDelay=250; + @override + Future sendFromAndPkam() async { + await writeCommand('from:$atSign:clientConfig:{"version":"3.0.38"}'); + var response = await read(timeoutMillis: 5000); + response = response.replaceAll('data:', ''); + var pkamDigest = generatePKAMDigest(atSign, response); - bool first = true; - // Check every loopDelay milliseconds until we get a response or timeoutMillis have passed. - var loopCount = (timeoutMillis / loopDelay).round(); - for (var i = 0; i < loopCount; i++) { - if (!first) { - await Future.delayed(Duration(milliseconds: loopDelay)); - } - first = false; - var queueLength = _queue.length; - if (queueLength > 0) { - result = _queue.removeFirst(); - if (log) { - print("Response: $result"); - } - // Got a response, let's return it - return result; - } - } - // No response - either throw a timeout exception or return the canned readTimedOutMessage - if (throwTimeoutException) { - throw AtTimeoutException ("No response from $host:$port ($atSign) after ${timeoutMillis/1000} seconds"); - } else { - print ("read(): No response after $timeoutMillis milliseconds"); - return readTimedOutMessage; - } + await writeCommand('pkam:$pkamDigest', log: false); + response = await read(timeoutMillis: 5000); + print('pkam verb response $response'); + assert(response.contains('data:success')); } } @@ -215,6 +315,7 @@ enum _ConnectionTypeEnum { socket, websocket, } + /// Simple data-holding class which adds its instances into [atSignConfigMap] class _AtSignConfig { String atSign; @@ -226,7 +327,8 @@ class _AtSignConfig { _AtSignConfig(this.atSign, this.host, this.port, this.connectionType) { connectionType ??= _ConnectionTypeEnum.socket; if (atSignConfigMap.containsKey(atSign)) { - throw _AtSignAlreadyAddedException("AtSignConfig for $atSign has already been created"); + throw _AtSignAlreadyAddedException( + "AtSignConfig for $atSign has already been created"); } atSignConfigMap[atSign] = this; } @@ -259,10 +361,10 @@ void _loadTheYaml() { connType1 = _ConnectionTypeEnum.values.byName(connTypeStr1); } _AtSignConfig( - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], - ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port'], - connType1, + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_name'], + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_url'], + ConfigUtil.getYaml()!['first_atsign_server']['first_atsign_port'], + connType1, ); String? connTypeStr2; @@ -273,10 +375,10 @@ void _loadTheYaml() { connType2 = _ConnectionTypeEnum.values.byName(connTypeStr2); } _AtSignConfig( - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], - ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port'], - connType2, + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_name'], + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_url'], + ConfigUtil.getYaml()!['second_atsign_server']['second_atsign_port'], + connType2, ); /// TODO Ideally instead of the current config.yaml we'd have yaml like this @@ -290,7 +392,7 @@ void _loadTheYaml() { /// ... etc } -extension Utils on SimpleOutboundSocketHandler { +extension Utils on SimpleOutboundConnection { Future getVersion() async { await writeCommand('info\n'); var version = await read(); diff --git a/tests/at_end2end_test/test/lookup_verb_test.dart b/tests/at_end2end_test/test/lookup_verb_test.dart index 14dc155d1..4d29acd81 100644 --- a/tests/at_end2end_test/test/lookup_verb_test.dart +++ b/tests/at_end2end_test/test/lookup_verb_test.dart @@ -9,10 +9,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; setUpAll(() async { List atSigns = e2e.knownAtSigns(); diff --git a/tests/at_end2end_test/test/notify_verb_test.dart b/tests/at_end2end_test/test/notify_verb_test.dart index c901f233f..3556cfadd 100644 --- a/tests/at_end2end_test/test/notify_verb_test.dart +++ b/tests/at_end2end_test/test/notify_verb_test.dart @@ -8,10 +8,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(30); @@ -46,7 +46,7 @@ void main() { // atsign2 create connection, issue monitor command for that namespace // verify atsign2 receives all 5 notifications immediately test('monitor receives multiple pending notifications immediately', () async { - e2e.SimpleOutboundSocketHandler notifySH, monitorSH; + e2e.SimpleOutboundConnection notifySH, monitorSH; notifySH = await e2e.getSocketHandler(atSign_1); monitorSH = await e2e.getSocketHandler(atSign_2); @@ -845,7 +845,7 @@ void main() { // get notify status Future getNotifyStatus( - e2e.SimpleOutboundSocketHandler sh, String notificationId, + e2e.SimpleOutboundConnection sh, String notificationId, {List? returnWhenStatusIn, int timeOutMillis = 5000}) async { returnWhenStatusIn ??= ['expired']; print( @@ -871,7 +871,7 @@ Future getNotifyStatus( log: true, timeoutMillis: loopDelay, throwTimeoutException: false); readTimedOut = - (response == e2e.SimpleOutboundSocketHandler.readTimedOutMessage); + (response == e2e.SimpleOutboundConnection.readTimedOutMessage); if (response.startsWith('data:')) { String status = response.replaceFirst('data:', '').replaceAll('\n', ''); @@ -888,7 +888,7 @@ Future getNotifyStatus( } Future retryCommandUntilMatchOrTimeout( - e2e.SimpleOutboundSocketHandler sh, + e2e.SimpleOutboundConnection sh, String command, String shouldContain, int timeoutMillis) async { @@ -909,7 +909,7 @@ Future retryCommandUntilMatchOrTimeout( log: false, timeoutMillis: loopDelay, throwTimeoutException: false); readTimedOut = - (response == e2e.SimpleOutboundSocketHandler.readTimedOutMessage); + (response == e2e.SimpleOutboundConnection.readTimedOutMessage); if (readTimedOut) { continue; } diff --git a/tests/at_end2end_test/test/plookup_verb_test.dart b/tests/at_end2end_test/test/plookup_verb_test.dart index 39993af88..1880de6c7 100644 --- a/tests/at_end2end_test/test/plookup_verb_test.dart +++ b/tests/at_end2end_test/test/plookup_verb_test.dart @@ -7,10 +7,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; setUpAll(() async { List atSigns = e2e.knownAtSigns(); diff --git a/tests/at_end2end_test/test/stats_verb_test.dart b/tests/at_end2end_test/test/stats_verb_test.dart index 7fcf5bf3b..1efcd2a41 100644 --- a/tests/at_end2end_test/test/stats_verb_test.dart +++ b/tests/at_end2end_test/test/stats_verb_test.dart @@ -8,10 +8,10 @@ import 'notify_verb_test.dart' as notification; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(20); @@ -170,7 +170,7 @@ void main() { }); } -Future notificationStats(e2e.SimpleOutboundSocketHandler sh) async { +Future notificationStats(e2e.SimpleOutboundConnection sh) async { await sh.writeCommand('stats:11'); var statsResponse = await sh.read(); print('stats verb response : $statsResponse'); diff --git a/tests/at_end2end_test/test/update_verb_test.dart b/tests/at_end2end_test/test/update_verb_test.dart index a4ae2b448..8612a54e3 100644 --- a/tests/at_end2end_test/test/update_verb_test.dart +++ b/tests/at_end2end_test/test/update_verb_test.dart @@ -7,10 +7,10 @@ import 'e2e_test_utils.dart' as e2e; void main() { late String atSign_1; - late e2e.SimpleOutboundSocketHandler sh1; + late e2e.SimpleOutboundConnection sh1; late String atSign_2; - late e2e.SimpleOutboundSocketHandler sh2; + late e2e.SimpleOutboundConnection sh2; var lastValue = Random().nextInt(20); From 60200fcc6f082b427c7b92cd3e186d56df1d9b17 Mon Sep 17 00:00:00 2001 From: purnimavenkatasubbu Date: Wed, 18 Dec 2024 20:06:29 +0530 Subject: [PATCH 22/22] revert connectionType in config-e2etest-runtime.yaml --- tests/at_end2end_test/config/config-e2e_test_runtime.yaml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml index 8569ec7d6..68da164b8 100644 --- a/tests/at_end2end_test/config/config-e2e_test_runtime.yaml +++ b/tests/at_end2end_test/config/config-e2e_test_runtime.yaml @@ -12,14 +12,10 @@ first_atsign_server: first_atsign_port: ATSIGN_1_PORT # # The url to connect to first atServer first_atsign_url: ATSIGN_1_HOST - # # The connection type of first atServer - first_atsign_connection_type: ATSIGN_1_CONNECTION_TYPE second_atsign_server: second_atsign_name: 'ATSIGN_2_NAME' # # The port to connect to second atServer second_atsign_port: ATSIGN_2_PORT # # The url to connect to second atServer - second_atsign_url: ATSIGN_2_HOST - # # The connection type of second atServer - first_atsign_connection_type: ATSIGN_2_CONNECTION_TYPE + second_atsign_url: ATSIGN_2_HOST \ No newline at end of file