Skip to content

Commit

Permalink
Merge pull request #2227 from atsign-foundation/enable-better-reset-h…
Browse files Browse the repository at this point in the history
…andling

feat: Enable better reset handling
  • Loading branch information
gkc authored Feb 26, 2025
2 parents 277fc29 + 1d3b544 commit b72f37f
Show file tree
Hide file tree
Showing 16 changed files with 322 additions and 174 deletions.
12 changes: 12 additions & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# 3.3.0
- feat: Add support for "atServer events" - starting with the
`AtSignPKChangedEvent`. atServer events are stored in a newly reserved
namespace called `__atserver` to which all clients will have read access
but not write access - creating new atServer events is solely an atServer
responsibility. Clients will typically fetch events when they initially
connect, and will then handle appropriately (for example: store the event
information locally; handle it; mark as processed locally.)
Clients should keep a marker for the latest event they have
fetched so that when they restart they do not re-process past events.
Newly-created clients should set their initial marker to
microsecondsSinceEpoch so that they do not process past events unnecessarily.
# 3.2.0
- feat: Added WebSocket support for inbound connections
# 3.1.1
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/bin/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import 'package:at_utils/at_logger.dart';
/// given port.
/// Throws [SocketException] on invalid port.
/// Throws [ArgParserException] for invalid arguments passed.
/// @ param - List<String> atSign and port
/// @ param - `List<String>` atSign and port
Future<void> main(List<String> arguments) async {
AtSignLogger.root_level = AtSecondaryConfig.logLevel;

Expand Down
262 changes: 192 additions & 70 deletions packages/at_secondary_server/lib/src/caching/cache_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,31 @@ import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/dummy_inbound_connection.dart';
import 'package:at_secondary/src/connection/outbound/outbound_client_manager.dart';
import 'package:at_secondary/src/utils/notification_util.dart';
import 'package:at_secondary/src/utils/secondary_util.dart';
import 'package:at_utils/at_logger.dart';
import 'package:meta/meta.dart';

class CacheUpdateResult {
final bool newEntry, dataChanged, metadataChanged;

CacheUpdateResult({
required this.newEntry,
required this.dataChanged,
required this.metadataChanged,
});
}

class AtCacheManager {
/// This atServer's atSign
final String atSign;
final SecondaryKeyStore<String, AtData?, AtMetaData?> keyStore;
final OutboundClientManager outboundClientManager;

final logger = AtSignLogger('AtCacheManager');

AtCacheManager(this.atSign, this.keyStore, this.outboundClientManager);
AtCacheManager(String atSign, this.keyStore, this.outboundClientManager)
: atSign = atSign.toAtsign();

/// Returns a List of keyNames of all cached records due to refresh
Future<List<String>> getKeyNamesToRefresh() async {
Expand Down Expand Up @@ -267,22 +281,9 @@ class AtCacheManager {

/// Update the cached data.
///
/// If the cached key name starts with 'cached:public:publickey@' then it has special handling logic
///
/// If the value of a cached:public:publickey@atSign has changed, we are dealing with the aftermath of an
/// atServer reset where the owner has re-onboarded with a different encryption keypair.
///
/// When that happens, we need to do some stuff in this atServer's keyStore so that
/// some client for this atSign can know that it needs to cut a new shared encryption key
/// (or, if client library supports it, reuse the old shared encryption key)
/// and share it with the other atSign. (Context: sharing a shared encryption key involves
/// encrypting it with the other atSign's encryption public key)
///
/// In essence, this is the server providing the minimum crude signal to clients that
/// they need to do something. As we extend the client libraries to understand these
/// post-reset scenarios better, they can be smarter but right now all client libraries
/// know that they first check if there is a shared key, and if not then they create one.
Future<void> put(String cachedKeyName, AtData atData) async {
/// If the cached key name starts with 'cached:public:publickey@' then it
/// has special handling logic - see [putCachedPublicKey]
Future<CacheUpdateResult> put(String cachedKeyName, AtData atData) async {
logger.info("put: $cachedKeyName");
if (!cachedKeyName.startsWith('cached:')) {
throw IllegalArgumentException(
Expand All @@ -294,83 +295,204 @@ class AtCacheManager {
}

// For everything other than 'cached:public:publickey@atSign' just put it into the key store
AtData? existingAtData;
if (keyStore.isKeyExists(cachedKeyName)) {
existingAtData = await keyStore.get(cachedKeyName);
}

if (!cachedKeyName.startsWith('cached:public:publickey@')) {
await keyStore.put(cachedKeyName, atData,
time_to_refresh: atData.metaData!.ttr,
time_to_live: atData.metaData!.ttl);
return;
if (existingAtData == null) {
return CacheUpdateResult(
newEntry: true,
dataChanged: true,
metadataChanged: true,
);
} else {
bool dataChanged = false;
bool metadataChanged = false;
// existing cached entry - check (1) has data changed (2) has metadata changed
if (existingAtData.data != atData.data) {
dataChanged = true;
}
if (existingAtData.metaData != atData.metaData) {
metadataChanged = true;
}
return CacheUpdateResult(
newEntry: false,
dataChanged: dataChanged,
metadataChanged: metadataChanged,
);
}
} else {
return putCachedPublicKey(cachedKeyName, atData, existingAtData);
}
}

// For publickey@atSign, we need to do some more stuff
// We have two things to take care of
// a) If it's not currently in the cache, then just update the cache and return
// b) It is currently in the cache
//
// If the data (public encryption key of another atSign) has actually changed, then we need to update the cache
// ==> in fact we're going to remove the current key from the keystore, and create the new one,
// so that we get the correct 'createdAt' value
// If the data has not changed, then we don't need to do anything
var otherAtSignWithoutTheAt =
cachedKeyName.replaceFirst('cached:public:publickey@', '');
/// For publickey@atSign, we need to do some more stuff.
///
/// We have two things to take care of
/// - a) If it's not currently in the cache, then just update the cache and return
/// - b) It is currently in the cache, but is unchanged - nothing to do
/// - c) It is currently in the cache, and has changed
///
/// If the data (public encryption key of another atSign) has actually
/// changed, then we need to update the cache
/// - in fact we're going to remove the current key from the keystore,
/// and create the new one, so that we get the correct 'createdAt'
/// value
@visibleForTesting
Future<CacheUpdateResult> putCachedPublicKey(
String cachedKeyName,
AtData atData,
AtData? existingAtData,
) async {
var otherAtSign =
cachedKeyName.replaceFirst('cached:public:publickey@', '@').toAtsign();
try {
// 1) If it's not currently in the cache, then just update the cache and return
if (!keyStore.isKeyExists(cachedKeyName)) {
await keyStore.put(cachedKeyName, atData, time_to_refresh: -1);
return;
return CacheUpdateResult(
newEntry: true,
dataChanged: true,
metadataChanged: true,
);
}

// 2) It is currently in the cache
// If the data (public encryption key of another atSign) has actually changed, then we need to update the cache
// If the data has not changed, then we don't need to do anything
bool publicKeyChanged = false;
if (keyStore.isKeyExists(cachedKeyName)) {
bool metadataChanged = false;
if (existingAtData != null) {
// If existing value in cache
// ⁃ fetch it, and compare its value with the new value
late AtData existing;
try {
existing = (await keyStore.get(cachedKeyName))!;
if (atData.data != null &&
atData.data != 'null' &&
existing.data != atData.data) {
// We're only setting the 'publicKeyChanged' flag to true IFF
// 1) We previously had real data and we also have some new real data (not null, nor the literal value 'null')
// 2) The data is actually different
publicKeyChanged = true;
}
} on KeyNotFoundException catch (unexpected) {
logger.severe(
'Unexpected KeyNotFoundException when retrieving $cachedKeyName after first checking that it existed : $unexpected');
}
}
if (publicKeyChanged) {
logger.warning('Public key $cachedKeyName has changed');
// Key has actually changed

// Firstly - Find shared_key.otherAtSign@myAtSign and rename it to shared_key.other.until.now@myAtSign
// e.g. find shared_key.bob@alice and rename it to shared_key.bob.until.<epochMillis>@alice
var now = DateTime.now().toUtc().millisecondsSinceEpoch;
var nameOfMyCopyOfSharedKey =
'shared_key.$otherAtSignWithoutTheAt$atSign';
if (keyStore.isKeyExists(nameOfMyCopyOfSharedKey)) {
AtData data = (await keyStore.get(nameOfMyCopyOfSharedKey))!;

logger.warning('Removing $nameOfMyCopyOfSharedKey');
await keyStore.remove(nameOfMyCopyOfSharedKey);

var copyOfSharedKeyKeyName =
'shared_key.$otherAtSignWithoutTheAt.until.$now$atSign';
logger.warning('Creating $copyOfSharedKeyKeyName');
await keyStore.put(copyOfSharedKeyKeyName, data);
// We're only setting the 'publicKeyChanged' flag to true IFF
// 1) We previously had real data and we also have some new real data (not null, nor the literal value 'null')
// 2) The data is actually different
if (atData.data != null &&
atData.data != 'null' &&
existingAtData.data != atData.data) {
publicKeyChanged = true;
}

// Secondly, update the cache, and ensure that ttr is set to -1 (cache indefinitely)
await keyStore.remove(cachedKeyName);
await keyStore.put(cachedKeyName, atData, time_to_refresh: -1);
metadataChanged = atData.metaData != existingAtData.metaData;
}
if (!publicKeyChanged) {
// nothing's changed, nothing more to do, just return
return CacheUpdateResult(
newEntry: false,
dataChanged: false,
metadataChanged: false,
);
} else {
// Key has actually changed - we have things to do
await _handleOthersPublicKeyHasChanged(
otherAtSign,
cachedKeyName,
atData,
existingAtData!,
);

return CacheUpdateResult(
newEntry: false,
dataChanged: publicKeyChanged,
metadataChanged: metadataChanged,
);
}
} catch (e, st) {
logger.severe(
'Exception when handling public key changed event for @$otherAtSignWithoutTheAt : $e\n$st');
'Exception when handling public key changed event for $otherAtSign : $e\n$st');
rethrow;
}
}

/// If the value of a cached:public:publickey@atSign has changed, we are dealing with the aftermath of an
/// atServer reset where the owner has re-onboarded with a different encryption keypair.
/// <p/>
///
/// When that happens, we need to do some stuff in this atServer's keyStore so that
/// some client for this atSign can know that it needs to cut a new shared encryption key
/// (or, if client library supports it, reuse the old shared encryption key)
/// and share it with the other atSign. (Context: sharing a shared encryption key involves
/// encrypting it with the other atSign's encryption public key)
/// <p/>
///
/// In essence, this is the server providing the minimum crude signal to
/// clients that there has been a public key change, and they probably need
/// to do something.
/// As we extend the client libraries to understand these post-reset
/// scenarios better, they can be smarter but right now all client libraries
/// know that they first check if there is a shared key, and if not then they
/// create one.
Future<void> _handleOthersPublicKeyHasChanged(
String otherAtSign,
String cachedKeyName,
AtData atData,
AtData existingAtData,
) async {
logger.warning('Public key $cachedKeyName has changed');

// First and most important - create a record that this has happened; this
// is so that clients have a standardized way to know that another atSign
// has changed its public key and take appropriate actions as it wishes.
final event = AtSignPKChangedEvent(otherAtSign);

// store the event for retrieval by clients
int nowMicros = DateTime.now().microsecondsSinceEpoch;
String keyName = '$nowMicros.events'
'.${AtConstants.atServerReservedNamespace}'
'@${atSign.withoutAt()}';
await keyStore.put(keyName, AtData()..data = jsonEncode(event.toJson()));

AtData? stored = await keyStore.get(keyName);
logger.warning('Created AtSignPKChangedEvent for $otherAtSign.'
' Stored event keyName: $keyName value: ${stored?.data}');

// send a 'self' notification for clients which may be listening
// expire it after 5 minutes since interested clients will be fetching
// these events when they start up, as well as listening for notifications
final notif = (AtNotificationBuilder()
..notification = keyName
..fromAtSign = atSign
..toAtSign = atSign
..ttl = 5 * 60 * 1000 // 5 minutes expiration
..type = NotificationType.self
..opType = OperationType.update
..atValue = jsonEncode(event.toJson()))
.build();
final notificationId = await NotificationUtil.storeNotification(notif);
logger
.warning('Sent self notification re $otherAtSign AtSignPKChangedEvent.'
' Notif ID: $notificationId'
' Notification: ${notif.toJson()}');

// Housekeeping for older clients.
// Housekeeping (1): find shared_key.otherAtSign@myAtSign and rename it to
// shared_key.other.until.now@myAtSign e.g. find shared_key.bob@alice and
// rename it to shared_key.bob.until.<epochMillis>@alice
int now = DateTime.now().toUtc().millisecondsSinceEpoch;
var nameOfMyCopyOfSharedKey =
'shared_key.${otherAtSign.withoutAt()}$atSign';
if (keyStore.isKeyExists(nameOfMyCopyOfSharedKey)) {
AtData data = (await keyStore.get(nameOfMyCopyOfSharedKey))!;

logger.warning('Removing $nameOfMyCopyOfSharedKey');
await keyStore.remove(nameOfMyCopyOfSharedKey);

var copyOfSharedKeyKeyName =
'shared_key.${otherAtSign.withoutAt()}.until.$now$atSign';
logger.warning('Creating $copyOfSharedKeyKeyName');
await keyStore.put(copyOfSharedKeyKeyName, data);
}

// Housekeeping (2): update the cache
// and ensure that ttr is set to -1 (cache indefinitely)
await keyStore.remove(cachedKeyName);
await keyStore.put(cachedKeyName, atData, time_to_refresh: -1);
}

/// Does the remote lookup - returns the atProtocol string which it receives
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'package:at_server_spec/at_server_spec.dart';

import 'inbound_connection_pool.dart';

// ignore: implementation_imports
// ignore: implementation_imports, unnecessary_import
import 'package:at_server_spec/src/at_rate_limiter/at_rate_limiter.dart';

class InboundRateLimiter implements AtRateLimiter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class GlobalExceptionHandler {

/// Method to write response to client
/// Params: AtException, AtConnection
/// We'll get error code based on the exception and write error:<error_code> to the client socket
/// We'll get error code based on the exception
/// and write `error:<error_code>` to the client socket
Future<void> _sendResponseForException(
Exception exception, AtConnection? atConnection) async {
if (atConnection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import 'package:at_persistence_secondary_server/at_persistence_secondary_server.
import 'package:at_secondary/src/notification/priority_queue_impl.dart';

///Class implements [NotificationStrategy].
/// Latest Notifications contains Map <String, AtNotificationPriorityQueue> where String is notifier
/// Latest Notifications contains `Map <String, AtNotificationPriorityQueue>` where String is notifier
/// AtNotificationPriorityQueue contains AtNotifications on priority basis.
class LatestNotifications implements NotificationStrategy {
final _latestNotificationsMap = <String?, AtNotificationPriorityQueue>{};
Expand Down
Loading

0 comments on commit b72f37f

Please sign in to comment.