Skip to content

Commit

Permalink
Experiment of forced clean of old history items for the case wqhen se…
Browse files Browse the repository at this point in the history
…rver is about to reach max object limit on the client.

Must not be merged as-is.
  • Loading branch information
AlexeyBarabash committed Apr 3, 2024
1 parent 6c8dc1a commit 353aa5e
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 3 deletions.
28 changes: 25 additions & 3 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ import (

var (
// Could be modified in tests.
maxGUBatchSize = 500
maxClientObjectQuota = 85000
// Original:
// maxGUBatchSize = 500
// maxClientObjectQuota = 85000

// Test with small values
maxGUBatchSize = 5
maxClientObjectQuota = 30
// new
clientObjectsToStartCleanup = 20
historyObjectsToCleanup = 5
)

const (
Expand Down Expand Up @@ -214,6 +222,20 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
return &errCode, fmt.Errorf("error getting client's item count: %w", err)
}

deletedHistoryEntriesCount := 0

if itemCount >= clientObjectsToStartCleanup {
_, historySyncEntities, err3 :=
db.GetUpdatesForType(historyTypeID, 0/*clientToken*/, false/*fetchFolders*/, clientID, int64(historyObjectsToCleanup)/*maxSize int64*/)
if err3 == nil && len(historySyncEntities) > 0 {
err4 := db.DeleteThese(historySyncEntities)
if err4 == nil {
deletedHistoryEntriesCount = len(historySyncEntities);
itemCount -= deletedHistoryEntriesCount
}
}
}

commitRsp.Entryresponse = make([]*sync_pb.CommitResponse_EntryResponse, len(commitMsg.Entries))

// Map client-generated ID to its server-generated ID.
Expand Down Expand Up @@ -315,7 +337,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
cache.SetTypeMtime(context.Background(), clientID, dataType, mtime)
}

err = db.UpdateClientItemCount(clientID, count)
err = db.UpdateClientItemCount(clientID, count - deletedHistoryEntriesCount)
if err != nil {
// We only impose a soft quota limit on the item count for each client, so
// we only log the error without further actions here. The reason of this
Expand Down
2 changes: 2 additions & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Datastore interface {
GetUpdatesForType(dataType int, clientToken int64, fetchFolders bool, clientID string, maxSize int64) (bool, []SyncEntity, error)
// Check if a server-defined unique tag is in the datastore.
HasServerDefinedUniqueTag(clientID string, tag string) (bool, error)
// Deletes some entries
DeleteThese(entities []SyncEntity) error;
// Get the count of sync items for a client.
GetClientItemCount(clientID string) (int, error)
// Update the count of sync items for a client.
Expand Down
6 changes: 6 additions & 0 deletions datastore/datastoretest/mock_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (m *MockDatastore) HasItem(clientID string, ID string) (bool, error) {
return args.Bool(0), args.Error(1)
}

// DeleteThese mocks calls to DeleteThese
func (m *MockDatastore) DeleteThese(entities []datastore.SyncEntity) error {
args := m.Called(entities)
return args.Error(0)
}

// GetClientItemCount mocks calls to GetClientItemCount
func (m *MockDatastore) GetClientItemCount(clientID string) (int, error) {
args := m.Called(clientID)
Expand Down
14 changes: 14 additions & 0 deletions datastore/instrumented_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ func (_d DatastoreWithPrometheus) DisableSyncChain(clientID string) (err error)
return _d.base.DisableSyncChain(clientID)
}

// DeleteThese implements Datastore
func (_d DatastoreWithPrometheus) DeleteThese(entities []SyncEntity) (err error) {
_since := time.Now()
defer func() {
result := "ok"
if err != nil {
result = "error"
}

datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "DeleteThese", result).Observe(time.Since(_since).Seconds())
}()
return _d.base.DeleteThese(entities)
}

// GetClientItemCount implements Datastore
func (_d DatastoreWithPrometheus) GetClientItemCount(clientID string) (i1 int, err error) {
_since := time.Now()
Expand Down
63 changes: 63 additions & 0 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,69 @@ func (dynamo *Dynamo) ClearServerData(clientID string) ([]SyncEntity, error) {
return syncEntities, nil
}

// Code is taken mostly from ClearServerData function
func (dynamo *Dynamo) DeleteThese(entities []SyncEntity) error {
if len(entities) == 0 {
return fmt.Errorf("error deleting sync entities - none requested")
}

items := []*dynamodb.TransactWriteItem{}

for _, item := range entities {
// Fail delete if race condition detected (modified time has changed).
if item.Version != nil {
cond := expression.Name("Mtime").Equal(expression.Value(*item.Mtime))
expr, err := expression.NewBuilder().WithCondition(cond).Build()
if err != nil {
return fmt.Errorf("error deleting sync entities for client <???>: %w", err)
}

writeItem := dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
ConditionExpression: expr.Condition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
TableName: aws.String(Table),
Key: map[string]*dynamodb.AttributeValue{
pk: {
S: aws.String(item.ClientID),
},
sk: {
S: aws.String(item.ID),
},
},
},
}

items = append(items, &writeItem)
} else {
// If row doesn't hold Mtime, delete as usual.
writeItem := dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
TableName: aws.String(Table),
Key: map[string]*dynamodb.AttributeValue{
pk: {
S: aws.String(item.ClientID),
},
sk: {
S: aws.String(item.ID),
},
},
},
}

items = append(items, &writeItem)
}
}
_, err := dynamo.TransactWriteItems(&dynamodb.TransactWriteItemsInput{TransactItems: items})

if err != nil {
return fmt.Errorf("error deleting sync entities for client <???>: %w", err)
}

return nil
}

// IsSyncChainDisabled checks whether a given sync chain has been deleted
func (dynamo *Dynamo) IsSyncChainDisabled(clientID string) (bool, error) {
key, err := dynamodbattribute.MarshalMap(DisabledMarkerItemQuery{
Expand Down

0 comments on commit 353aa5e

Please sign in to comment.