Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2821 De-propagate unacknowledged sentinel error #1661

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
131 changes: 114 additions & 17 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"strings"
"testing"

Expand Down Expand Up @@ -1472,6 +1471,120 @@ func TestCollection(t *testing.T) {
assert.NotNil(mt, we.WriteConcernError, "expected write concern error, got %v", err)
})
})

unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged writes", unackMtOpts, func(mt *mtest.T) {
mt.Run("bulk write", func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}

res, err := mt.Coll.BulkWrite(context.Background(), models)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert one", func(mt *mtest.T) {
res, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("insert many", func(t *mtest.T) {
docs := []interface{}{
bson.D{{"x", 1}},
bson.D{{"y", 1}},
}

res, err := mt.Coll.InsertMany(context.Background(), docs)

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("delete", func(mt *mtest.T) {
res, err := mt.Coll.DeleteOne(context.Background(), bson.D{{"x", 1}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("update", func(t *mtest.T) {
res, err := mt.Coll.UpdateOne(context.Background(), bson.D{{"x", 1}}, bson.D{{"$set", bson.D{{"x", "2"}}}})

assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

mt.Run("find and modify", func(mt *mtest.T) {
res := mt.Coll.FindOneAndDelete(context.Background(), bson.D{{"x", 1}})

assert.ErrorIs(mt, res.Err(), mongo.ErrNoDocuments)
assert.False(mt, res.Acknowledged)

})

mt.Run("dropping a collection", func(mt *mtest.T) {
err := mt.Coll.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("creating a collection", func(mt *mtest.T) {
err := mt.DB.CreateCollection(context.Background(), "test coll")
assert.NoError(mt, err)
})

mt.Run("creating an index", func(mt *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true),
}

_, err := mt.Coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{indexModel})
assert.NoError(mt, err)
})

mt.Run("creating an index view", func(mt *mtest.T) {
projectStage := bson.D{
{"$project", bson.D{
{"_id", 0},
{"fullName", bson.D{
{"$concat", []string{"$firstName", " ", "$lastName"}},
}},
}},
}

pipeline := mongo.Pipeline{projectStage}

err := mt.DB.CreateView(context.Background(), "testview", "coll", pipeline, nil)
assert.NoError(mt, err)
})

mt.Run("dropping a database", func(mt *mtest.T) {
err := mt.DB.Drop(context.Background())
assert.NoError(mt, err)
})

mt.Run("dropping an index", func(t *mtest.T) {
indexModel := mongo.IndexModel{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true).SetName("username_1"),
}

_, err := mt.Coll.Indexes().CreateOne(context.TODO(), indexModel)
assert.NoError(mt, err, "failed to create index")

_, err = mt.Coll.Indexes().DropOne(context.Background(), "username_1")
assert.NoError(mt, err)
})
})

mt.RunOpts("bulk write", noClientOpts, func(mt *mtest.T) {
wcCollOpts := options.Collection().SetWriteConcern(impossibleWc)
wcTestOpts := mtest.NewOptions().CollectionOptions(wcCollOpts).Topologies(mtest.ReplicaSet).CreateClient(false)
Expand Down Expand Up @@ -1703,22 +1816,6 @@ func TestCollection(t *testing.T) {
assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1])
assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3])
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}
_, err := mt.Coll.BulkWrite(context.Background(), models)
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
// grouped together because delete requires the documents to be inserted
maxBatchCount := int(mtest.MockDescription.MaxBatchCount)
Expand Down
14 changes: 0 additions & 14 deletions internal/integration/index_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -248,19 +247,6 @@ func TestIndexView(t *testing.T) {
})
}
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
_, err := mt.Coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{Keys: bson.D{{"x", 1}}})
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected CreateOne error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
// Needs to run on these versions for failpoints
mt.RunOpts("replace error", mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0"), func(mt *mtest.T) {
mt.SetFailPoint(mtest.FailPoint{
Expand Down
9 changes: 6 additions & 3 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func TestSessions(t *testing.T) {
assert.Nil(mt, err, "StartSession error: %v", err)
defer sess.EndSession(context.Background())

var res *mongo.InsertOneResult

err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error {
_, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
res, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})

return err
})

assert.Equal(mt, err, mongo.ErrUnacknowledgedWrite,
"expected ErrUnacknowledgedWrite on unacknowledged write in session, got %v", err)
assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

// Regression test for GODRIVER-2533. Note that this test assumes the race
Expand Down
10 changes: 7 additions & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
}

bw.result.MatchedCount -= bw.result.UpsertedCount
if lastErr != nil {
_, lastErr = processWriteError(lastErr)
return lastErr

rr, err := processWriteError(lastErr)
if err != nil {
return err
}

bw.result.Acknowledged = rr.isAcknowledged()

if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
return bwErr
}
Expand Down
30 changes: 21 additions & 9 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,14 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
if rr&rrOne == 0 {
if rr&rrOne == 0 && rr.isAcknowledged() {
return nil, err
}
return &InsertOneResult{InsertedID: res[0]}, err

return &InsertOneResult{
InsertedID: res[0],
Acknowledged: rr.isAcknowledged(),
}, err
}

// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur
Expand Down Expand Up @@ -471,7 +475,10 @@ func (coll *Collection) InsertMany(ctx context.Context, documents interface{},
return nil, err
}

imResult := &InsertManyResult{InsertedIDs: result}
imResult := &InsertManyResult{
InsertedIDs: result,
Acknowledged: rr.isAcknowledged(),
}
var writeException WriteException
if !errors.As(err, &writeException) {
return imResult, err
Expand Down Expand Up @@ -603,7 +610,10 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn
if rr&expectedRr == 0 {
return nil, err
}
return &DeleteResult{DeletedCount: op.Result().N}, err
return &DeleteResult{
DeletedCount: op.Result().N,
Acknowledged: rr.isAcknowledged(),
}, err
}

// DeleteOne executes a delete command to delete at most one document from the collection.
Expand Down Expand Up @@ -754,6 +764,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc
MatchedCount: opRes.N,
ModifiedCount: opRes.NModified,
UpsertedCount: int64(len(opRes.Upserted)),
Acknowledged: rr.isAcknowledged(),
}
if len(opRes.Upserted) > 0 {
res.UpsertedID = opRes.Upserted[0].ID
Expand Down Expand Up @@ -1725,16 +1736,17 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
Retry(retry).
Crypt(coll.client.cryptFLE)

_, err = processWriteError(op.Execute(ctx))
rr, err := processWriteError(op.Execute(ctx))
if err != nil {
return &SingleResult{err: err}
}

return &SingleResult{
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
13 changes: 7 additions & 6 deletions mongo/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts

err = op.Execute(ctx)
// RunCommand can be used to run a write, thus execute may return a write error
_, convErr := processWriteError(err)
rr, convErr := processWriteError(err)
return &SingleResult{
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
54 changes: 37 additions & 17 deletions mongo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ func replaceErrors(err error) error {
return nil
}

// Do not propagate the acknowledgement sentinel error. For DDL commands,
// (creating indexes, dropping collections, etc) acknowledgement should be
// ignored. For non-DDL write commands (insert, update, etc), acknowledgement
// should be be propagated at the result-level: e.g.,
// SingleResult.Acknowledged.
if err == driver.ErrUnacknowledgedWrite {
return nil
}

if errors.Is(err, topology.ErrTopologyClosed) {
return ErrClientDisconnected
}
Expand Down Expand Up @@ -616,34 +625,45 @@ const (
rrNone returnResult = 1 << iota // None means do not return the result ever.
rrOne // One means return the result if this was called by a *One method.
rrMany // Many means return the result is this was called by a *Many method.
rrUnacknowledged

rrAll returnResult = rrOne | rrMany // All means always return the result.
rrAll returnResult = rrOne | rrMany // All means always return the result.
rrAllUnacknowledged returnResult = rrAll | rrUnacknowledged // All + unacknowledged write
)

func (rr returnResult) isAcknowledged() bool {
return rr&rrUnacknowledged == 0
}

// processWriteError handles processing the result of a write operation. If the retrunResult matches
// the calling method's type, it should return the result object in addition to the error.
// This function will wrap the errors from other packages and return them as errors from this package.
//
// WriteConcernError will be returned over WriteErrors if both are present.
func processWriteError(err error) (returnResult, error) {
switch {
case errors.Is(err, driver.ErrUnacknowledgedWrite):
return rrAll, ErrUnacknowledgedWrite
case err != nil:
switch tt := err.(type) {
case driver.WriteCommandError:
return rrMany, WriteException{
WriteConcernError: convertDriverWriteConcernError(tt.WriteConcernError),
WriteErrors: writeErrorsFromDriverWriteErrors(tt.WriteErrors),
Labels: tt.Labels,
Raw: bson.Raw(tt.Raw),
}
default:
return rrNone, replaceErrors(err)
}
default:
if err == nil {
return rrAll, nil
}
// Do not propagate the acknowledgement sentinel error. For DDL commands,
// (creating indexes, dropping collections, etc) acknowledgement should be
// ignored. For non-DDL write commands (insert, update, etc), acknowledgement
// should be be propagated at the result-level: e.g.,
// SingleResult.Acknowledged.
if err == driver.ErrUnacknowledgedWrite {
return rrAllUnacknowledged, nil
}

wce, ok := err.(driver.WriteCommandError)
if !ok {
return rrNone, replaceErrors(err)
}

return rrMany, WriteException{
WriteConcernError: convertDriverWriteConcernError(wce.WriteConcernError),
WriteErrors: writeErrorsFromDriverWriteErrors(wce.WriteErrors),
Labels: wce.Labels,
Raw: bson.Raw(wce.Raw),
}
}

// batchErrorsTargetLength is the target length of error messages returned by batch operation
Expand Down
Loading
Loading