Skip to content

Commit

Permalink
Merge pull request #68 from brave/service_time_version
Browse files Browse the repository at this point in the history
Use mtime as version
  • Loading branch information
yrliou authored May 25, 2021
2 parents 8e8edf9 + 2346c02 commit 1a8d444
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 40 deletions.
7 changes: 4 additions & 3 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}
}

*entityToCommit.Version++
if *entityToCommit.Version == 1 { // Create
oldVersion := *entityToCommit.Version
*entityToCommit.Version = *entityToCommit.Mtime
if oldVersion == 0 { // Create
if itemCount+count >= maxClientObjectQuota {
rspType := sync_pb.CommitResponse_OVER_QUOTA
entryRsp.ResponseType = &rspType
Expand Down Expand Up @@ -240,7 +241,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c

count++
} else { // Update
conflict, delete, err := db.UpdateSyncEntity(entityToCommit)
conflict, delete, err := db.UpdateSyncEntity(entityToCommit, oldVersion)
if err != nil {
log.Error().Err(err).Msg("Update sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
Expand Down
50 changes: 30 additions & 20 deletions command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,12 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_Basic() {
suite.Assert().Equal(2, len(rsp.Commit.Entryresponse))
commitSuccess := sync_pb.CommitResponse_SUCCESS
serverIDs := []string{}
commitVersions := []int64{}
for _, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(commitSuccess, *entryRsp.ResponseType)
suite.Assert().Equal(int64(1), *entryRsp.Version)
suite.Assert().Equal(*entryRsp.Mtime, *entryRsp.Version)
serverIDs = append(serverIDs, *entryRsp.IdString)
commitVersions = append(commitVersions, *entryRsp.Version)
}

// GetUpdates with token 0 should get all of them.
Expand All @@ -246,18 +248,18 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_Basic() {
assertCommonResponse(suite, rsp, false)

expectedPBSyncAttrs := []*PBSyncAttrs{
NewPBSyncAttrs(entries[0].Name, aws.Int64(1), aws.Bool(false),
NewPBSyncAttrs(entries[0].Name, &commitVersions[0], aws.Bool(false),
aws.Bool(false), nil, getBookmarkSpecifics()),
NewPBSyncAttrs(entries[1].Name, aws.Int64(1), aws.Bool(false),
NewPBSyncAttrs(entries[1].Name, &commitVersions[1], aws.Bool(false),
aws.Bool(false), nil, getNigoriSpecifics()),
}
newMarker := marker // Initialize expected NewProgressMarker with tokens = 0.
assertGetUpdatesResponse(suite, rsp.GetUpdates, &newMarker, expectedPBSyncAttrs, 0)

// Commit one new item, update one current item for each type.
entries = []*sync_pb.SyncEntity{
getCommitEntity(serverIDs[0], 1, true, getBookmarkSpecifics()),
getCommitEntity(serverIDs[1], 1, true, getNigoriSpecifics()),
getCommitEntity(serverIDs[0], commitVersions[0], true, getBookmarkSpecifics()),
getCommitEntity(serverIDs[1], commitVersions[1], true, getNigoriSpecifics()),
getCommitEntity("id3_bookmark", 0, false, getBookmarkSpecifics()),
getCommitEntity("id4_nigori", 0, false, getNigoriSpecifics()),
}
Expand All @@ -271,11 +273,12 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_Basic() {

suite.Assert().Equal(4, len(rsp.Commit.Entryresponse))
serverIDs = []string{}
expectedVersion := []int64{2, 2, 1, 1}
for i, entryRsp := range rsp.Commit.Entryresponse {
commitVersions = []int64{}
for _, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(commitSuccess, *entryRsp.ResponseType)
suite.Assert().Equal(expectedVersion[i], *entryRsp.Version)
suite.Assert().Equal(*entryRsp.Mtime, *entryRsp.Version)
serverIDs = append(serverIDs, *entryRsp.IdString)
commitVersions = append(commitVersions, *entryRsp.Version)
}

// GetUpdates again with previous returned mtimes and check the result, it
Expand All @@ -291,13 +294,13 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_Basic() {
assertCommonResponse(suite, rsp, false)

expectedPBSyncAttrs = []*PBSyncAttrs{
NewPBSyncAttrs(entries[0].Name, aws.Int64(2), aws.Bool(true),
NewPBSyncAttrs(entries[0].Name, &commitVersions[0], aws.Bool(true),
aws.Bool(false), nil, getBookmarkSpecifics()),
NewPBSyncAttrs(entries[1].Name, aws.Int64(2), aws.Bool(true),
NewPBSyncAttrs(entries[1].Name, &commitVersions[1], aws.Bool(true),
aws.Bool(false), nil, getNigoriSpecifics()),
NewPBSyncAttrs(entries[2].Name, aws.Int64(1), aws.Bool(false),
NewPBSyncAttrs(entries[2].Name, &commitVersions[2], aws.Bool(false),
aws.Bool(false), nil, getBookmarkSpecifics()),
NewPBSyncAttrs(entries[3].Name, aws.Int64(1), aws.Bool(false),
NewPBSyncAttrs(entries[3].Name, &commitVersions[3], aws.Bool(false),
aws.Bool(false), nil, getNigoriSpecifics()),
}
newMarker = marker // Initialize expected NewProgressMarker with FromProgressMarker.
Expand Down Expand Up @@ -395,7 +398,7 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_GUBatchSize() {
commitSuccess := sync_pb.CommitResponse_SUCCESS
for _, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(commitSuccess, *entryRsp.ResponseType)
suite.Assert().Equal(int64(1), *entryRsp.Version)
suite.Assert().Equal(*entryRsp.Mtime, *entryRsp.Version)
}

// Test maxGUBatchSize from client side should be respected when smaller than
Expand Down Expand Up @@ -461,10 +464,12 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_QuotaLimit() {
suite.Assert().Equal(2, len(rsp.Commit.Entryresponse))
commitSuccess := sync_pb.CommitResponse_SUCCESS
serverIDs := []string{}
commitVersions := []int64{}
for _, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(commitSuccess, *entryRsp.ResponseType)
suite.Assert().Equal(int64(1), *entryRsp.Version)
suite.Assert().Equal(*entryRsp.Mtime, *entryRsp.Version)
serverIDs = append(serverIDs, *entryRsp.IdString)
commitVersions = append(commitVersions, *entryRsp.Version)
}

// Commit 4 items to exceed quota by a half, 2 should return OVER_QUOTA.
Expand All @@ -484,7 +489,7 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_QuotaLimit() {
suite.Assert().Equal(4, len(rsp.Commit.Entryresponse))
overQuota := sync_pb.CommitResponse_OVER_QUOTA
expectedEntryRsp := []sync_pb.CommitResponse_ResponseType{commitSuccess, commitSuccess, overQuota, overQuota}
expectedVersion := []*int64{aws.Int64(1), aws.Int64(1), nil, nil}
expectedVersion := []*int64{rsp.Commit.Entryresponse[0].Mtime, rsp.Commit.Entryresponse[1].Mtime, nil, nil}
for i, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(expectedEntryRsp[i], *entryRsp.ResponseType)
suite.Assert().Equal(expectedVersion[i], entryRsp.Version)
Expand All @@ -509,8 +514,8 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_QuotaLimit() {

// Commit updates to delete two previous inserted items.
entries = []*sync_pb.SyncEntity{
getCommitEntity(serverIDs[0], 1, true, getBookmarkSpecifics()),
getCommitEntity(serverIDs[1], 1, true, getBookmarkSpecifics()),
getCommitEntity(serverIDs[0], commitVersions[0], true, getBookmarkSpecifics()),
getCommitEntity(serverIDs[1], commitVersions[1], true, getBookmarkSpecifics()),
}
msg = getClientToServerCommitMsg(entries)
rsp = &sync_pb.ClientToServerResponse{}
Expand All @@ -522,7 +527,7 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_QuotaLimit() {
suite.Assert().Equal(2, len(rsp.Commit.Entryresponse))
for _, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(commitSuccess, *entryRsp.ResponseType)
suite.Assert().Equal(int64(2), *entryRsp.Version)
suite.Assert().Equal(*entryRsp.Mtime, *entryRsp.Version)
}

// Commit 4 items should have two success and two OVER_QUOTA.
Expand All @@ -540,9 +545,14 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_QuotaLimit() {
"HandleClientToServerMessage should succeed")
assertCommonResponse(suite, rsp, true)
suite.Assert().Equal(4, len(rsp.Commit.Entryresponse))
expectedVersion = []*int64{rsp.Commit.Entryresponse[0].Mtime, rsp.Commit.Entryresponse[1].Mtime, nil, nil}
for i, entryRsp := range rsp.Commit.Entryresponse {
suite.Assert().Equal(expectedEntryRsp[i], *entryRsp.ResponseType)
suite.Assert().Equal(expectedVersion[i], entryRsp.Version)
if *entryRsp.ResponseType == commitSuccess {
suite.Assert().Equal(*expectedVersion[i], *entryRsp.Version)
} else {
suite.Assert().Equal(expectedVersion[i], entryRsp.Version)
}
}

*command.MaxClientObjectQuota = defaultMaxClientObjectQuota
Expand Down Expand Up @@ -574,7 +584,7 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_ReplaceParentIDTo
child3 := getCommitEntity("id_child3", 0, false, getBookmarkSpecifics())
child3.ParentIdString = aws.String("id_parent2")

updateChild0 := getCommitEntity(*rsp.Commit.Entryresponse[0].IdString, 1, false, getBookmarkSpecifics())
updateChild0 := getCommitEntity(*rsp.Commit.Entryresponse[0].IdString, *rsp.Commit.Entryresponse[0].Version, false, getBookmarkSpecifics())
updateChild0.ParentIdString = aws.String("id_parent")

entries := []*sync_pb.SyncEntity{parent1, child1, parent2, child2, child3, updateChild0}
Expand Down
2 changes: 1 addition & 1 deletion datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type Datastore interface {
// Insert a series of sync entities in a write transaction.
InsertSyncEntitiesWithServerTags(entities []*SyncEntity) error
// Update an existing sync entity.
UpdateSyncEntity(entity *SyncEntity) (conflict bool, delete bool, err error)
UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (conflict bool, delete bool, err error)
// Get updates for a specific type which are modified after the time of
// client token for a given client. Besides the array of sync entities, a
// boolean value indicating whether there are more updates to query in the
Expand Down
4 changes: 2 additions & 2 deletions datastore/instrumented_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (_d DatastoreWithPrometheus) UpdateClientItemCount(clientID string, count i
}

// UpdateSyncEntity implements Datastore
func (_d DatastoreWithPrometheus) UpdateSyncEntity(entity *SyncEntity) (conflict bool, delete bool, err error) {
func (_d DatastoreWithPrometheus) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (conflict bool, delete bool, err error) {
_since := time.Now()
defer func() {
result := "ok"
Expand All @@ -132,5 +132,5 @@ func (_d DatastoreWithPrometheus) UpdateSyncEntity(entity *SyncEntity) (conflict

datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "UpdateSyncEntity", result).Observe(time.Since(_since).Seconds())
}()
return _d.base.UpdateSyncEntity(entity)
return _d.base.UpdateSyncEntity(entity, oldVersion)
}
4 changes: 2 additions & 2 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (dynamo *Dynamo) InsertSyncEntitiesWithServerTags(entities []*SyncEntity) e
}

// UpdateSyncEntity updates a sync item in dynamoDB.
func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity) (bool, bool, error) {
func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (bool, bool, error) {
primaryKey := PrimaryKey{ClientID: entity.ClientID, ID: entity.ID}
key, err := dynamodbattribute.MarshalMap(primaryKey)
if err != nil {
Expand All @@ -274,7 +274,7 @@ func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity) (bool, bool, error) {
// condition to ensure to be update only and the version is matched.
cond := expression.And(
expression.AttributeExists(expression.Name(pk)),
expression.Name("Version").Equal(expression.Value(*entity.Version-1)))
expression.Name("Version").Equal(expression.Value(oldVersion)))

update := expression.Set(expression.Name("Version"), expression.Value(entity.Version))
update = update.Set(expression.Name("Mtime"), expression.Value(entity.Mtime))
Expand Down
24 changes: 12 additions & 12 deletions datastore/sync_entity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,13 @@ func (suite *SyncEntityTestSuite) TestUpdateSyncEntity_Basic() {

// Update without optional fields.
updateEntity1 := entity1
updateEntity1.Version = aws.Int64(2)
updateEntity1.Version = aws.Int64(23456789)
updateEntity1.Mtime = aws.Int64(23456789)
updateEntity1.Folder = aws.Bool(true)
updateEntity1.Deleted = aws.Bool(true)
updateEntity1.DataTypeMtime = aws.String("123#23456789")
updateEntity1.Specifics = []byte{3, 4}
conflict, delete, err := suite.dynamo.UpdateSyncEntity(&updateEntity1)
conflict, delete, err := suite.dynamo.UpdateSyncEntity(&updateEntity1, *entity1.Version)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().False(conflict, "Successful update should not have conflict")
suite.Assert().True(delete, "Delete operation should return true")
Expand All @@ -278,7 +278,7 @@ func (suite *SyncEntityTestSuite) TestUpdateSyncEntity_Basic() {
updateEntity2.ParentID = aws.String("parentID")
updateEntity2.Name = aws.String("name")
updateEntity2.NonUniqueName = aws.String("non_unique_name")
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity2)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity2, *entity2.Version)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().False(conflict, "Successful update should not have conflict")
suite.Assert().False(delete, "Non-delete operation should return false")
Expand All @@ -288,17 +288,17 @@ func (suite *SyncEntityTestSuite) TestUpdateSyncEntity_Basic() {
updateEntity3.ID = "id3"
updateEntity3.Folder = nil
updateEntity3.Deleted = nil
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity3)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity3, *entity3.Version)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().False(conflict, "Successful update should not have conflict")
suite.Assert().False(delete, "Non-delete operation should return false")
// Reset these back to false because they will be the expected value in DB.
updateEntity3.Folder = aws.Bool(false)
updateEntity3.Deleted = aws.Bool(false)

// Update entity again with the same version as before (version mismatch)
// Update entity again with the wrong old version as (version mismatch)
// should return false.
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity2)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity2, 12345678)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().True(conflict, "Update with the same version should return conflict")
suite.Assert().False(delete, "Conflict operation should return false for delete")
Expand Down Expand Up @@ -333,28 +333,28 @@ func (suite *SyncEntityTestSuite) TestUpdateSyncEntity_ReuseClientTag() {
suite.Require().NoError(err, "ScanTagItems should succeed")
suite.Assert().Equal(1, len(tagItems), "Tag item should be inserted")

// Update it to version 2.
// Update it to version 23456789.
updateEntity1 := entity1
updateEntity1.Version = aws.Int64(2)
updateEntity1.Version = aws.Int64(23456789)
updateEntity1.Mtime = aws.Int64(23456789)
updateEntity1.Folder = aws.Bool(true)
updateEntity1.DataTypeMtime = aws.String("123#23456789")
updateEntity1.Specifics = []byte{3, 4}
conflict, delete, err := suite.dynamo.UpdateSyncEntity(&updateEntity1)
conflict, delete, err := suite.dynamo.UpdateSyncEntity(&updateEntity1, *entity1.Version)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().False(conflict, "Successful update should not have conflict")
suite.Assert().False(delete, "Non-delete operation should return false")

// Soft-delete the item with wrong version should get conflict.
updateEntity1.Deleted = aws.Bool(true)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity1)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity1, *entity1.Version)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().True(conflict, "Version mismatched update should have conflict")
suite.Assert().False(delete, "Failed delete operation should return false")

// Soft-delete the item with matched version.
updateEntity1.Version = aws.Int64(3)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity1)
updateEntity1.Version = aws.Int64(34567890)
conflict, delete, err = suite.dynamo.UpdateSyncEntity(&updateEntity1, 23456789)
suite.Require().NoError(err, "UpdateSyncEntity should succeed")
suite.Assert().False(conflict, "Successful update should not have conflict")
suite.Assert().True(delete, "Delete operation should return true")
Expand Down

0 comments on commit 1a8d444

Please sign in to comment.