diff --git a/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log.dart b/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log.dart index edb8088e0..fcc57a79d 100644 --- a/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log.dart +++ b/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log.dart @@ -5,8 +5,33 @@ import 'package:at_utf7/at_utf7.dart'; import 'package:at_utils/at_logger.dart'; import 'package:hive/hive.dart'; +abstract class BaseAtCommitLog implements AtLogType { + Future lastSyncedEntry() async { + // Implemented by [ClientAtCommitLog] + throw UnimplementedError(); + } + + Future lastSyncedEntryWithRegex(String regex) async { + // Implemented by [ClientAtCommitLog] + throw UnimplementedError(); + } + + /// Returns the commit entry for a given commit sequence number + /// throws [DataStoreException] if there is an exception getting the commit entry + Future getEntry(int? sequenceNumber) async { + // Implemented by [ClientAtCommitLog] + throw UnimplementedError(); + } + + Future update(CommitEntry commitEntry, int commitId) async { + // Implemented by [ClientAtCommitLog] + throw UnimplementedError(); + } +} + /// Class to maintain commit logs on the secondary server for create, update and remove operations on keys -class AtCommitLog implements AtLogType { +@server +class AtCommitLog extends BaseAtCommitLog { var logger = AtSignLogger('AtCommitLog'); late final List _atChangeEventListener = []; @@ -54,54 +79,6 @@ class AtCommitLog implements AtLogType { return result; } - /// Returns the commit entry for a given commit sequence number - /// throws [DataStoreException] if there is an exception getting the commit entry - @client - Future getEntry(int? sequenceNumber) async { - try { - var commitEntry = await _commitLogKeyStore.get(sequenceNumber!); - return commitEntry; - } on Exception catch (e) { - throw DataStoreException('Exception getting entry:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error adding to commit log:${e.toString()}'); - } - } - - /// Returns the list of commit entries greater than [sequenceNumber] - /// throws [DataStoreException] if there is an exception getting the commit entries - Future> getChanges(int? sequenceNumber, String? regex, - {int? limit}) async { - Future> changes; - try { - changes = _commitLogKeyStore.getChanges(sequenceNumber!, - regex: regex, limit: limit); - } on Exception catch (e) { - throw DataStoreException('Exception getting changes:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error adding to commit log:${e.toString()}'); - } - // ignore: unnecessary_null_comparison - if (changes == null) { - return []; - } - return changes; - } - - @client - Future update(CommitEntry commitEntry, int commitId) async { - try { - await _commitLogKeyStore.update(commitId, commitEntry); - } on Exception catch (e) { - throw DataStoreException('Exception updating entry:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error updating entry to commit log:${e.toString()}'); - } - } - /// Returns the latest committed sequence number @server int? lastCommittedSequenceNumber() { @@ -114,16 +91,6 @@ class AtCommitLog implements AtLogType { return await _commitLogKeyStore.lastCommittedSequenceNumberWithRegex(regex); } - @client - Future lastSyncedEntry() async { - return await _commitLogKeyStore.lastSyncedEntry(); - } - - @client - Future lastSyncedEntryWithRegex(String regex) async { - return await _commitLogKeyStore.lastSyncedEntry(regex: regex); - } - /// Returns the first committed sequence number @server int? firstCommittedSequenceNumber() { @@ -133,7 +100,6 @@ class AtCommitLog implements AtLogType { /// Returns the total number of keys /// @return - int : Returns number of keys in access log @override - @server int entriesCount() { return _commitLogKeyStore.entriesCount(); } @@ -229,4 +195,77 @@ class AtCommitLog implements AtLogType { String toString() { return runtimeType.toString(); } + + /// Returns the list of commit entries greater than [sequenceNumber] + /// throws [DataStoreException] if there is an exception getting the commit entries + Future> getChanges(int? sequenceNumber, String? regex, + {int? limit}) async { + throw UnimplementedError(''); + } +} + +@client +class ClientAtCommitLog extends AtCommitLog { + ClientAtCommitLog(CommitLogKeyStore keyStore) : super(keyStore); + + /// Returns the commit entry for a given commit sequence number + /// throws [DataStoreException] if there is an exception getting the commit entry + @override + Future getEntry(int? sequenceNumber) async { + try { + var commitEntry = await _commitLogKeyStore.get(sequenceNumber!); + return commitEntry; + } on Exception catch (e) { + throw DataStoreException('Exception getting entry:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error adding to commit log:${e.toString()}'); + } + } + + @override + Future update(CommitEntry commitEntry, int commitId) async { + try { + await _commitLogKeyStore.update(commitId, commitEntry); + } on Exception catch (e) { + throw DataStoreException('Exception updating entry:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error updating entry to commit log:${e.toString()}'); + } + } + + @override + Future lastSyncedEntry() async { + return await (_commitLogKeyStore as ClientCommitLogKeyStore) + .lastSyncedEntry(); + } + + @override + Future lastSyncedEntryWithRegex(String regex) async { + return await (_commitLogKeyStore as ClientCommitLogKeyStore) + .lastSyncedEntry(regex: regex); + } + + /// Returns the list of commit entries greater than [sequenceNumber] + /// throws [DataStoreException] if there is an exception getting the commit entries + @override + Future> getChanges(int? sequenceNumber, String? regex, + {int? limit}) async { + List changes; + try { + changes = await _commitLogKeyStore.getChanges(sequenceNumber!, + regex: regex, limit: limit); + } on Exception catch (e) { + throw DataStoreException('Exception getting changes:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error adding to commit log:${e.toString()}'); + } + // ignore: unnecessary_null_comparison + if (changes == null) { + return []; + } + return changes; + } } diff --git a/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log_manager_impl.dart b/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log_manager_impl.dart index b4581f537..e1b8699c7 100644 --- a/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log_manager_impl.dart +++ b/packages/at_persistence_secondary_server/lib/src/log/commitlog/at_commit_log_manager_impl.dart @@ -17,12 +17,18 @@ class AtCommitLogManagerImpl implements AtCommitLogManager { {String? commitLogPath, bool enableCommitId = true}) async { //verify if an instance has been already created for the given instance. if (!_commitLogMap.containsKey(atSign)) { - var commitLogKeyStore = CommitLogKeyStore(atSign); - commitLogKeyStore.enableCommitId = enableCommitId; + CommitLogKeyStore? commitLogKeyStore; + // Creating commit-log for client + if (enableCommitId) { + commitLogKeyStore = CommitLogKeyStore(atSign); + _commitLogMap[atSign] = AtCommitLog(commitLogKeyStore); + } else { + commitLogKeyStore = ClientCommitLogKeyStore(atSign); + _commitLogMap[atSign] = ClientAtCommitLog(commitLogKeyStore); + } if (commitLogPath != null) { await commitLogKeyStore.init(commitLogPath, isLazy: false); } - _commitLogMap[atSign] = AtCommitLog(commitLogKeyStore); } return _commitLogMap[atSign]; } diff --git a/packages/at_persistence_secondary_server/lib/src/log/commitlog/commit_log_keystore.dart b/packages/at_persistence_secondary_server/lib/src/log/commitlog/commit_log_keystore.dart index c709a7008..477dbb888 100644 --- a/packages/at_persistence_secondary_server/lib/src/log/commitlog/commit_log_keystore.dart +++ b/packages/at_persistence_secondary_server/lib/src/log/commitlog/commit_log_keystore.dart @@ -5,33 +5,20 @@ import 'package:at_utils/at_utils.dart'; import 'package:hive/hive.dart'; import 'package:meta/meta.dart'; -class CommitLogKeyStore - with HiveBase - implements LogKeyStore { +@server +class CommitLogKeyStore extends BaseCommitLogKeyStore { final _logger = AtSignLogger('CommitLogKeyStore'); - bool enableCommitId = true; - final String _currentAtSign; - late String _boxName; - - // A Map implementing a LinkedHashMap to preserve the insertion order. - // "{}" is collection literal to represent a LinkedHashMap. - // Stores AtKey and its corresponding commitEntry sorted by their commit-id's - final _commitLogCacheMap = {}; - - /// Contains the entries that are last synced by the client SDK. - - /// The key represents the regex and value represents the [CommitEntry] - final _lastSyncedEntryCacheMap = {}; + late CommitLogCache commitLogCache; - int _latestCommitId = -1; - - int get latestCommitId => _latestCommitId; + int get latestCommitId => commitLogCache.latestCommitId; - CommitLogKeyStore(this._currentAtSign); + CommitLogKeyStore(String currentAtSign) : super(currentAtSign) { + commitLogCache = CommitLogCache(this); + } @override Future initialize() async { - _boxName = 'commit_log_${AtUtils.getShaForAtSign(_currentAtSign)}'; + _boxName = 'commit_log_${AtUtils.getShaForAtSign(currentAtSign)}'; if (!Hive.isAdapterRegistered(CommitEntryAdapter().typeId)) { Hive.registerAdapter(CommitEntryAdapter()); } @@ -39,52 +26,28 @@ class CommitLogKeyStore Hive.registerAdapter(CommitOpAdapter()); } await super.openBox(_boxName); - var lastCommittedSequenceNum = lastCommittedSequenceNumber(); - _logger.finer('last committed sequence: $lastCommittedSequenceNum'); + _logger.finer('Commit log key store is initialized'); await repairCommitLogAndCreateCachedMap(); } - @override - Future get(int commitId) async { - try { - var commitEntry = await getValue(commitId); - return commitEntry; - } on Exception catch (e) { - throw DataStoreException('Exception get entry:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error getting entry from commit log:${e.toString()}'); - } - } - - @override Future add(CommitEntry? commitEntry) async { int internalKey; try { - internalKey = await _getBox().add(commitEntry); + internalKey = await getBox().add(commitEntry!); //set the hive generated key as commit id - if (enableCommitId) { - commitEntry!.commitId = internalKey; - // update entry with commitId - await _getBox().put(internalKey, commitEntry); - // Delete old commit entry for the same key from the commit log - // For now, just delete the previous entry from _commitLogCacheMap - // Eventually, after the compaction kicks in all of the old entries would be deleted. - // Subsequently, we can then disable the compaction job. - // When commitLogCacheMap.keys.length == getBox().length then we have fully compacted it - if (_commitLogCacheMap[commitEntry.atKey!] != null && - _commitLogCacheMap[commitEntry.atKey!]?.commitId != null) { - await _getBox() - .delete(_commitLogCacheMap[commitEntry.atKey!]?.commitId); - } - // update the commitId in cache commitMap. - _updateCacheLog(commitEntry.atKey!, commitEntry); - if (commitEntry.commitId != null && - commitEntry.commitId! > _latestCommitId) { - _latestCommitId = commitEntry.commitId!; - } + commitEntry.commitId = internalKey; + // update entry with commitId + await getBox().put(internalKey, commitEntry); + CommitEntry? cachedCommitEntry = + commitLogCache.getEntry(commitEntry.atKey!); + + // Delete old commit entry for the same key from the commit log + if (cachedCommitEntry?.commitId != null) { + await getBox().delete(cachedCommitEntry?.commitId); } + // update the commitId in cache commitMap. + commitLogCache.update(commitEntry.atKey!, commitEntry); } on Exception catch (e) { throw DataStoreException('Exception updating entry:${e.toString()}'); } on HiveError catch (e) { @@ -94,67 +57,9 @@ class CommitLogKeyStore return internalKey; } - /// Updates the [commitEntry.commitId] with the given [commitId]. - /// - /// This method is only called by the client(s) because when a key is created on the - /// client side, a record is created in the [CommitLogKeyStore] with a null commitId. - /// At the time sync, a key is created/updated in cloud secondary server and generates - /// the commitId sends it back to client which the gets updated against the commitEntry - /// of the key synced. - /// - @override - Future update(int commitId, CommitEntry? commitEntry) async { - try { - commitEntry!.commitId = commitId; - await _getBox().put(commitEntry.key, commitEntry); - - if (_lastSyncedEntryCacheMap.isEmpty) { - return; - } - // Iterate through the regex's in the _lastSyncedEntryCacheMap. - // Updates the commitEntry against the matching regexes. - for (var regex in _lastSyncedEntryCacheMap.keys) { - if (_acceptKey(commitEntry.atKey!, regex)) { - _lastSyncedEntryCacheMap[regex] = commitEntry; - } - } - } on Exception catch (e) { - throw DataStoreException('Exception updating entry:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error updating entry to commit log:${e.toString()}'); - } - } - - /// Remove - @override - Future remove(int commitId) async { - try { - final commitEntry = (_getBox() as Box).get(commitId); - await _getBox().delete(commitId); - // invalidate cache for the removed entry - if (commitEntry != null) { - _commitLogCacheMap.remove(commitEntry.atKey); - _logger.finest('removed key : ${commitEntry.atKey} from commit log.'); - } - } on Exception catch (e) { - throw DataStoreException('Exception deleting entry:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error deleting entry from commit log:${e.toString()}'); - } - } - - /// Returns the latest committed sequence number - int? lastCommittedSequenceNumber() { - var lastCommittedSequenceNum = - _getBox().keys.isNotEmpty ? _getBox().keys.last : null; - return lastCommittedSequenceNum; - } - /// Returns the latest committed sequence number with regex Future lastCommittedSequenceNumberWithRegex(String regex) async { - var lastCommittedEntry = (_getBox() as Box).values.lastWhere( + var lastCommittedEntry = (getBox() as Box).values.lastWhere( (entry) => (_acceptKey(entry.atKey, regex)), orElse: () => NullCommitEntry()); var lastCommittedSequenceNum = @@ -162,42 +67,6 @@ class CommitLogKeyStore return lastCommittedSequenceNum; } - /// Returns the lastSyncedEntry to the local secondary commitLog keystore by the clients. - /// - /// Optionally accepts the regex. Matches the regex against the [CommitEntry.AtKey] and returns the - /// matching [CommitEntry]. Defaulted to accept all patterns. - /// - /// This is used by the clients which have local secondary keystore. Not used by the secondary server. - Future lastSyncedEntry({String regex = '.*'}) async { - CommitEntry? lastSyncedEntry; - if (_lastSyncedEntryCacheMap.containsKey(regex)) { - lastSyncedEntry = _lastSyncedEntryCacheMap[regex]; - _logger.finer( - 'Returning the lastSyncedEntry matching regex $regex from cache. lastSyncedKey : ${lastSyncedEntry!.atKey} with commitId ${lastSyncedEntry.commitId}'); - return lastSyncedEntry; - } - var values = (_getBox() as Box).values.toList()..sort(_sortByCommitId); - if (values.isEmpty) { - return null; - } - // Returns the commitEntry with maximum commitId matching the given regex. - // otherwise returns NullCommitEntry - lastSyncedEntry = values.lastWhere( - (entry) => - (_acceptKey(entry!.atKey!, regex) && (entry.commitId != null)), - orElse: () => NullCommitEntry()); - - if (lastSyncedEntry == null || lastSyncedEntry is NullCommitEntry) { - _logger.finer('Unable to fetch lastSyncedEntry. Returning null'); - return null; - } - - _logger.finer( - 'Updating the lastSyncedEntry matching regex $regex to the cache. Returning lastSyncedEntry with key : ${lastSyncedEntry.atKey} and commitId ${lastSyncedEntry.commitId}'); - _lastSyncedEntryCacheMap.putIfAbsent(regex, () => lastSyncedEntry!); - return lastSyncedEntry; - } - /// Sorts the [CommitEntry]'s order by commit in descending order int _sortByCommitId(dynamic c1, dynamic c2) { if (c1.commitId == null && c2.commitId == null) { @@ -215,27 +84,25 @@ class CommitLogKeyStore /// Returns the first committed sequence number int? firstCommittedSequenceNumber() { var firstCommittedSequenceNum = - _getBox().keys.isNotEmpty ? _getBox().keys.first : null; + getBox().keys.isNotEmpty ? getBox().keys.first : null; return firstCommittedSequenceNum; } /// Returns the total number of keys /// @return - int : Returns number of keys in access log - @override int entriesCount() { int? totalKeys = 0; - totalKeys = _getBox().keys.length; + totalKeys = getBox().keys.length; return totalKeys; } /// Gets the first 'N' keys from the logs /// @param - N : The integer to get the first 'N' /// @return List of first 'N' keys from the log - @override List getFirstNEntries(int N) { var entries = []; try { - entries = _getBox().keys.toList().take(N).toList(); + entries = getBox().keys.toList().take(N).toList(); } on Exception catch (e) { throw DataStoreException( 'Exception getting first N entries:${e.toString()}'); @@ -247,14 +114,30 @@ class CommitLogKeyStore } @override + Future remove(int commitEntryIndex) async { + CommitEntry? commitEntry = (getBox() as Box).get(commitEntryIndex); + await super.remove(commitEntryIndex); + // On removing the entry from commit log keystore, remove the stale entries from + // commit log cache map + if (commitEntry != null) { + commitLogCache.remove(commitEntry.atKey!); + } + } + Future removeAll(List deleteKeysList) async { if (deleteKeysList.isEmpty) { return; } - await _getBox().deleteAll(deleteKeysList); + await getBox().deleteAll(deleteKeysList); + // Removes stale entries from the commit log cache map + for (int key in deleteKeysList) { + CommitEntry? commitEntry = (getBox() as Box).get(key); + if (commitEntry != null) { + commitLogCache.remove(commitEntry.atKey!); + } + } } - @override Future> getExpired(int expiryInDays) async { var expiredKeys = []; var now = DateTime.now().toUtc(); @@ -294,41 +177,6 @@ class CommitLogKeyStore } } - /// Returns the list of commit entries greater than [sequenceNumber] - /// throws [DataStoreException] if there is an exception getting the commit entries - Future> getChanges(int sequenceNumber, - {String? regex, int? limit}) async { - try { - if (_getBox().isEmpty) { - return []; - } - var changes = []; - var regexString = (regex != null) ? regex : ''; - var values = (_getBox() as Box).values; - var startKey = sequenceNumber + 1; - limit ??= values.length + 1; - for (CommitEntry element in values) { - if (element.key >= startKey && - _acceptKey(element.atKey!, regexString) && - changes.length <= limit) { - if (enableCommitId == false) { - if (element.commitId == null) { - changes.add(element); - } - } else { - changes.add(element); - } - } - } - return changes; - } on Exception catch (e) { - throw DataStoreException('Exception getting changes:${e.toString()}'); - } on HiveError catch (e) { - throw DataStoreException( - 'Hive error adding to commit log:${e.toString()}'); - } - } - bool _acceptKey(String atKey, String regex) { return _isRegexMatches(atKey, regex) || _isSpecialKey(atKey); } @@ -344,76 +192,17 @@ class CommitLogKeyStore atKey.contains(AT_SIGNING_PRIVATE_KEY); } - /// Returns a map of all the keys in the commitLog and latest [CommitEntry] of the key. - /// Called in init method of commitLog to initialize on server start-up. - Future> _getCommitIdMap() async { - var keyMap = {}; - Iterable iterable = (_getBox() as Box).values; - for (var value in iterable) { - if (value.commitId == null) { - _logger.finest( - 'CommitID is null for ${value.atKey}. Skipping to update entry into commitLogCacheMap'); - continue; - } - // The reason we remove and add is that, the map which is a LinkedHashMap - // should have data in the following format: - // { - // {k1, v1}, - // {k2, v2}, - // {k3, v3} - // } - // such that v1 < v2 < v3 - // - // If a key exist in the _commitLogCacheMap, updating the commit entry will - // overwrite the existing key resulting into an unsorted map. - // Hence remove the key and insert at the last ensure the entry with highest commitEntry - // is always at the end of the map. - if (keyMap.containsKey(value.atKey)) { - keyMap.remove(value.atKey); - keyMap[value.atKey] = value; - } else { - keyMap[value.atKey] = value; - } - // update the latest commit id - if (value.commitId > _latestCommitId) { - _latestCommitId = value.commitId; - } - } - return keyMap; - } - - /// Updates the commitId of the key. - void _updateCacheLog(String key, CommitEntry commitEntry) { - // The reason we remove and add is that, the map which is a LinkedHashMap - // should have data in the following format: - // { - // {k1, v1}, - // {k2, v2}, - // {k3, v3} - // } - // such that v1 < v2 < v3 - // - // If a key exist in the _commitLogCacheMap, updating the commit entry will - // overwrite the existing key resulting into an unsorted map. - // Hence remove the key and insert at the last ensure the entry with highest commitEntry - // is always at the end of the map. - _commitLogCacheMap.remove(key); - _commitLogCacheMap[key] = commitEntry; - } - /// Returns the latest commitEntry of the key. CommitEntry? getLatestCommitEntry(String key) { - if (_commitLogCacheMap.containsKey(key)) { - return _commitLogCacheMap[key]!; - } - return null; + return commitLogCache.getEntry(key); } - /// Returns the Iterator of [_commitLogCacheMap] from the commitId specified. + /// Returns the Iterator of entries as Key value pairs after the given the [commitId] for the keys that matches the [regex] Iterator> getEntries(int commitId, {String regex = '.*', int limit = 25}) { Iterable> commitEntriesIterable = - _commitLogCacheMap.entries + commitLogCache + .entriesList() .where((element) => element.value.commitId! >= commitId && _acceptKey(element.value.atKey!, regex)) @@ -421,15 +210,11 @@ class CommitLogKeyStore return commitEntriesIterable.iterator; } - BoxBase _getBox() { - return super.getBox(); - } - ///Returns the key-value pair of commit-log where key is hive internal key and ///value is [CommitEntry] Future> toMap() async { var commitLogMap = {}; - var keys = _getBox().keys; + var keys = getBox().keys; await Future.forEach(keys, (key) async { var value = await getValue(key) as CommitEntry; @@ -440,37 +225,21 @@ class CommitLogKeyStore ///Returns the total number of keys in commit log keystore. int getEntriesCount() { - return _getBox().length; - } - - ///Not a part of API. Exposed for Unit test - List getLastSyncedEntryCacheMapValues() { - return _lastSyncedEntryCacheMap.values.toList(); + return getBox().length; } /// Removes entries with malformed keys /// Repairs entries with null commit IDs - /// Clears and repopulates the [_commitLogCacheMap] + /// Clears and repopulates the [commitLogCache] @visibleForTesting Future repairCommitLogAndCreateCachedMap() async { // Ensures the below code runs only when initialized from secondary server. // enableCommitId is set to true in secondary server and to false in client SDK. - if (!enableCommitId) { - return false; - } - Map allEntries = await toMap(); - await removeEntriesWithMalformedAtKeys(allEntries); - await repairNullCommitIDs(allEntries); - - // Cache the latest commitId of each key. - // Add entries to commitLogCacheMap when initialized from at_secondary_server - // and refrain for at_client_sdk. - _commitLogCacheMap.clear(); - _commitLogCacheMap.addAll(await _getCommitIdMap()); - + commitLogCache.clear(); + commitLogCache.initialize(); return true; } @@ -516,7 +285,8 @@ class CommitLogKeyStore await Future.forEach(commitLogMap.keys, (key) async { CommitEntry? commitEntry = commitLogMap[key]; if (commitEntry?.commitId == null) { - await update(key as int, commitEntry); + commitEntry!.commitId = key as int; + await getBox().put(commitEntry.commitId, commitEntry); } }); } @@ -524,6 +294,286 @@ class CommitLogKeyStore /// Not a part of API. Added for unit test @visibleForTesting List> commitEntriesList() { + return commitLogCache.entriesList(); + } +} + +abstract class BaseCommitLogKeyStore with HiveBase { + late String _boxName; + String currentAtSign; + + BaseCommitLogKeyStore(this.currentAtSign); + + Future get(int commitId) async { + try { + return await getValue(commitId); + } on Exception catch (e) { + throw DataStoreException('Exception get entry:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error getting entry from commit log:${e.toString()}'); + } + } + + Future remove(int commitEntryIndex) async { + try { + await getBox().delete(commitEntryIndex); + } on Exception catch (e) { + throw DataStoreException('Exception deleting entry:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error deleting entry from commit log:${e.toString()}'); + } + } + + Future update(int commitId, CommitEntry? commitEntry) { + throw UnimplementedError(); + } + + /// Returns the list of commit entries greater than [sequenceNumber] + /// throws [DataStoreException] if there is an exception getting the commit entries + Future> getChanges(int sequenceNumber, + {String? regex, int? limit}) async { + throw UnimplementedError(); + } +} + +@client +class ClientCommitLogKeyStore extends CommitLogKeyStore { + /// Contains the entries that are last synced by the client SDK. + + /// The key represents the regex and value represents the [CommitEntry] + final _lastSyncedEntryCacheMap = {}; + + ClientCommitLogKeyStore(String currentAtSign) : super(currentAtSign); + + /// Initializes the key store and makes it ready for the persistance + @override + Future initialize() async { + _boxName = 'commit_log_${AtUtils.getShaForAtSign(currentAtSign)}'; + if (!Hive.isAdapterRegistered(CommitEntryAdapter().typeId)) { + Hive.registerAdapter(CommitEntryAdapter()); + } + if (!Hive.isAdapterRegistered(CommitOpAdapter().typeId)) { + Hive.registerAdapter(CommitOpAdapter()); + } + await super.openBox(_boxName); + _logger.finer('Commit log key store is initialized'); + } + + /// Adds a [CommitEntry] to the commitlog + /// Returns numeric value generated as the key to persist the data + @override + Future add(CommitEntry? commitEntry) async { + return await getBox().add(commitEntry); + } + + /// Updates the [commitEntry.commitId] with the given [commitId]. + /// + /// This method is only called by the client(s) because when a key is created on the + /// client side, a record is created in the [CommitLogKeyStore] with a null commitId. + /// At the time sync, a key is created/updated in cloud secondary server and generates + /// the commitId sends it back to client which the gets updated against the commitEntry + /// of the key synced. + /// + @override + Future update(int commitId, CommitEntry? commitEntry) async { + try { + commitEntry!.commitId = commitId; + await getBox().put(commitEntry.key, commitEntry); + if (_lastSyncedEntryCacheMap.isEmpty) { + return; + } + // Iterate through the regex's in the _lastSyncedEntryCacheMap. + // Updates the commitEntry against the matching regexes. + for (var regex in _lastSyncedEntryCacheMap.keys) { + if (_acceptKey(commitEntry.atKey!, regex)) { + _lastSyncedEntryCacheMap[regex] = commitEntry; + } + } + } on Exception catch (e) { + throw DataStoreException('Exception updating entry:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error updating entry to commit log:${e.toString()}'); + } + } + + @override + Future> getChanges(int sequenceNumber, + {String? regex, int? limit}) async { + try { + if (getBox().isEmpty) { + return []; + } + var changes = []; + var regexString = (regex != null) ? regex : ''; + var values = (getBox() as Box).values; + var startKey = sequenceNumber + 1; + limit ??= values.length + 1; + for (CommitEntry element in values) { + if (element.key >= startKey && + _acceptKey(element.atKey!, regexString) && + changes.length <= limit) { + if (element.commitId == null) { + changes.add(element); + } + } + } + return changes; + } on Exception catch (e) { + throw DataStoreException('Exception getting changes:${e.toString()}'); + } on HiveError catch (e) { + throw DataStoreException( + 'Hive error adding to commit log:${e.toString()}'); + } + } + + /// Returns the lastSyncedEntry to the local secondary commitLog keystore by the clients. + /// + /// Optionally accepts the regex. Matches the regex against the [CommitEntry.AtKey] and returns the + /// matching [CommitEntry]. Defaulted to accept all patterns. + /// + /// This is used by the clients which have local secondary keystore. Not used by the secondary server. + Future lastSyncedEntry({String regex = '.*'}) async { + CommitEntry? lastSyncedEntry; + if (_lastSyncedEntryCacheMap.containsKey(regex)) { + lastSyncedEntry = _lastSyncedEntryCacheMap[regex]; + _logger.finer( + 'Returning the lastSyncedEntry matching regex $regex from cache. lastSyncedKey : ${lastSyncedEntry!.atKey} with commitId ${lastSyncedEntry.commitId}'); + return lastSyncedEntry; + } + var values = (getBox() as Box).values.toList()..sort(_sortByCommitId); + if (values.isEmpty) { + return null; + } + // Returns the commitEntry with maximum commitId matching the given regex. + // otherwise returns NullCommitEntry + lastSyncedEntry = values.lastWhere( + (entry) => + (_acceptKey(entry!.atKey!, regex) && (entry.commitId != null)), + orElse: () => NullCommitEntry()); + + if (lastSyncedEntry == null || lastSyncedEntry is NullCommitEntry) { + _logger.finer('Unable to fetch lastSyncedEntry. Returning null'); + return null; + } + + _logger.finer( + 'Updating the lastSyncedEntry matching regex $regex to the cache. Returning lastSyncedEntry with key : ${lastSyncedEntry.atKey} and commitId ${lastSyncedEntry.commitId}'); + _lastSyncedEntryCacheMap.putIfAbsent(regex, () => lastSyncedEntry!); + return lastSyncedEntry; + } + + ///Not a part of API. Exposed for Unit test + List getLastSyncedEntryCacheMapValues() { + return _lastSyncedEntryCacheMap.values.toList(); + } +} + +class CommitLogCache { + final _logger = AtSignLogger('CommitLogCache'); + + // [CommitLogKeyStore] for which the cache is being maintained + CommitLogKeyStore commitLogKeyStore; + + // A Map implementing a LinkedHashMap to preserve the insertion order. + // "{}" is collection literal to represent a LinkedHashMap. + // Stores AtKey and its corresponding commitEntry sorted by their commit-id's + final _commitLogCacheMap = {}; + + // Keeps track of latest commit ID + int _latestCommitId = -1; + + int get latestCommitId => _latestCommitId; + + CommitLogCache(this.commitLogKeyStore); + + /// Initializes the CommitLogCache + void initialize() { + Iterable iterable = (commitLogKeyStore.getBox() as Box).values; + for (var value in iterable) { + if (value.commitId == null) { + _logger.finest( + 'CommitID is null for ${value.atKey}. Skipping to update entry into commitLogCacheMap'); + continue; + } + // The reason we remove and add is that, the map which is a LinkedHashMap + // should have data in the following format: + // { + // {k1, v1}, + // {k2, v2}, + // {k3, v3} + // } + // such that v1 < v2 < v3 + // + // If a key exist in the _commitLogCacheMap, updating the commit entry will + // overwrite the existing key resulting into an unsorted map. + // Hence remove the key and insert at the last ensure the entry with highest commitEntry + // is always at the end of the map. + if (_commitLogCacheMap.containsKey(value.atKey)) { + _commitLogCacheMap.remove(value.atKey); + _commitLogCacheMap[value.atKey] = value; + } else { + _commitLogCacheMap[value.atKey] = value; + } + // update the latest commit id + if (value.commitId > _latestCommitId) { + _latestCommitId = value.commitId; + } + } + } + + /// Updates cache when a new [CommitEntry] for the [key] is added + void update(String key, CommitEntry commitEntry) { + _updateCacheLog(key, commitEntry); + + if (commitEntry.commitId != null && + commitEntry.commitId! > _latestCommitId) { + _latestCommitId = commitEntry.commitId!; + } + } + + /// Updates the commitId of the key. + void _updateCacheLog(String key, CommitEntry commitEntry) { + // The reason we remove and add is that, the map which is a LinkedHashMap + // should have data in the following format: + // { + // {k1, v1}, + // {k2, v2}, + // {k3, v3} + // } + // such that v1 < v2 < v3 + // + // If a key exist in the _commitLogCacheMap, updating the commit entry will + // overwrite the existing key resulting into an unsorted map. + // Hence remove the key and insert at the last ensure the entry with highest commitEntry + // is always at the end of the map. + _commitLogCacheMap.remove(key); + _commitLogCacheMap[key] = commitEntry; + } + + CommitEntry? getEntry(String atKey) { + if (_commitLogCacheMap.containsKey(atKey)) { + return _commitLogCacheMap[atKey]; + } + return null; + } + + /// On commit log compaction, the entries are removed from the + /// Commit Log Keystore. Remove the stale entries from the commit log cache-map + void remove(String atKey) { + _commitLogCacheMap.remove(atKey); + } + + /// Not a part of API. Added for unit test + @visibleForTesting + List> entriesList() { return _commitLogCacheMap.entries.toList(); } + + // Clears all of the entries in cache + void clear() { + _commitLogCacheMap.clear(); + } } diff --git a/packages/at_persistence_secondary_server/pubspec.yaml b/packages/at_persistence_secondary_server/pubspec.yaml index 03ee6b30e..46bf959b4 100644 --- a/packages/at_persistence_secondary_server/pubspec.yaml +++ b/packages/at_persistence_secondary_server/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: crypto: ^3.0.2 uuid: ^3.0.6 at_utf7: ^1.0.0 - at_commons: ^3.0.53 + at_commons: ^3.0.56 at_utils: ^3.0.15 at_persistence_spec: ^2.0.14 meta: ^1.8.0 diff --git a/packages/at_persistence_secondary_server/test/commit_log_test.dart b/packages/at_persistence_secondary_server/test/commit_log_test.dart index c4ff6a9fc..9b8fb42fa 100644 --- a/packages/at_persistence_secondary_server/test/commit_log_test.dart +++ b/packages/at_persistence_secondary_server/test/commit_log_test.dart @@ -9,626 +9,666 @@ import 'package:hive/hive.dart'; void main() async { var storageDir = '${Directory.current.path}/test/hive'; - group('A group of commit log test', () { - setUp(() async => await setUpFunc(storageDir)); - test('test single insert', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var hiveKey = - await commitLogInstance!.commit('location@alice', CommitOp.UPDATE); - var committedEntry = await (commitLogInstance.getEntry(hiveKey)); - expect(committedEntry?.key, hiveKey); - expect(committedEntry?.atKey, 'location@alice'); - expect(committedEntry?.operation, CommitOp.UPDATE); - commitLogInstance = null; - }); - test('test multiple insert', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('location@alice', CommitOp.DELETE); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 2); - }); - - test('test get entry ', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var key_1 = - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - var committedEntry = await (commitLogInstance?.getEntry(key_1)); - expect(committedEntry?.atKey, 'location@alice'); - expect(committedEntry?.operation, CommitOp.UPDATE); - expect(committedEntry?.opTime, isNotNull); - expect(committedEntry?.commitId, isNotNull); - }); + group('A group of tests on client commit log', () { + setUp(() async => await setUpFunc(storageDir, enableCommitId: false)); + group( + 'A group of tests to verify correct commit entries are returned for a given sequence number', + () { + test( + 'A test to verify getEntry returns CommitEntry for a given sequence number', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var hiveKey = + await commitLogInstance!.commit('location@alice', CommitOp.UPDATE); + var committedEntry = await (commitLogInstance.getEntry(hiveKey)); + expect(committedEntry?.key, hiveKey); + expect(committedEntry?.atKey, 'location@alice'); + expect(committedEntry?.operation, CommitOp.UPDATE); + expect(committedEntry?.commitId, isNull); + commitLogInstance = null; + }); - test('test entries since commit Id', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + test('A test to verify getChanges the entries from a given sequence', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var key_1 = - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + var key_1 = + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance!.commit('phone@alice', CommitOp.UPDATE); - await commitLogInstance!.commit('phone@alice', CommitOp.UPDATE); - var changes = await commitLogInstance.getChanges(key_1, ''); - expect(changes.length, 1); - expect(changes[0].atKey, 'phone@alice'); + var changes = await commitLogInstance.getChanges(key_1, ''); + expect(changes.length, 1); + expect(changes[0].atKey, 'phone@alice'); + }); }); - test('test last sequence number called once', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); - }); + group('A group of tests to verify lastSynced commit entry', () { + setUp(() async => await setUpFunc(storageDir, enableCommitId: false)); + test( + 'test to verify the last synced entry returns entry with highest commit id', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + + CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); + await commitLogInstance?.update(commitEntry0!, 1); + CommitEntry? commitEntry1 = await commitLogInstance?.getEntry(1); + await commitLogInstance?.update(commitEntry1!, 0); + var lastSyncedEntry = await commitLogInstance?.lastSyncedEntry(); + expect(lastSyncedEntry!.commitId, 1); + ClientCommitLogKeyStore keyStore = + commitLogInstance?.commitLogKeyStore as ClientCommitLogKeyStore; + var lastSyncedCacheSize = + keyStore.getLastSyncedEntryCacheMapValues().length; + expect(lastSyncedCacheSize, 1); + }); - test('test last sequence number called multiple times', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + test('test to verify the last synced entry with regex', () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location.buzz@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('mobile.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('phone.buzz@alice', CommitOp.UPDATE); + + CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); + await commitLogInstance?.update(commitEntry0!, 2); + CommitEntry? commitEntry1 = await commitLogInstance?.getEntry(1); + await commitLogInstance?.update(commitEntry1!, 1); + CommitEntry? commitEntry2 = await commitLogInstance?.getEntry(2); + await commitLogInstance?.update(commitEntry2!, 0); + var lastSyncedEntry = + await commitLogInstance?.lastSyncedEntryWithRegex('buzz'); + expect(lastSyncedEntry!.atKey!, 'location.buzz@alice'); + expect(lastSyncedEntry.commitId!, 2); + lastSyncedEntry = + await commitLogInstance?.lastSyncedEntryWithRegex('wavi'); + expect(lastSyncedEntry!.atKey!, 'mobile.wavi@alice'); + expect(lastSyncedEntry.commitId!, 1); + ClientCommitLogKeyStore keyStore = + commitLogInstance?.commitLogKeyStore as ClientCommitLogKeyStore; + var lastSyncedEntriesList = keyStore.getLastSyncedEntryCacheMapValues(); + expect(lastSyncedEntriesList.length, 2); + }); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + test( + 'Test to verify that null is returned when no values are present in local keystore', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var lastSyncedEntry = await commitLogInstance?.lastSyncedEntry(); + expect(lastSyncedEntry, null); + }); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); + test( + 'Test to verify that null is returned when matches entry for regex is not found', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location.buzz@alice', CommitOp.UPDATE); + CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); + await commitLogInstance?.update(commitEntry0!, 2); + var lastSyncedEntry = + await commitLogInstance?.lastSyncedEntryWithRegex('wavi'); + expect(lastSyncedEntry, null); + }); }); - - test( - 'test to verify commitId does not increment for public hidden keys with single _', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - 'public:_location@alice', CommitOp.UPDATE); - expect(commitId, -1); - expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); + group('A group of tests related to fetching uncommitted entries', () { + test( + 'A test to verify only commit entries with null commitId are returned when enableCommitId is false', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitLogKeystore = commitLogInstance!.commitLogKeyStore; + //setting enable commitId to false - to test client side functionality + //commitLogKeystore.enableCommitId = false; + //loop to create 10 keys - even keys have commitId null - odd keys have commitId + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + await commitLogKeystore.getBox().add(CommitEntry( + 'test_key_false_$i', CommitOp.UPDATE, DateTime.now())); + } else { + await commitLogKeystore.getBox().add(CommitEntry( + 'test_key_false_$i', CommitOp.UPDATE, DateTime.now()) + ..commitId = i); + } + } + List changes = + await commitLogInstance.commitLogKeyStore.getChanges(-1); + //run loop and test all commit entries returned have commitId == null + for (var element in changes) { + expect(element.commitId, null); + } + }); }); + tearDown(() async => await tearDownFunc()); + }); - test('test to verify commitId does not increment for privatekey', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - 'privatekey:testkey@alice', CommitOp.UPDATE); - expect(commitId, -1); - expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); - }); + group('A group of tests on server commit log', () { + group('A group of commit log test', () { + setUp(() async => await setUpFunc(storageDir, enableCommitId: true)); + test('test multiple insert', () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('location@alice', CommitOp.DELETE); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 2); + }); - test('test to verify commitId increments for signing public key', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - 'public:signing_publickey@alice', CommitOp.UPDATE); - expect(commitId, 0); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); - }); + test('test last sequence number called once', () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - test('test to verify commitId increments for signing private key', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - '@alice:signing_privatekey@alice', CommitOp.UPDATE); - expect(commitId, 0); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); - }); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - test( - 'test to verify commitId does not increment for key starting with private:', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - 'private:testkey@alice', CommitOp.UPDATE); - expect(commitId, -1); - expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); - }); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); + }); - test( - 'test to verify commitId does increment for public hidden keys with multiple __', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitId = await commitLogInstance?.commit( - 'public:__location@alice', CommitOp.UPDATE); - expect(commitId, 0); - expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); - }); - tearDown(() async => await tearDownFunc()); - }); + test('test last sequence number called multiple times', () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - group('A group of tests to verify lastSynced commit entry', () { - setUp(() async => await setUpFunc(storageDir, enableCommitId: false)); - test( - 'test to verify the last synced entry returns entry with highest commit id', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); - - CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); - await commitLogInstance?.update(commitEntry0!, 1); - CommitEntry? commitEntry1 = await commitLogInstance?.getEntry(1); - await commitLogInstance?.update(commitEntry1!, 0); - var lastSyncedEntry = await commitLogInstance?.lastSyncedEntry(); - expect(lastSyncedEntry!.commitId, 1); - var lastSyncedCacheSize = commitLogInstance!.commitLogKeyStore - .getLastSyncedEntryCacheMapValues() - .length; - expect(lastSyncedCacheSize, 1); - }); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 1); + }); - test('test to verify the last synced entry with regex', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + test( + 'test to verify commitId does not increment for public hidden keys with single _', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + 'public:_location@alice', CommitOp.UPDATE); + expect(commitId, -1); + expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); + }); - await commitLogInstance?.commit('location.buzz@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('mobile.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('phone.buzz@alice', CommitOp.UPDATE); - - CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); - await commitLogInstance?.update(commitEntry0!, 2); - CommitEntry? commitEntry1 = await commitLogInstance?.getEntry(1); - await commitLogInstance?.update(commitEntry1!, 1); - CommitEntry? commitEntry2 = await commitLogInstance?.getEntry(2); - await commitLogInstance?.update(commitEntry2!, 0); - var lastSyncedEntry = - await commitLogInstance?.lastSyncedEntryWithRegex('buzz'); - expect(lastSyncedEntry!.atKey!, 'location.buzz@alice'); - expect(lastSyncedEntry.commitId!, 2); - lastSyncedEntry = - await commitLogInstance?.lastSyncedEntryWithRegex('wavi'); - expect(lastSyncedEntry!.atKey!, 'mobile.wavi@alice'); - expect(lastSyncedEntry.commitId!, 1); - var lastSyncedEntriesList = commitLogInstance!.commitLogKeyStore - .getLastSyncedEntryCacheMapValues(); - expect(lastSyncedEntriesList.length, 2); - }); + test('test to verify commitId does not increment for privatekey', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + 'privatekey:testkey@alice', CommitOp.UPDATE); + expect(commitId, -1); + expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); + }); - test( - 'Test to verify that null is returned when no values are present in local keystore', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var lastSyncedEntry = await commitLogInstance?.lastSyncedEntry(); - expect(lastSyncedEntry, null); - }); + test('test to verify commitId increments for signing public key', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + 'public:signing_publickey@alice', CommitOp.UPDATE); + expect(commitId, 0); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); + }); - test( - 'Test to verify that null is returned when matches entry for regex is not found', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + test('test to verify commitId increments for signing private key', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + '@alice:signing_privatekey@alice', CommitOp.UPDATE); + expect(commitId, 0); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); + }); - await commitLogInstance?.commit('location.buzz@alice', CommitOp.UPDATE); - CommitEntry? commitEntry0 = await commitLogInstance?.getEntry(0); - await commitLogInstance?.update(commitEntry0!, 2); - var lastSyncedEntry = - await commitLogInstance?.lastSyncedEntryWithRegex('wavi'); - expect(lastSyncedEntry, null); - }); - tearDown(() async => await tearDownFunc()); - }); + test( + 'test to verify commitId does not increment for key starting with private:', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + 'private:testkey@alice', CommitOp.UPDATE); + expect(commitId, -1); + expect(commitLogInstance?.lastCommittedSequenceNumber(), -1); + }); - group('A group of commit log compaction tests', () { - setUp(() async => await setUpFunc(storageDir)); - test('Test to verify compaction when single is modified ten times', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var compactionService = - CommitLogCompactionService(commitLogInstance!.commitLogKeyStore); - commitLogInstance.addEventListener(compactionService); - for (int i = 0; i <= 50; i++) { - await commitLogInstance.commit('location@alice', CommitOp.UPDATE); - } - - var list = compactionService.getEntries('location@alice'); - expect(list?.getSize(), 1); + test( + 'test to verify commitId does increment for public hidden keys with multiple __', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitId = await commitLogInstance?.commit( + 'public:__location@alice', CommitOp.UPDATE); + expect(commitId, 0); + expect(commitLogInstance?.lastCommittedSequenceNumber(), 0); + }); }); + group('A group of commit log compaction tests', () { + setUp(() async => await setUpFunc(storageDir)); + test('Test to verify compaction when single is modified ten times', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var compactionService = + CommitLogCompactionService(commitLogInstance!.commitLogKeyStore); + commitLogInstance.addEventListener(compactionService); + for (int i = 0; i <= 50; i++) { + await commitLogInstance.commit('location@alice', CommitOp.UPDATE); + } - test('Test to verify compaction when two are modified ten times', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var compactionService = - CommitLogCompactionService(commitLogInstance!.commitLogKeyStore); - commitLogInstance.addEventListener(compactionService); - for (int i = 0; i <= 50; i++) { - await commitLogInstance.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance.commit('country@alice', CommitOp.UPDATE); - } - var locationList = compactionService.getEntries('location@alice'); - var countryList = compactionService.getEntries('country@alice'); - expect(locationList!.getSize(), 1); - expect(countryList!.getSize(), 1); - }); + var list = compactionService.getEntries('location@alice'); + expect(list?.getSize(), 1); + }); - test('A test to verify old commit entry is removed when a key is updated', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - for (int i = 0; i < 5; i++) { + test('Test to verify compaction when two are modified ten times', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var compactionService = + CommitLogCompactionService(commitLogInstance!.commitLogKeyStore); + commitLogInstance.addEventListener(compactionService); + for (int i = 0; i <= 50; i++) { + await commitLogInstance.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance.commit('country@alice', CommitOp.UPDATE); + } + var locationList = compactionService.getEntries('location@alice'); + var countryList = compactionService.getEntries('country@alice'); + expect(locationList!.getSize(), 1); + expect(countryList!.getSize(), 1); + }); + + test('A test to verify old commit entry is removed when a key is updated', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + for (int i = 0; i < 5; i++) { + await commitLogInstance! + .commit('location.wavi@alice', CommitOp.UPDATE); + } + Iterator iterator = + commitLogInstance!.getEntries(-1, regex: 'location.wavi'); + iterator.moveNext(); + expect(iterator.current.value.commitId, 4); + expect(iterator.current.value.atKey, 'location.wavi@alice'); + expect(iterator.current.value.operation, CommitOp.UPDATE); + }); + + test('A test to verify old commit entry is removed when a key is delete', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); await commitLogInstance!.commit('location.wavi@alice', CommitOp.UPDATE); - } - Iterator iterator = - commitLogInstance!.getEntries(-1, regex: 'location.wavi'); - iterator.moveNext(); - expect(iterator.current.value.commitId, 4); - expect(iterator.current.value.atKey, 'location.wavi@alice'); - expect(iterator.current.value.operation, CommitOp.UPDATE); - }); + await commitLogInstance.commit('location.wavi@alice', CommitOp.DELETE); + // Fetch the commit entry using the lastSyncedCommitEntry + Iterator iterator = + commitLogInstance.getEntries(-1, regex: 'location.wavi'); + iterator.moveNext(); + expect(iterator.current.value.commitId, 1); + expect(iterator.current.value.atKey, 'location.wavi@alice'); + expect(iterator.current.value.operation, CommitOp.DELETE); + }); - test('A test to verify old commit entry is removed when a key is delete', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - await commitLogInstance!.commit('location.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('location.wavi@alice', CommitOp.DELETE); - // Fetch the commit entry using the lastSyncedCommitEntry - Iterator iterator = - commitLogInstance.getEntries(-1, regex: 'location.wavi'); - iterator.moveNext(); - expect(iterator.current.value.commitId, 1); - expect(iterator.current.value.atKey, 'location.wavi@alice'); - expect(iterator.current.value.operation, CommitOp.DELETE); + test( + 'A test to verify if size of commit log matches length of commit log cache map then commit log keystore is compacted', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + // Add 5 distinct keys + await commitLogInstance! + .commit('firstname.wavi@alice', CommitOp.UPDATE); + await commitLogInstance.commit('lastName.wavi@alice', CommitOp.UPDATE); + await commitLogInstance.commit('country.wavi@alice', CommitOp.UPDATE); + await commitLogInstance.commit('phone.wavi@alice', CommitOp.UPDATE); + await commitLogInstance.commit('location.wavi@alice', CommitOp.UPDATE); + // Update the keys + await commitLogInstance.commit( + 'location.wavi@alice', CommitOp.UPDATE_ALL); + await commitLogInstance.commit( + 'lastName.wavi@alice', CommitOp.UPDATE_ALL); + await commitLogInstance.commit('firstname.wavi@alice', CommitOp.UPDATE); + await commitLogInstance.commit('country.wavi@alice', CommitOp.UPDATE); + // Add a new key which is NOT in commit log keystore + await commitLogInstance.commit('city.wavi@alice', CommitOp.UPDATE); + // Delete the existing key + await commitLogInstance.commit('location.wavi@alice', CommitOp.DELETE); + // Verify size of commit log keystore and commit log cache map are equal + expect(commitLogInstance.commitLogKeyStore.getBox().keys.length, + commitLogInstance.commitLogKeyStore.commitEntriesList().length); + // Get all entries from the commit log keystore. + Iterator itr = + commitLogInstance.commitLogKeyStore.getBox().keys.iterator; + itr.moveNext(); + CommitEntry commitEntry = + (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'phone.wavi@alice'); + expect(commitEntry.commitId, 3); + expect(commitEntry.operation, CommitOp.UPDATE); + itr.moveNext(); + commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'lastName.wavi@alice'); + expect(commitEntry.commitId, 6); + expect(commitEntry.operation, CommitOp.UPDATE_ALL); + itr.moveNext(); + commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'firstname.wavi@alice'); + expect(commitEntry.commitId, 7); + expect(commitEntry.operation, CommitOp.UPDATE); + itr.moveNext(); + commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'country.wavi@alice'); + expect(commitEntry.commitId, 8); + expect(commitEntry.operation, CommitOp.UPDATE); + itr.moveNext(); + commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'city.wavi@alice'); + expect(commitEntry.commitId, 9); + expect(commitEntry.operation, CommitOp.UPDATE); + itr.moveNext(); + commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) + .get(itr.current); + expect(commitEntry.atKey, 'location.wavi@alice'); + expect(commitEntry.commitId, 10); + expect(commitEntry.operation, CommitOp.DELETE); + // To ensure there are no more keys in iterator. + expect(itr.moveNext(), false); + }); }); - test( - 'A test to verify if size of commit log matches length of commit log cache map then commit log keystore is compacted', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - // Add 5 distinct keys - await commitLogInstance!.commit('firstname.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('lastName.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('country.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('phone.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('location.wavi@alice', CommitOp.UPDATE); - // Update the keys - await commitLogInstance.commit( - 'location.wavi@alice', CommitOp.UPDATE_ALL); - await commitLogInstance.commit( - 'lastName.wavi@alice', CommitOp.UPDATE_ALL); - await commitLogInstance.commit('firstname.wavi@alice', CommitOp.UPDATE); - await commitLogInstance.commit('country.wavi@alice', CommitOp.UPDATE); - // Add a new key which is NOT in commit log keystore - await commitLogInstance.commit('city.wavi@alice', CommitOp.UPDATE); - // Delete the existing key - await commitLogInstance.commit('location.wavi@alice', CommitOp.DELETE); - // Verify size of commit log keystore and commit log cache map are equal - expect(commitLogInstance.commitLogKeyStore.getBox().keys.length, - commitLogInstance.commitLogKeyStore.commitEntriesList().length); - // Get all entries from the commit log keystore. - Iterator itr = commitLogInstance.commitLogKeyStore.getBox().keys.iterator; - itr.moveNext(); - CommitEntry commitEntry = - (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'phone.wavi@alice'); - expect(commitEntry.commitId, 3); - expect(commitEntry.operation, CommitOp.UPDATE); - itr.moveNext(); - commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'lastName.wavi@alice'); - expect(commitEntry.commitId, 6); - expect(commitEntry.operation, CommitOp.UPDATE_ALL); - itr.moveNext(); - commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'firstname.wavi@alice'); - expect(commitEntry.commitId, 7); - expect(commitEntry.operation, CommitOp.UPDATE); - itr.moveNext(); - commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'country.wavi@alice'); - expect(commitEntry.commitId, 8); - expect(commitEntry.operation, CommitOp.UPDATE); - itr.moveNext(); - commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'city.wavi@alice'); - expect(commitEntry.commitId, 9); - expect(commitEntry.operation, CommitOp.UPDATE); - itr.moveNext(); - commitEntry = (commitLogInstance.commitLogKeyStore.getBox() as Box) - .get(itr.current); - expect(commitEntry.atKey, 'location.wavi@alice'); - expect(commitEntry.commitId, 10); - expect(commitEntry.operation, CommitOp.DELETE); - // To ensure there are no more keys in iterator. - expect(itr.moveNext(), false); - }); - tearDown(() async => await tearDownFunc()); - }); + group('A group of tests to verify repair commit log', () { + // When client syncs data to server, there might be chance of partial execution of + // add method (due to application crash)- leading to null commitIds being added into + // the server commit entry. Hence setting the "enableCommitId" to false to inject + // commit entries with null commit ids. + setUp(() async => await setUpFunc(storageDir, enableCommitId: false)); + test( + 'A test to verify null commit id gets replaced with hive internal key', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + var commitLogMap = await commitLogInstance?.commitLogKeyStore.toMap(); + expect(commitLogMap?.values.first.commitId, null); + await commitLogInstance?.commitLogKeyStore + .repairNullCommitIDs(commitLogMap!); + commitLogMap = await commitLogInstance?.commitLogKeyStore.toMap(); + expect(commitLogMap?.values.first.commitId, 0); + }); - group('A group of tests to verify repair commit log', () { - setUp(() async => await setUpFunc(storageDir, enableCommitId: false)); - test('A test to verify null commit id gets replaced with hive internal key', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - var commitLogMap = await commitLogInstance?.commitLogKeyStore.toMap(); - expect(commitLogMap?.values.first.commitId, null); - await commitLogInstance?.commitLogKeyStore - .repairNullCommitIDs(commitLogMap!); - commitLogMap = await commitLogInstance?.commitLogKeyStore.toMap(); - expect(commitLogMap?.values.first.commitId, 0); + test( + 'A test to verify multiple null commit id gets replaced with hive internal key', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + // Inserting commitEntry with commitId 0 + await commitLogInstance!.commitLogKeyStore.getBox().add( + CommitEntry('location@alice', CommitOp.UPDATE, DateTime.now()) + ..commitId = 0); + // Inserting commitEntry with null commitId + await commitLogInstance.commitLogKeyStore.getBox().add( + CommitEntry('location@alice', CommitOp.UPDATE, DateTime.now())); + // Inserting commitEntry with commitId 2 + await commitLogInstance.commitLogKeyStore.getBox().add( + CommitEntry('phone@alice', CommitOp.UPDATE, DateTime.now()) + ..commitId = 2); + // Inserting commitEntry with null commitId + await commitLogInstance.commitLogKeyStore + .getBox() + .add(CommitEntry('mobile@alice', CommitOp.UPDATE, DateTime.now())); + + var commitLogMap = await commitLogInstance.commitLogKeyStore.toMap(); + await commitLogInstance.commitLogKeyStore + .repairNullCommitIDs(commitLogMap); + commitLogMap = await commitLogInstance.commitLogKeyStore.toMap(); + commitLogMap.forEach((key, value) { + assert(value.commitId != null); + expect(value.commitId, key); + }); + + // verify the commit id's return correct key's + expect((await commitLogInstance.commitLogKeyStore.get(1))?.atKey, + 'location@alice'); + expect((await commitLogInstance.commitLogKeyStore.get(3))?.atKey, + 'mobile@alice'); + }); }); + group('A group of tests to verify local key does not add to commit log', + () { + setUp(() async => await setUpFunc(storageDir, enableCommitId: true)); + test('local key does not add to commit log', () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - test( - 'A test to verify multiple null commit id gets replaced with hive internal key', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - // Inserting commitEntry with commitId 0 - await commitLogInstance!.commitLogKeyStore.getBox().add( - CommitEntry('location@alice', CommitOp.UPDATE, DateTime.now()) - ..commitId = 0); - // Inserting commitEntry with null commitId - await commitLogInstance.commitLogKeyStore - .getBox() - .add(CommitEntry('location@alice', CommitOp.UPDATE, DateTime.now())); - // Inserting commitEntry with commitId 2 - await commitLogInstance.commitLogKeyStore.getBox().add( - CommitEntry('phone@alice', CommitOp.UPDATE, DateTime.now()) - ..commitId = 2); - // Inserting commitEntry with null commitId - await commitLogInstance.commitLogKeyStore - .getBox() - .add(CommitEntry('mobile@alice', CommitOp.UPDATE, DateTime.now())); - - var commitLogMap = await commitLogInstance.commitLogKeyStore.toMap(); - await commitLogInstance.commitLogKeyStore - .repairNullCommitIDs(commitLogMap); - commitLogMap = await commitLogInstance.commitLogKeyStore.toMap(); - commitLogMap.forEach((key, value) { - assert(value.commitId != null); - expect(value.commitId, key); - }); - - // verify the commit id's return correct key's - expect((await commitLogInstance.commitLogKeyStore.get(1))?.atKey, - 'location@alice'); - expect((await commitLogInstance.commitLogKeyStore.get(3))?.atKey, - 'mobile@alice'); - }); - tearDown(() async => await tearDownFunc()); - }); + var commitId = await commitLogInstance?.commit( + 'local:phone.wavi@alice', CommitOp.UPDATE); + expect(commitId, -1); + }); - group('A group of tests to verify commit log cache map', () { - setUp(() async => await setUpFunc(storageDir, enableCommitId: true)); - test('test to verify the entries count in commit cache map after commit', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + test( + 'Test to verify local created with static local method does not add to commit log', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + var atKey = AtKey.local('phone', '@alice', namespace: 'wavi').build(); - Iterator? entriesIterator = commitLogInstance?.getEntries(-1); - int commitLogCountBeforeDeletion = 0; - if (entriesIterator != null) { - while (entriesIterator.moveNext()) { - commitLogCountBeforeDeletion++; - } - } - expect(commitLogCountBeforeDeletion, 3); + var commitId = + await commitLogInstance?.commit(atKey.toString(), CommitOp.UPDATE); + expect(commitId, -1); + }); + + test('Test to verify local created with AtKey does not add to commit log', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var atKey = AtKey() + ..key = 'phone' + ..sharedBy = '@alice' + ..namespace = 'wavi' + ..isLocal = true; + var commitId = + await commitLogInstance?.commit(atKey.toString(), CommitOp.UPDATE); + expect(commitId, -1); + }); }); - test( - 'test to verify the entries count in commit cache map after removing from commit log', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + group('A group of tests to verify commit log cache map', () { + setUp(() async => await setUpFunc(storageDir, enableCommitId: true)); + test('test to verify the entries count in commit cache map after commit', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + + Iterator? entriesIterator = commitLogInstance?.getEntries(-1); + int commitLogCountBeforeDeletion = 0; + if (entriesIterator != null) { + while (entriesIterator.moveNext()) { + commitLogCountBeforeDeletion++; + } + } + expect(commitLogCountBeforeDeletion, 3); + }); + test( + 'A test to verify entries in commit cache map are sorted by commit-id in ascending order', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + await commitLogInstance?.commit( + '@alice:key1.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key2.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key3.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key2.wavi@alice', CommitOp.DELETE); + await commitLogInstance?.commit( + '@alice:key1.wavi@alice', CommitOp.UPDATE); + await commitLogInstance!.commitLogKeyStore + .repairCommitLogAndCreateCachedMap(); + Iterator> itr = + commitLogInstance.getEntries(-1); + itr.moveNext(); + expect(itr.current.key, '@alice:key3.wavi@alice'); + expect(itr.current.value.commitId, 2); + expect(itr.current.value.operation, CommitOp.UPDATE); + + itr.moveNext(); + expect(itr.current.key, '@alice:key2.wavi@alice'); + expect(itr.current.value.commitId, 3); + expect(itr.current.value.operation, CommitOp.DELETE); + + itr.moveNext(); + expect(itr.current.key, '@alice:key1.wavi@alice'); + expect(itr.current.value.commitId, 4); + expect(itr.current.value.operation, CommitOp.UPDATE); + }); - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - int? commitIdToRemove = - await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + test( + 'A test to verify the order of keys and values in commit log cache map', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + await commitLogInstance?.commit( + '@alice:key1.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key2.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key3.wavi@alice', CommitOp.UPDATE); + await commitLogInstance?.commit( + '@alice:key2.wavi@alice', CommitOp.DELETE); + await commitLogInstance?.commit( + '@alice:key1.wavi@alice', CommitOp.UPDATE); + await commitLogInstance!.commitLogKeyStore + .repairCommitLogAndCreateCachedMap(); + + List> commitEntriesList = + commitLogInstance.commitLogKeyStore.commitEntriesList(); + expect(commitEntriesList[0].key, '@alice:key3.wavi@alice'); + expect(commitEntriesList[0].value.commitId, 2); + + expect(commitEntriesList[1].key, '@alice:key2.wavi@alice'); + expect(commitEntriesList[1].value.commitId, 3); + + expect(commitEntriesList[2].key, '@alice:key1.wavi@alice'); + expect(commitEntriesList[2].value.commitId, 4); + }); - Iterator? entriesIterator = commitLogInstance?.getEntries(-1); - int commitLogCountBeforeDeletion = 0; - if (entriesIterator != null) { - while (entriesIterator.moveNext()) { - commitLogCountBeforeDeletion++; + test( + 'A test to verify the entries count in commit cache map after removing from commit log', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + int? commitIdToRemove = + await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + + Iterator? entriesIterator = commitLogInstance?.getEntries(-1); + int commitLogCountBeforeDeletion = 0; + if (entriesIterator != null) { + while (entriesIterator.moveNext()) { + commitLogCountBeforeDeletion++; + } } - } - expect(commitLogCountBeforeDeletion, 3); - await commitLogInstance?.commitLogKeyStore.remove(commitIdToRemove!); - entriesIterator = commitLogInstance?.getEntries(-1); - int commitLogCountAfterDeletion = 0; - if (entriesIterator != null) { - while (entriesIterator.moveNext()) { - commitLogCountAfterDeletion++; + expect(commitLogCountBeforeDeletion, 3); + await commitLogInstance?.commitLogKeyStore.remove(commitIdToRemove!); + entriesIterator = commitLogInstance?.getEntries(-1); + int commitLogCountAfterDeletion = 0; + if (entriesIterator != null) { + while (entriesIterator.moveNext()) { + commitLogCountAfterDeletion++; + } } - } - expect(commitLogCountAfterDeletion, 2); - }); - test('test to verify the whether correct entry is removed from cache', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - - await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); - int? commitIdToRemove = - await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); - await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + expect(commitLogCountAfterDeletion, 2); + }); - await commitLogInstance?.commitLogKeyStore.remove(commitIdToRemove!); - final commitEntry = await commitLogInstance?.getEntry(commitIdToRemove); - expect(commitEntry, isNull); - }); - test( - 'A test to verify only commit entries with null commitId are returned when enableCommitId is false', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitLogKeystore = commitLogInstance!.commitLogKeyStore; - //setting enable commitId to false - to test client side functionality - commitLogKeystore.enableCommitId = false; - //loop to create 10 keys - even keys have commitId null - odd keys have commitId - for (int i = 0; i < 10; i++) { - if (i % 2 == 0) { - await commitLogKeystore.getBox().add(CommitEntry( - 'test_key_false_$i', CommitOp.UPDATE, DateTime.now())); - } else { - await commitLogKeystore.getBox().add( - CommitEntry('test_key_false_$i', CommitOp.UPDATE, DateTime.now()) - ..commitId = i); - } - } - List changes = - await commitLogInstance.commitLogKeyStore.getChanges(-1); - //run loop and test all commit entries returned have commitId == null - for (var element in changes) { - expect(element.commitId, null); - } - }); + test('A test to verify the whether correct entry is removed from cache', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + + await commitLogInstance?.commit('location@alice', CommitOp.UPDATE); + int? commitIdToRemove = + await commitLogInstance?.commit('mobile@alice', CommitOp.UPDATE); + await commitLogInstance?.commit('phone@alice', CommitOp.UPDATE); + + await commitLogInstance?.commitLogKeyStore.remove(commitIdToRemove!); + Iterator>? itr = + commitLogInstance?.getEntries(-1); + itr?.moveNext(); + expect(itr?.current.value.atKey, 'location@alice'); + itr?.moveNext(); + expect(itr?.current.value.atKey, 'phone@alice'); + expect(itr?.moveNext(), false); + }); - test( - 'A test to verify all commit entries are returned when enableCommitId is true', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var commitLogKeystore = commitLogInstance!.commitLogKeyStore; - //loop to create 10 keys - even keys have commitId null - odd keys have commitId - for (int i = 0; i < 10; i++) { - if (i % 2 == 0) { - await commitLogKeystore.getBox().add( - CommitEntry('test_key_true_$i', CommitOp.UPDATE, DateTime.now())); - } else { - await commitLogKeystore.getBox().add( - CommitEntry('test_key_true_$i', CommitOp.UPDATE, DateTime.now()) - ..commitId = i); + test( + 'A test to verify all commit entries are returned when enableCommitId is true', + () async { + var commitLogInstance = + await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); + var commitLogKeystore = commitLogInstance!.commitLogKeyStore; + //loop to create 10 keys - even keys have commitId null - odd keys have commitId + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + await commitLogKeystore.getBox().add(CommitEntry( + 'test_key_true_$i', CommitOp.UPDATE, DateTime.now())); + } else { + await commitLogKeystore.getBox().add( + CommitEntry('test_key_true_$i', CommitOp.UPDATE, DateTime.now()) + ..commitId = i); + } } - } - List changes = - await commitLogInstance.commitLogKeyStore.getChanges(-1); - //run loop to ensure all commit entries have been returned; irrespective of commitId null or not - for (int i = 0; i < 10; i++) { - if (i % 2 == 0) { - //while creation of commit entries, even keys have been set with commitId == null - expect(changes[i].commitId, null); - } else { - //while creation of commit entries, even keys have been set with commitId equal to iteration count - expect(changes[i].commitId, i); + Iterator>? changes = + commitLogInstance.commitLogKeyStore.getEntries(-1); + //run loop to ensure all commit entries have been returned; irrespective of commitId null or not + int i = 0; + while (changes.moveNext()) { + if (i % 2 == 0) { + //while creation of commit entries, even keys have been set with commitId == null + expect(changes.current.value.commitId, null); + } else { + //while creation of commit entries, even keys have been set with commitId equal to iteration count + expect(changes.current.value.commitId, i); + } + i++; } - } - }); - - test( - 'A test to verify entries in commit cache map are sorted by commit-id in ascending order', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - await commitLogInstance?.commit( - '@alice:key1.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key2.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key3.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key2.wavi@alice', CommitOp.DELETE); - await commitLogInstance?.commit( - '@alice:key1.wavi@alice', CommitOp.UPDATE); - await commitLogInstance!.commitLogKeyStore - .repairCommitLogAndCreateCachedMap(); - Iterator> itr = - commitLogInstance.getEntries(-1); - itr.moveNext(); - expect(itr.current.key, '@alice:key3.wavi@alice'); - expect(itr.current.value.commitId, 2); - expect(itr.current.value.operation, CommitOp.UPDATE); - - itr.moveNext(); - expect(itr.current.key, '@alice:key2.wavi@alice'); - expect(itr.current.value.commitId, 3); - expect(itr.current.value.operation, CommitOp.DELETE); - - itr.moveNext(); - expect(itr.current.key, '@alice:key1.wavi@alice'); - expect(itr.current.value.commitId, 4); - expect(itr.current.value.operation, CommitOp.UPDATE); - }); - - test( - 'A test to verify the order of keys and values in commit log cache map', - () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - await commitLogInstance?.commit( - '@alice:key1.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key2.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key3.wavi@alice', CommitOp.UPDATE); - await commitLogInstance?.commit( - '@alice:key2.wavi@alice', CommitOp.DELETE); - await commitLogInstance?.commit( - '@alice:key1.wavi@alice', CommitOp.UPDATE); - await commitLogInstance!.commitLogKeyStore - .repairCommitLogAndCreateCachedMap(); - - List> commitEntriesList = - commitLogInstance.commitLogKeyStore.commitEntriesList(); - expect(commitEntriesList[0].key, '@alice:key3.wavi@alice'); - expect(commitEntriesList[0].value.commitId, 2); - - expect(commitEntriesList[1].key, '@alice:key2.wavi@alice'); - expect(commitEntriesList[1].value.commitId, 3); - - expect(commitEntriesList[2].key, '@alice:key1.wavi@alice'); - expect(commitEntriesList[2].value.commitId, 4); + }); }); - tearDown(() async => await tearDownFunc()); }); - group('A group of tests to verify local key does not add to commit log', () { - test('local key does not add to commit log', () async { - var commitLogInstance = - await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - - var commitId = await commitLogInstance?.commit( - 'local:phone.wavi@alice', CommitOp.UPDATE); - expect(commitId, -1); - }); - + group('A group of tests to verify commit log instances', () { test( - 'Test to verify local created with static local method does not add to commit log', + 'A test to verify CommitLogKeyStore is set when enableCommitId is set to true', () async { - var commitLogInstance = + await setUpFunc(storageDir, enableCommitId: true); + AtCommitLog? atCommitLog = await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - - var atKey = AtKey.local('phone', '@alice', namespace: 'wavi').build(); - - var commitId = - await commitLogInstance?.commit(atKey.toString(), CommitOp.UPDATE); - expect(commitId, -1); + expect(atCommitLog!.commitLogKeyStore, isA()); }); - test('Test to verify local created with AtKey does not add to commit log', + test( + 'A test to verify ClientCommitLogKeyStore is set when enableCommitId is set to false', () async { - var commitLogInstance = + await setUpFunc(storageDir, enableCommitId: false); + AtCommitLog? atCommitLog = await (AtCommitLogManagerImpl.getInstance().getCommitLog('@alice')); - var atKey = AtKey() - ..key = 'phone' - ..sharedBy = '@alice' - ..namespace = 'wavi' - ..isLocal = true; - var commitId = - await commitLogInstance?.commit(atKey.toString(), CommitOp.UPDATE); - expect(commitId, -1); + expect(atCommitLog!.commitLogKeyStore, isA()); }); + tearDown(() async => await tearDownFunc()); }); } diff --git a/packages/at_secondary_server/pubspec.yaml b/packages/at_secondary_server/pubspec.yaml index 50fc9f8ab..81201c836 100644 --- a/packages/at_secondary_server/pubspec.yaml +++ b/packages/at_secondary_server/pubspec.yaml @@ -19,7 +19,7 @@ dependencies: basic_utils: 5.6.1 ecdsa: 0.0.4 encrypt: 5.0.3 - at_commons: 3.0.55 + at_commons: 3.0.56 at_utils: 3.0.15 at_chops: 1.0.4 at_lookup: 3.0.40 diff --git a/packages/at_secondary_server/test/sync_unit_test.dart b/packages/at_secondary_server/test/sync_unit_test.dart index 0750dd4be..2f3488f10 100644 --- a/packages/at_secondary_server/test/sync_unit_test.dart +++ b/packages/at_secondary_server/test/sync_unit_test.dart @@ -94,9 +94,12 @@ void main() { expect(atData.metaData!.version, 0); expect(atData.metaData?.createdBy, atSign); // verify commit entry data - CommitEntry? commitEntry = await atCommitLog!.getEntry(0); - expect(commitEntry!.operation, CommitOp.UPDATE); - expect(commitEntry.commitId, 0); + // The "getEntry" method is specific to "client" operations. Hence + // replaced with "getEntries" + Iterator itr = atCommitLog!.getEntries(-1); + itr.moveNext(); + expect(itr.current.value.operation, CommitOp.UPDATE); + expect(itr.current.value.commitId, 0); }); try { @@ -121,12 +124,12 @@ void main() { .getSecondaryKeyStore() ?.put('@alice:phone@alice', AtData()..data = '123'); // Assert commit entry before update - List commitEntryListBeforeUpdate = - await atCommitLog!.getChanges(-1, '.*'); - expect(commitEntryListBeforeUpdate.length, 1); - expect( - commitEntryListBeforeUpdate.first!.atKey, '@alice:phone@alice'); - expect(commitEntryListBeforeUpdate.first!.commitId, 0); + // The "getChanges" method is specific to the client operations. Hence + // replaced with "getEntries" method + Iterator itr = atCommitLog!.getEntries(-1); + itr.moveNext(); + expect(itr.current.value.atKey, '@alice:phone@alice'); + expect(itr.current.value.commitId, 0); // Update the same key again var keyUpdateDateTime = DateTime.now().toUtc(); await secondaryPersistenceStore!.getSecondaryKeyStore()?.put( @@ -150,7 +153,7 @@ void main() { atDataAfterUpdate.metaData!.updatedAt!.millisecondsSinceEpoch >= keyUpdateDateTime.millisecondsSinceEpoch, true); - Iterator itr = atCommitLog!.getEntries(-1); + itr = atCommitLog!.getEntries(-1); while (itr.moveNext()) { expect(itr.current.value.operation, CommitOp.UPDATE_ALL); expect(itr.current.value.commitId, 1); @@ -433,7 +436,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponse = jsonDecode(response.data!); @@ -473,7 +476,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); syncVerbParams.putIfAbsent('regex', () => 'buzz'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); @@ -515,7 +518,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponse = jsonDecode(response.data!); @@ -524,7 +527,7 @@ void main() { // Increase the sync buffer size and assert all the 4 keys are added to sync response syncProgressiveVerbHandler.capacity = 1200; response = Response(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); syncResponse = jsonDecode(response.data!); @@ -554,7 +557,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); @@ -628,7 +631,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponseList = jsonDecode(response.data!); @@ -691,7 +694,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponseList = jsonDecode(response.data!); @@ -731,7 +734,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponseList = jsonDecode(response.data!); @@ -770,7 +773,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponseList = jsonDecode(response.data!); @@ -817,7 +820,7 @@ void main() { var atConnection = InboundConnectionImpl(null, inBoundSessionId); atConnection.metaData.isAuthenticated = true; var syncVerbParams = HashMap(); - syncVerbParams.putIfAbsent(AT_FROM_COMMIT_SEQUENCE, () => '-1'); + syncVerbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '-1'); await syncProgressiveVerbHandler.processVerb( response, syncVerbParams, atConnection); List syncResponseList = jsonDecode(response.data!);