Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Sync verb handler add limit param #2182

Merged
merged 9 commits into from
Dec 17, 2024
3 changes: 3 additions & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 3.1.1
- fix: Store "publicKeyHash" value in the keystore
- fix: add limit param in SyncProgressiveVerbHandler
- build[deps]: Upgraded the following package:
at_commons to v5.1.2
# 3.1.0
- feat: sync skip deletes until changes
- fix: Enable persistence of the Initialization Vector for "defaultEncryptionPrivateKey" and "selfEncryptionKey" in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,25 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
// Get Commit Log Instance.
var atCommitLog = await (AtCommitLogManagerImpl.getInstance()
.getCommitLog(AtSecondaryServerImpl.getInstance().currentAtSign));
int? skipDeletesUntil = verbParams['skipDeletesUntil'] != null
? int.parse(verbParams['skipDeletesUntil']!)
int? skipDeletesUntil = verbParams[AtConstants.skipDeletesUntil] != null
? int.parse(verbParams[AtConstants.skipDeletesUntil]!)
: null;
int? syncLimit = verbParams[AtConstants.syncLimit] != null
? int.parse(verbParams[AtConstants.syncLimit]!)
: null;
// Get entries to sync
var commitEntryIterator = atCommitLog!.getEntries(
Iterator<MapEntry<String, CommitEntry>> commitEntryIterator;
// if client doesn't pass syncLimit set the default value from server
syncLimit ??= AtSecondaryConfig.syncPageLimit;
commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex'],
skipDeletesUntil: skipDeletesUntil);
skipDeletesUntil: skipDeletesUntil,
limit: syncLimit);

List<KeyStoreEntry> syncResponse = [];
await prepareResponse(capacity, syncResponse, commitEntryIterator,
await prepareResponse(
capacity, syncLimit, syncResponse, commitEntryIterator,
enrollmentId:
(atConnection.metaData as InboundConnectionMetadata).enrollmentId);

Expand All @@ -61,8 +69,11 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
/// 1. there is at least one item in [syncResponse], and the response length is greater than [desiredMaxSyncResponseLength], or
/// 2. there are [AtSecondaryConfig.syncPageLimit] items in the [syncResponse]
@visibleForTesting
Future<void> prepareResponse(int desiredMaxSyncResponseLength,
List<KeyStoreEntry> syncResponse, Iterator<dynamic> commitEntryIterator,
Future<void> prepareResponse(
int desiredMaxSyncResponseLength,
int syncPageLimit,
List<KeyStoreEntry> syncResponse,
Iterator<dynamic> commitEntryIterator,
{String? enrollmentId}) async {
int currentResponseLength = 0;
Map<String, String> enrolledNamespaces = {};
Expand All @@ -73,8 +84,8 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
.get(enrollmentId))
.namespaces;
}
while (commitEntryIterator.moveNext() &&
syncResponse.length < AtSecondaryConfig.syncPageLimit) {
while (
commitEntryIterator.moveNext() && syncResponse.length < syncPageLimit) {
var atKeyType = AtKey.getKeyType(commitEntryIterator.current.key,
enforceNameSpace: false);
if (atKeyType == KeyType.invalidKey) {
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies:
basic_utils: 5.7.0
ecdsa: 0.1.0
encrypt: 5.0.3
at_commons: 5.1.1
at_commons: 5.1.2
at_utils: 3.0.19
at_chops: 2.2.0
at_lookup: 3.0.49
Expand Down
159 changes: 145 additions & 14 deletions packages/at_secondary_server/test/sync_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:at_chops/at_chops.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart';
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/utils/handler_util.dart';
import 'package:at_secondary/src/utils/secondary_util.dart';
Expand Down Expand Up @@ -161,8 +162,8 @@ void main() {
assert(atCommitLog.entriesCount() > 0);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');
});
Expand Down Expand Up @@ -192,19 +193,19 @@ void main() {

// Since syncResponse already has an entry, and the 'capacity' is 0, then the next entry
// should not be added to the syncResponse
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse, [entry]);

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2_beta@alice');
});
Expand Down Expand Up @@ -236,7 +237,10 @@ void main() {
syncResponse.add(entry);

await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(0));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(0));

// Expecting that all the entries in the commitLog have been
// added to syncResponse
Expand All @@ -262,24 +266,151 @@ void main() {
assert(atCommitLog.entriesCount() == 2);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key1@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2@alice');

// test with empty iterator
syncResponse.clear();
await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(2));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(2));
expect(syncResponse.length, 0);
});

test(
'A test to verify sync returns default number of entries when limit is not passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 25);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

test(
'A test to verify sync returns correct number of entries when limit (less than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '12');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 12);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});
test(
'A test to verify sync returns correct number of entries when limit (greater than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '35');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 35);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

tearDown(() async => await tearDownFunc());
});
}
Expand Down
2 changes: 1 addition & 1 deletion tests/at_end2end_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies:
encrypt: 5.0.3
at_demo_data: ^1.0.3
at_lookup: ^3.0.49
at_commons: ^5.1.0
at_commons: ^5.1.2

dev_dependencies:
lints: ^5.0.0
Expand Down
3 changes: 2 additions & 1 deletion tests/at_functional_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ dependencies:
at_demo_data: ^1.1.0
at_chops: ^2.2.0
at_lookup: ^3.0.49
at_commons: ^5.1.0
at_commons: ^5.1.2
uuid: ^3.0.7
elliptic: ^0.3.8


dev_dependencies:
lints: ^5.0.0
test: ^1.25.9
Expand Down
Loading