diff --git a/command/command.go b/command/command.go index 9576fe29..355db596 100644 --- a/command/command.go +++ b/command/command.go @@ -14,8 +14,16 @@ import ( var ( // Could be modified in tests. - maxGUBatchSize = 500 - maxClientObjectQuota = 85000 + // Original: + // maxGUBatchSize = 500 + // maxClientObjectQuota = 85000 + + // Test with small values + maxGUBatchSize = 5 + maxClientObjectQuota = 30 + // new + clientObjectsToStartCleanup = 20 + historyObjectsToCleanup = 5 ) const ( @@ -214,6 +222,20 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c return &errCode, fmt.Errorf("error getting client's item count: %w", err) } + deletedHistoryEntriesCount := 0 + + if itemCount >= clientObjectsToStartCleanup { + _, historySyncEntities, err3 := + db.GetUpdatesForType(historyTypeID, 0/*clientToken*/, false/*fetchFolders*/, clientID, int64(historyObjectsToCleanup)/*maxSize int64*/) + if err3 == nil && len(historySyncEntities) > 0 { + err4 := db.DeleteThese(historySyncEntities) + if err4 == nil { + deletedHistoryEntriesCount = len(historySyncEntities); + itemCount -= deletedHistoryEntriesCount + } + } + } + commitRsp.Entryresponse = make([]*sync_pb.CommitResponse_EntryResponse, len(commitMsg.Entries)) // Map client-generated ID to its server-generated ID. @@ -315,7 +337,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c cache.SetTypeMtime(context.Background(), clientID, dataType, mtime) } - err = db.UpdateClientItemCount(clientID, count) + err = db.UpdateClientItemCount(clientID, count - deletedHistoryEntriesCount) if err != nil { // We only impose a soft quota limit on the item count for each client, so // we only log the error without further actions here. The reason of this diff --git a/datastore/datastore.go b/datastore/datastore.go index 3e240ac2..8b5add0f 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -15,6 +15,8 @@ type Datastore interface { GetUpdatesForType(dataType int, clientToken int64, fetchFolders bool, clientID string, maxSize int64) (bool, []SyncEntity, error) // Check if a server-defined unique tag is in the datastore. HasServerDefinedUniqueTag(clientID string, tag string) (bool, error) + // Deletes some entries + DeleteThese(entities []SyncEntity) error; // Get the count of sync items for a client. GetClientItemCount(clientID string) (int, error) // Update the count of sync items for a client. diff --git a/datastore/datastoretest/mock_datastore.go b/datastore/datastoretest/mock_datastore.go index 94c889bf..b2e8a98d 100644 --- a/datastore/datastoretest/mock_datastore.go +++ b/datastore/datastoretest/mock_datastore.go @@ -45,6 +45,12 @@ func (m *MockDatastore) HasItem(clientID string, ID string) (bool, error) { return args.Bool(0), args.Error(1) } +// DeleteThese mocks calls to DeleteThese +func (m *MockDatastore) DeleteThese(entities []datastore.SyncEntity) error { + args := m.Called(entities) + return args.Error(0) +} + // GetClientItemCount mocks calls to GetClientItemCount func (m *MockDatastore) GetClientItemCount(clientID string) (int, error) { args := m.Called(clientID) diff --git a/datastore/instrumented_datastore.go b/datastore/instrumented_datastore.go index 35709aae..35fb4eac 100644 --- a/datastore/instrumented_datastore.go +++ b/datastore/instrumented_datastore.go @@ -65,6 +65,20 @@ func (_d DatastoreWithPrometheus) DisableSyncChain(clientID string) (err error) return _d.base.DisableSyncChain(clientID) } + // DeleteThese implements Datastore + func (_d DatastoreWithPrometheus) DeleteThese(entities []SyncEntity) (err error) { + _since := time.Now() + defer func() { + result := "ok" + if err != nil { + result = "error" + } + + datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "DeleteThese", result).Observe(time.Since(_since).Seconds()) + }() + return _d.base.DeleteThese(entities) +} + // GetClientItemCount implements Datastore func (_d DatastoreWithPrometheus) GetClientItemCount(clientID string) (i1 int, err error) { _since := time.Now() diff --git a/datastore/sync_entity.go b/datastore/sync_entity.go index 499abb19..45569307 100644 --- a/datastore/sync_entity.go +++ b/datastore/sync_entity.go @@ -469,6 +469,69 @@ func (dynamo *Dynamo) ClearServerData(clientID string) ([]SyncEntity, error) { return syncEntities, nil } +// Code is taken mostly from ClearServerData function +func (dynamo *Dynamo) DeleteThese(entities []SyncEntity) error { + if len(entities) == 0 { + return fmt.Errorf("error deleting sync entities - none requested") + } + + items := []*dynamodb.TransactWriteItem{} + + for _, item := range entities { + // Fail delete if race condition detected (modified time has changed). + if item.Version != nil { + cond := expression.Name("Mtime").Equal(expression.Value(*item.Mtime)) + expr, err := expression.NewBuilder().WithCondition(cond).Build() + if err != nil { + return fmt.Errorf("error deleting sync entities for client : %w", err) + } + + writeItem := dynamodb.TransactWriteItem{ + Delete: &dynamodb.Delete{ + ConditionExpression: expr.Condition(), + ExpressionAttributeNames: expr.Names(), + ExpressionAttributeValues: expr.Values(), + TableName: aws.String(Table), + Key: map[string]*dynamodb.AttributeValue{ + pk: { + S: aws.String(item.ClientID), + }, + sk: { + S: aws.String(item.ID), + }, + }, + }, + } + + items = append(items, &writeItem) + } else { + // If row doesn't hold Mtime, delete as usual. + writeItem := dynamodb.TransactWriteItem{ + Delete: &dynamodb.Delete{ + TableName: aws.String(Table), + Key: map[string]*dynamodb.AttributeValue{ + pk: { + S: aws.String(item.ClientID), + }, + sk: { + S: aws.String(item.ID), + }, + }, + }, + } + + items = append(items, &writeItem) + } + } + _, err := dynamo.TransactWriteItems(&dynamodb.TransactWriteItemsInput{TransactItems: items}) + + if err != nil { + return fmt.Errorf("error deleting sync entities for client : %w", err) + } + + return nil +} + // IsSyncChainDisabled checks whether a given sync chain has been deleted func (dynamo *Dynamo) IsSyncChainDisabled(clientID string) (bool, error) { key, err := dynamodbattribute.MarshalMap(DisabledMarkerItemQuery{