diff --git a/packages/at_secondary_server/config/config.yaml b/packages/at_secondary_server/config/config.yaml index 4c1cac706..4b27f99e4 100644 --- a/packages/at_secondary_server/config/config.yaml +++ b/packages/at_secondary_server/config/config.yaml @@ -154,4 +154,8 @@ testing: enrollment: # The maximum time in hours for an enrollment to expire, beyond which any action on enrollment is forbidden. # Default values is 48 hours. - expiryInHours: 48 \ No newline at end of file + expiryInHours: 48 + # The maximum number of requests allowed within the time window. + maxRequestsPerTimeFrame: 5 + # The duration of the time window in hours. + timeFrameInHours: 1 \ No newline at end of file diff --git a/packages/at_secondary_server/lib/src/connection/inbound/dummy_inbound_connection.dart b/packages/at_secondary_server/lib/src/connection/inbound/dummy_inbound_connection.dart index 8b06dc40a..f7b6f748b 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/dummy_inbound_connection.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/dummy_inbound_connection.dart @@ -1,12 +1,19 @@ import 'dart:io'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; +import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_server_spec/at_server_spec.dart'; /// A dummy implementation of [InboundConnection] class which returns a dummy inbound connection. class DummyInboundConnection implements InboundConnection { var metadata = InboundConnectionMetadata(); + @override + int maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; + + @override + int timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; + @override void acceptRequests(Function(String p1, InboundConnection p2) callback, Function(List, InboundConnection) streamCallback) {} @@ -54,4 +61,9 @@ class DummyInboundConnection implements InboundConnection { @override Socket? receiverSocket; + + @override + bool isRequestAllowed() { + return true; + } } diff --git a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart index a19bf6b82..dafbae817 100644 --- a/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart +++ b/packages/at_secondary_server/lib/src/connection/inbound/inbound_connection_impl.dart @@ -1,3 +1,4 @@ +import 'dart:collection'; import 'dart:io'; import 'dart:math'; @@ -5,6 +6,7 @@ import 'package:at_secondary/src/connection/base_connection.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart'; import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dart'; +import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/server_context.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/logging_util.dart'; @@ -42,6 +44,17 @@ class InboundConnectionImpl extends BaseConnection late double lowWaterMarkRatio; late bool progressivelyReduceAllowableInboundIdleTime; + /// The maximum number of requests allowed within the specified time frame. + @override + late int maxRequestsPerTimeFrame; + + /// The duration of the time frame within which requests are limited. + @override + late int timeFrameInMillis; + + /// A list of timestamps representing the times when requests were made. + late final Queue requestTimestampQueue; + InboundConnectionImpl(Socket? socket, String? sessionId, {this.owningPool}) : super(socket) { metaData = InboundConnectionMetadata() @@ -69,6 +82,10 @@ class InboundConnectionImpl extends BaseConnection secondaryContext.authenticatedInboundIdleTimeMillis; authenticatedMinAllowableIdleTimeMillis = secondaryContext.authenticatedMinAllowableIdleTimeMillis; + + maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed; + timeFrameInMillis = AtSecondaryConfig.timeFrameInMills; + requestTimestampQueue = Queue(); } /// Returns true if the underlying socket is not null and socket's remote address and port match. @@ -230,4 +247,31 @@ class InboundConnectionImpl extends BaseConnection metaData, 'SENT: ${BaseConnection.truncateForLogging(data)}')); } } + + @override + bool isRequestAllowed() { + int currentTimeInMills = DateTime.now().toUtc().millisecondsSinceEpoch; + _checkAndUpdateQueue(currentTimeInMills); + if (requestTimestampQueue.length < maxRequestsPerTimeFrame) { + requestTimestampQueue.addLast(currentTimeInMills); + return true; + } + return false; + } + + /// Checks and updates the request timestamp queue based on the current time. + /// + /// This method removes timestamps from the queue that are older than the specified + /// time window. + /// + /// [currentTimeInMillis] is the current time in milliseconds since epoch. + void _checkAndUpdateQueue(int currentTimeInMillis) { + if (requestTimestampQueue.isEmpty) return; + int calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + while (calculatedTime >= timeFrameInMillis) { + requestTimestampQueue.removeFirst(); + if (requestTimestampQueue.isEmpty) break; + calculatedTime = (currentTimeInMillis - requestTimestampQueue.first); + } + } } diff --git a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart index ecfe7ec17..c3faead04 100644 --- a/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart +++ b/packages/at_secondary_server/lib/src/exception/global_exception_handler.dart @@ -68,7 +68,8 @@ class GlobalExceptionHandler { exception is KeyNotFoundException || exception is AtConnectException || exception is SocketException || - exception is AtTimeoutException) { + exception is AtTimeoutException || + exception is AtThrottleLimitExceeded) { logger.info(exception.toString()); await _sendResponseForException(exception, atConnection); } else if (exception is InternalServerError) { diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_config.dart b/packages/at_secondary_server/lib/src/server/at_secondary_config.dart index 5b004bb75..e3844660a 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_config.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_config.dart @@ -121,12 +121,22 @@ class AtSecondaryConfig { ? ConfigUtil.getPubspecConfig()!['version'] : null; - static final int _enrollmentExpiryInHours = 48; - static final Map _envVars = Platform.environment; static String? get secondaryServerVersion => _secondaryServerVersion; + // Enrollment Configurations + static const int _enrollmentExpiryInHours = 48; + static int _maxEnrollRequestsAllowed = 5; + + static final int _timeFrameInHours = 1; + + // For easy of testing, duration in hours is long. Hence introduced "timeFrameInMills" + // to have a shorter time frame. This is defaulted to "_timeFrameInHours", can be modified + // via the config verb + static int _timeFrameInMills = + Duration(hours: _timeFrameInHours).inMilliseconds; + static int get enrollmentExpiryInHours => _enrollmentExpiryInHours; // TODO: Medium priority: Most (all?) getters in this class return a default value but the signatures currently @@ -716,6 +726,54 @@ class AtSecondaryConfig { } } + static int get maxEnrollRequestsAllowed { + // For easy of testing purpose, we need to reduce the number of requests. + // So, in testing mode, enable to modify the "maxEnrollRequestsAllowed" + // can be set via the config verb + // Defaults to value in config.yaml + if (testingMode) { + return _maxEnrollRequestsAllowed; + } + var result = _getIntEnvVar('maxEnrollRequestsAllowed'); + if (result != null) { + return result; + } + try { + return getConfigFromYaml(['enrollment', 'maxRequestsPerTimeFrame']); + } on ElementNotFoundException { + return _maxEnrollRequestsAllowed; + } + } + + static set maxEnrollRequestsAllowed(int value) { + _maxEnrollRequestsAllowed = value; + } + + static int get timeFrameInMills { + // For easy of testing purpose, we need to reduce the time frame. + // So, in testing mode, enable to modify the "timeFrameInMills" + // can be set via the config verb + // Defaults to value in config.yaml + if (testingMode) { + return _timeFrameInMills; + } + var result = _getIntEnvVar('enrollTimeFrameInHours'); + if (result != null) { + return Duration(hours: result).inMilliseconds; + } + try { + return Duration( + hours: getConfigFromYaml(['enrollment', 'timeFrameInHours'])) + .inMilliseconds; + } on ElementNotFoundException { + return Duration(hours: _timeFrameInHours).inMilliseconds; + } + } + + static set timeFrameInMills(int timeWindowInMills) { + _timeFrameInMills = timeWindowInMills; + } + //implementation for config:set. This method returns a data stream which subscribers listen to for updates static Stream? subscribe(ModifiableConfigs configName) { if (testingMode) { @@ -786,6 +844,10 @@ class AtSecondaryConfig { return false; case ModifiableConfigs.doCacheRefreshNow: return false; + case ModifiableConfigs.maxRequestsPerTimeFrame: + return maxEnrollRequestsAllowed; + case ModifiableConfigs.timeFrameInMills: + return Duration(hours: _timeFrameInHours).inMilliseconds; } } @@ -866,7 +928,9 @@ enum ModifiableConfigs { maxNotificationRetries, checkCertificateReload, shouldReloadCertificates, - doCacheRefreshNow + doCacheRefreshNow, + maxRequestsPerTimeFrame, + timeFrameInMills } class ModifiableConfigurationEntry { diff --git a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart index 932d65a54..bc20ae6e3 100644 --- a/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart +++ b/packages/at_secondary_server/lib/src/server/at_secondary_impl.dart @@ -421,6 +421,14 @@ class AtSecondaryServerImpl implements AtSecondaryServer { notificationResourceManager.setMaxRetries(newCount); QueueManager.getInstance().setMaxRetries(newCount); }); + + AtSecondaryConfig.subscribe(ModifiableConfigs.maxRequestsPerTimeFrame)?.listen((maxEnrollRequestsAllowed) { + AtSecondaryConfig.maxEnrollRequestsAllowed = maxEnrollRequestsAllowed; + }); + + AtSecondaryConfig.subscribe(ModifiableConfigs.timeFrameInMills)?.listen((timeWindowInMills) { + AtSecondaryConfig.timeFrameInMills = timeWindowInMills; + }); } } diff --git a/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart index 49e85eacf..31b43f930 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/enroll_verb_handler.dart @@ -105,11 +105,17 @@ class EnrollVerbHandler extends AbstractVerbHandler { /// and its corresponding state. /// /// Throws "AtEnrollmentException", if the OTP provided is invalid. + /// Throws [AtThrottleLimitExceeded], if the number of requests exceed within + /// a time window. Future _handleEnrollmentRequest( EnrollParams enrollParams, currentAtSign, Map responseJson, InboundConnection atConnection) async { + if (!atConnection.isRequestAllowed()) { + throw AtThrottleLimitExceeded( + 'Enrollment requests have exceeded the limit within the specified time frame'); + } if (!atConnection.getMetaData().isAuthenticated) { var otp = enrollParams.otp; if (otp == null || diff --git a/packages/at_secondary_server/pubspec.yaml b/packages/at_secondary_server/pubspec.yaml index 1a27244c6..03be29b74 100644 --- a/packages/at_secondary_server/pubspec.yaml +++ b/packages/at_secondary_server/pubspec.yaml @@ -22,7 +22,7 @@ dependencies: at_utils: 3.0.15 at_chops: 1.0.4 at_lookup: 3.0.40 - at_server_spec: 3.0.14 + at_server_spec: 3.0.15 at_persistence_spec: 2.0.14 at_persistence_secondary_server: 3.0.57 expire_cache: ^2.0.1 diff --git a/packages/at_secondary_server/test/inbound_connection_impl_test.dart b/packages/at_secondary_server/test/inbound_connection_impl_test.dart new file mode 100644 index 000000000..89c2c1d5c --- /dev/null +++ b/packages/at_secondary_server/test/inbound_connection_impl_test.dart @@ -0,0 +1,44 @@ +import 'dart:io'; + +import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart'; +import 'package:at_server_spec/at_server_spec.dart'; +import 'package:test/test.dart'; + +void main(){ + group('A test to verify the rate limiter on inbound connection', () { + test('A test to verify requests exceeding the limit are rejected', () { + Socket? dummySocket; + AtConnection connection1 = InboundConnectionImpl(dummySocket, 'aaa'); + (connection1 as InboundConnectionImpl).maxRequestsPerTimeFrame = 1; + connection1.timeFrameInMillis = + Duration(milliseconds: 10).inMilliseconds; + expect(connection1.isRequestAllowed(), true); + expect(connection1.isRequestAllowed(), false); + }); + + test('A test to verify requests after the time window are accepted', + () async { + Socket? dummySocket; + AtConnection connection1 = InboundConnectionImpl(dummySocket, 'aaa'); + (connection1 as InboundConnectionImpl).maxRequestsPerTimeFrame = 1; + connection1.timeFrameInMillis = Duration(milliseconds: 2).inMilliseconds; + expect(connection1.isRequestAllowed(), true); + expect(connection1.isRequestAllowed(), false); + await Future.delayed(Duration(milliseconds: 2)); + expect(connection1.isRequestAllowed(), true); + }); + + test('A test to verify request from different connection is allowed', () { + Socket? dummySocket; + AtConnection connection1 = InboundConnectionImpl(dummySocket, 'aaa'); + AtConnection connection2 = InboundConnectionImpl(dummySocket, 'aaa'); + (connection1 as InboundConnectionImpl).maxRequestsPerTimeFrame = 1; + (connection2 as InboundConnectionImpl).maxRequestsPerTimeFrame = 1; + connection1.timeFrameInMillis = + Duration(milliseconds: 10).inMilliseconds; + expect(connection1.isRequestAllowed(), true); + expect(connection1.isRequestAllowed(), false); + expect(connection2.isRequestAllowed(), true); + }); + }); +} \ No newline at end of file diff --git a/tests/at_functional_test/pubspec.yaml b/tests/at_functional_test/pubspec.yaml index f688db31a..e0a4919e0 100644 --- a/tests/at_functional_test/pubspec.yaml +++ b/tests/at_functional_test/pubspec.yaml @@ -16,7 +16,7 @@ dependencies: ref: trunk at_chops: ^1.0.1 at_lookup: ^3.0.32 - at_commons: ^3.0.53 + at_commons: ^3.0.55 uuid: ^3.0.7 elliptic: ^0.3.8 diff --git a/tests/at_functional_test/test/enroll_verb_test.dart b/tests/at_functional_test/test/enroll_verb_test.dart index 2868b6704..afa4e56db 100644 --- a/tests/at_functional_test/test/enroll_verb_test.dart +++ b/tests/at_functional_test/test/enroll_verb_test.dart @@ -745,6 +745,120 @@ void main() { 'Only pending enrollments can be approved'); }); }); + + group('A group of test related to Rate limiting enrollment requests', () { + String otp = ''; + setUp(() async { + await socket_writer(socketConnection1!, 'from:$firstAtsign'); + var fromResponse = await read(); + fromResponse = fromResponse.replaceAll('data:', ''); + var cramResponse = getDigest(firstAtsign, fromResponse); + await socket_writer(socketConnection1!, 'cram:$cramResponse'); + var cramResult = await read(); + expect(cramResult, 'data:success\n'); + await socket_writer( + socketConnection1!, 'config:set:maxRequestsPerTimeFrame=1\n'); + var configResponse = await read(); + expect(configResponse.trim(), 'data:ok'); + await socket_writer( + socketConnection1!, 'config:set:timeFrameInMills=100\n'); + configResponse = await read(); + expect(configResponse.trim(), 'data:ok'); + await socket_writer(socketConnection1!, 'otp:get'); + otp = await read(); + otp = otp.replaceAll('data:', '').trim(); + }); + + test( + 'A test to verify exception is thrown when request exceed the configured limit', + () async { + SecureSocket unAuthenticatedConnection = + await secure_socket_connection(firstAtsignServer, firstAtsignPort); + socket_listener(unAuthenticatedConnection); + var enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + var enrollmentResponse = + jsonDecode((await read()).replaceAll('data:', '')); + expect(enrollmentResponse['status'], 'pending'); + expect(enrollmentResponse['enrollmentId'], isNotNull); + enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + enrollmentResponse = await read() + ..replaceAll('error:', ''); + expect( + enrollmentResponse.contains( + 'Enrollment requests have exceeded the limit within the specified time frame'), + true); + }); + + test('A test to verify request is successful after the time window', + () async { + SecureSocket unAuthenticatedConnection = + await secure_socket_connection(firstAtsignServer, firstAtsignPort); + socket_listener(unAuthenticatedConnection); + var enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + var enrollmentResponse = + jsonDecode((await read()).replaceAll('data:', '')); + expect(enrollmentResponse['status'], 'pending'); + expect(enrollmentResponse['enrollmentId'], isNotNull); + enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + enrollmentResponse = await read() + ..replaceAll('error:', ''); + expect( + enrollmentResponse.contains( + 'Enrollment requests have exceeded the limit within the specified time frame'), + true); + await Future.delayed(Duration(milliseconds: 110)); + await socket_writer(unAuthenticatedConnection, enrollRequest); + enrollmentResponse = jsonDecode((await read()).replaceAll('data:', '')); + expect(enrollmentResponse['status'], 'pending'); + expect(enrollmentResponse['enrollmentId'], isNotNull); + }); + + test('A test to verify rate limit is per connection', () async { + SecureSocket unAuthenticatedConnection = + await secure_socket_connection(firstAtsignServer, firstAtsignPort); + socket_listener(unAuthenticatedConnection); + var enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + var enrollmentResponse = + jsonDecode((await read()).replaceAll('data:', '')); + expect(enrollmentResponse['status'], 'pending'); + expect(enrollmentResponse['enrollmentId'], isNotNull); + enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(unAuthenticatedConnection, enrollRequest); + enrollmentResponse = await read() + ..replaceAll('error:', ''); + expect( + enrollmentResponse.contains( + 'Enrollment requests have exceeded the limit within the specified time frame'), + true); + SecureSocket secondUnAuthenticatedConnection2 = + await secure_socket_connection(firstAtsignServer, firstAtsignPort); + socket_listener(secondUnAuthenticatedConnection2); + enrollRequest = + 'enroll:request:{"appName":"wavi","deviceName":"pixel","namespaces":{"wavi":"rw"},"otp":"$otp","apkamPublicKey":"${pkamPublicKeyMap[firstAtsign]!}"}\n'; + await socket_writer(secondUnAuthenticatedConnection2, enrollRequest); + enrollmentResponse = jsonDecode((await read()).replaceAll('data:', '')); + expect(enrollmentResponse['status'], 'pending'); + expect(enrollmentResponse['enrollmentId'], isNotNull); + }); + + tearDown(() async { + socket_writer(socketConnection1!, 'config:reset:maxRequestsAllowed'); + await read(); + socket_writer(socketConnection1!, 'config:reset:timeWindowInMills'); + await read(); + }); + }); } Future _getOTPFromServer(String atSign) async {