Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2821 De-propagate unacknowledged sentinel error #1661

Merged
merged 13 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 116 additions & 17 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"strings"
"testing"

Expand Down Expand Up @@ -1472,6 +1471,122 @@ func TestCollection(t *testing.T) {
assert.NotNil(mt, we.WriteConcernError, "expected write concern error, got %v", err)
})
})

unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged writes", unackMtOpts, func(mt *mtest.T) {
mt.Run("bulk write", func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}

res, err := mt.Coll.BulkWrite(context.Background(), models)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert one", func(mt *mtest.T) {
res, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert many", func(t *mtest.T) {
docs := []interface{}{
bson.D{{"x", 1}},
bson.D{{"y", 1}},
}

res, err := mt.Coll.InsertMany(context.Background(), docs)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("delete", func(mt *mtest.T) {
res, err := mt.Coll.DeleteOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("update", func(t *mtest.T) {
res, err := mt.Coll.UpdateOne(context.Background(), bson.D{{"x", 1}}, bson.D{{"$set", bson.D{{"x", "2"}}}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("find and modify", func(mt *mtest.T) {
res := mt.Coll.FindOneAndDelete(context.Background(), bson.D{{"x", 1}})

assert.ErrorIs(mt, res.Err(), mongo.ErrNoDocuments)
assert.False(mt, res.Acknowledged)

})

mt.Run("dropping a collection", func(mt *mtest.T) {
err := mt.Coll.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("creating a collection", func(mt *mtest.T) {
err := mt.DB.CreateCollection(context.Background(), "test coll")
assert.NoError(mt, err)
})

mt.Run("creating an index", func(mt *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true),
}

_, err := mt.Coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{indexModel})
assert.NoError(mt, err)
})

mt.Run("creating an index view", func(mt *mtest.T) {
projectStage := bson.D{
{"$project", bson.D{
{"_id", 0},
{"fullName", bson.D{
{"$concat", []string{"$firstName", " ", "$lastName"}},
}},
}},
}

pipeline := mongo.Pipeline{projectStage}

err := mt.DB.CreateView(context.Background(), "testview", "coll", pipeline, nil)
assert.NoError(mt, err)
})

mt.Run("dropping a database", func(mt *mtest.T) {
db := mt.Client.Database("bd7b09e4-7d12-4bcb-9fc6-9852ad93715a")

err := db.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("dropping an index", func(t *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true).SetName("username_1"),
}

_, err := mt.Coll.Indexes().CreateOne(context.TODO(), indexModel)
assert.NoError(mt, err, "failed to create index")

_, err = mt.Coll.Indexes().DropOne(context.Background(), "username_1")
assert.NoError(mt, err)
})
})

mt.RunOpts("bulk write", noClientOpts, func(mt *mtest.T) {
wcCollOpts := options.Collection().SetWriteConcern(impossibleWc)
wcTestOpts := mtest.NewOptions().CollectionOptions(wcCollOpts).Topologies(mtest.ReplicaSet).CreateClient(false)
Expand Down Expand Up @@ -1703,22 +1818,6 @@ func TestCollection(t *testing.T) {
assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1])
assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3])
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}
_, err := mt.Coll.BulkWrite(context.Background(), models)
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
// grouped together because delete requires the documents to be inserted
maxBatchCount := int(mtest.MockDescription.MaxBatchCount)
Expand Down
14 changes: 0 additions & 14 deletions internal/integration/index_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -248,19 +247,6 @@ func TestIndexView(t *testing.T) {
})
}
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
_, err := mt.Coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{Keys: bson.D{{"x", 1}}})
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected CreateOne error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
// Needs to run on these versions for failpoints
mt.RunOpts("replace error", mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0"), func(mt *mtest.T) {
mt.SetFailPoint(mtest.FailPoint{
Expand Down
16 changes: 6 additions & 10 deletions internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,13 @@ func (t *T) ClearCollections() {
DropEncryptedCollection(t, coll.created, coll.CreateOpts.EncryptedFields)
}

err := coll.created.Drop(context.Background())
if errors.Is(err, mongo.ErrUnacknowledgedWrite) || errors.Is(err, driver.ErrUnacknowledgedWrite) {
// It's possible that a collection could have an unacknowledged write concern, which
// could prevent it from being dropped for sharded clusters. We can resolve this by
// re-instantiating the collection with a majority write concern before dropping.
collname := coll.created.Name()
wcm := writeconcern.Majority()
wccoll := t.DB.Collection(collname, options.Collection().SetWriteConcern(wcm))
_ = wccoll.Drop(context.Background())
// It's possible that a collection could have an unacknowledged write
// concern, which could prevent it from being dropped for sharded
// clusters. We can resolve this by re-instantiating the collection with
// a majority write concern before dropping.
clonedColl := coll.created.Clone(options.Collection().SetWriteConcern(writeconcern.Majority()))

}
_ = clonedColl.Drop(context.Background())
}
}
t.createdColls = t.createdColls[:0]
Expand Down
9 changes: 6 additions & 3 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func TestSessions(t *testing.T) {
assert.Nil(mt, err, "StartSession error: %v", err)
defer sess.EndSession(context.Background())

var res *mongo.InsertOneResult

err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error {
_, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
res, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})

return err
})

assert.Equal(mt, err, mongo.ErrUnacknowledgedWrite,
"expected ErrUnacknowledgedWrite on unacknowledged write in session, got %v", err)
assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

// Regression test for GODRIVER-2533. Note that this test assumes the race
Expand Down
10 changes: 7 additions & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
}

bw.result.MatchedCount -= bw.result.UpsertedCount
if lastErr != nil {
_, lastErr = processWriteError(lastErr)
return lastErr

rr, err := processWriteError(lastErr)
if err != nil {
return err
}

bw.result.Acknowledged = rr.isAcknowledged()

if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
return bwErr
}
Expand Down
30 changes: 21 additions & 9 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,14 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
if rr&rrOne == 0 {
if rr&rrOne == 0 && rr.isAcknowledged() {
return nil, err
}
return &InsertOneResult{InsertedID: res[0]}, err

return &InsertOneResult{
InsertedID: res[0],
Acknowledged: rr.isAcknowledged(),
}, err
}

// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur
Expand Down Expand Up @@ -471,7 +475,10 @@ func (coll *Collection) InsertMany(ctx context.Context, documents interface{},
return nil, err
}

imResult := &InsertManyResult{InsertedIDs: result}
imResult := &InsertManyResult{
InsertedIDs: result,
Acknowledged: rr.isAcknowledged(),
}
var writeException WriteException
if !errors.As(err, &writeException) {
return imResult, err
Expand Down Expand Up @@ -603,7 +610,10 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn
if rr&expectedRr == 0 {
return nil, err
}
return &DeleteResult{DeletedCount: op.Result().N}, err
return &DeleteResult{
DeletedCount: op.Result().N,
Acknowledged: rr.isAcknowledged(),
}, err
}

// DeleteOne executes a delete command to delete at most one document from the collection.
Expand Down Expand Up @@ -754,6 +764,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc
MatchedCount: opRes.N,
ModifiedCount: opRes.NModified,
UpsertedCount: int64(len(opRes.Upserted)),
Acknowledged: rr.isAcknowledged(),
}
if len(opRes.Upserted) > 0 {
res.UpsertedID = opRes.Upserted[0].ID
Expand Down Expand Up @@ -1725,16 +1736,17 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
Retry(retry).
Crypt(coll.client.cryptFLE)

_, err = processWriteError(op.Execute(ctx))
rr, err := processWriteError(op.Execute(ctx))
if err != nil {
return &SingleResult{err: err}
}

return &SingleResult{
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
13 changes: 7 additions & 6 deletions mongo/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts

err = op.Execute(ctx)
// RunCommand can be used to run a write, thus execute may return a write error
_, convErr := processWriteError(err)
rr, convErr := processWriteError(err)
return &SingleResult{
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
Loading
Loading