diff --git a/internal/integration/collection_test.go b/internal/integration/collection_test.go index dd1eb51e72..4965d662c7 100644 --- a/internal/integration/collection_test.go +++ b/internal/integration/collection_test.go @@ -8,7 +8,6 @@ package integration import ( "context" - "errors" "strings" "testing" @@ -1472,6 +1471,122 @@ func TestCollection(t *testing.T) { assert.NotNil(mt, we.WriteConcernError, "expected write concern error, got %v", err) }) }) + + unackClientOpts := options.Client(). + SetWriteConcern(writeconcern.Unacknowledged()) + unackMtOpts := mtest.NewOptions(). + ClientOptions(unackClientOpts). + MinServerVersion("3.6") + mt.RunOpts("unacknowledged writes", unackMtOpts, func(mt *mtest.T) { + mt.Run("bulk write", func(mt *mtest.T) { + models := []mongo.WriteModel{ + mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}), + } + + res, err := mt.Coll.BulkWrite(context.Background(), models) + + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) + }) + + mt.Run("insert one", func(mt *mtest.T) { + res, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}}) + + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) + }) + + mt.Run("insert many", func(t *mtest.T) { + docs := []interface{}{ + bson.D{{"x", 1}}, + bson.D{{"y", 1}}, + } + + res, err := mt.Coll.InsertMany(context.Background(), docs) + + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) + }) + + mt.Run("delete", func(mt *mtest.T) { + res, err := mt.Coll.DeleteOne(context.Background(), bson.D{{"x", 1}}) + + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) + }) + + mt.Run("update", func(t *mtest.T) { + res, err := mt.Coll.UpdateOne(context.Background(), bson.D{{"x", 1}}, bson.D{{"$set", bson.D{{"x", "2"}}}}) + + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) + }) + + mt.Run("find and modify", func(mt *mtest.T) { + res := mt.Coll.FindOneAndDelete(context.Background(), bson.D{{"x", 1}}) + + assert.ErrorIs(mt, res.Err(), mongo.ErrNoDocuments) + assert.False(mt, res.Acknowledged) + + }) + + mt.Run("dropping a collection", func(mt *mtest.T) { + err := mt.Coll.Drop(context.Background()) + assert.NoError(mt, err) + }) + + mt.Run("creating a collection", func(mt *mtest.T) { + err := mt.DB.CreateCollection(context.Background(), "test coll") + assert.NoError(mt, err) + }) + + mt.Run("creating an index", func(mt *mtest.T) { + indexModel := mongo.IndexModel{ + Keys: bson.M{"username": 1}, + Options: options.Index().SetUnique(true), + } + + _, err := mt.Coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{indexModel}) + assert.NoError(mt, err) + }) + + mt.Run("creating an index view", func(mt *mtest.T) { + projectStage := bson.D{ + {"$project", bson.D{ + {"_id", 0}, + {"fullName", bson.D{ + {"$concat", []string{"$firstName", " ", "$lastName"}}, + }}, + }}, + } + + pipeline := mongo.Pipeline{projectStage} + + err := mt.DB.CreateView(context.Background(), "testview", "coll", pipeline, nil) + assert.NoError(mt, err) + }) + + mt.Run("dropping a database", func(mt *mtest.T) { + db := mt.Client.Database("bd7b09e4-7d12-4bcb-9fc6-9852ad93715a") + + err := db.Drop(context.Background()) + assert.NoError(mt, err) + }) + + mt.Run("dropping an index", func(t *mtest.T) { + indexModel := mongo.IndexModel{ + Keys: bson.M{"username": 1}, + Options: options.Index().SetUnique(true).SetName("username_1"), + } + + _, err := mt.Coll.Indexes().CreateOne(context.TODO(), indexModel) + assert.NoError(mt, err, "failed to create index") + + _, err = mt.Coll.Indexes().DropOne(context.Background(), "username_1") + assert.NoError(mt, err) + }) + }) + mt.RunOpts("bulk write", noClientOpts, func(mt *mtest.T) { wcCollOpts := options.Collection().SetWriteConcern(impossibleWc) wcTestOpts := mtest.NewOptions().CollectionOptions(wcCollOpts).Topologies(mtest.ReplicaSet).CreateClient(false) @@ -1703,22 +1818,6 @@ func TestCollection(t *testing.T) { assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1]) assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3]) }) - unackClientOpts := options.Client(). - SetWriteConcern(writeconcern.Unacknowledged()) - unackMtOpts := mtest.NewOptions(). - ClientOptions(unackClientOpts). - MinServerVersion("3.6") - mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) { - models := []mongo.WriteModel{ - mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}), - } - _, err := mt.Coll.BulkWrite(context.Background(), models) - if !errors.Is(err, mongo.ErrUnacknowledgedWrite) { - // Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings, - // so the assertion would succeed even if the error had not been wrapped. - mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err) - } - }) mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) { // grouped together because delete requires the documents to be inserted maxBatchCount := int(mtest.MockDescription.MaxBatchCount) diff --git a/internal/integration/index_view_test.go b/internal/integration/index_view_test.go index e3d1ae687c..dff50050f2 100644 --- a/internal/integration/index_view_test.go +++ b/internal/integration/index_view_test.go @@ -8,7 +8,6 @@ package integration import ( "context" - "errors" "testing" "github.com/google/go-cmp/cmp" @@ -248,19 +247,6 @@ func TestIndexView(t *testing.T) { }) } }) - unackClientOpts := options.Client(). - SetWriteConcern(writeconcern.Unacknowledged()) - unackMtOpts := mtest.NewOptions(). - ClientOptions(unackClientOpts). - MinServerVersion("3.6") - mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) { - _, err := mt.Coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{Keys: bson.D{{"x", 1}}}) - if !errors.Is(err, mongo.ErrUnacknowledgedWrite) { - // Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings, - // so the assertion would succeed even if the error had not been wrapped. - mt.Fatalf("expected CreateOne error %v, got %v", mongo.ErrUnacknowledgedWrite, err) - } - }) // Needs to run on these versions for failpoints mt.RunOpts("replace error", mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0"), func(mt *mtest.T) { mt.SetFailPoint(mtest.FailPoint{ diff --git a/internal/integration/mtest/mongotest.go b/internal/integration/mtest/mongotest.go index affa4233df..6d706ea536 100644 --- a/internal/integration/mtest/mongotest.go +++ b/internal/integration/mtest/mongotest.go @@ -514,17 +514,13 @@ func (t *T) ClearCollections() { DropEncryptedCollection(t, coll.created, coll.CreateOpts.EncryptedFields) } - err := coll.created.Drop(context.Background()) - if errors.Is(err, mongo.ErrUnacknowledgedWrite) || errors.Is(err, driver.ErrUnacknowledgedWrite) { - // It's possible that a collection could have an unacknowledged write concern, which - // could prevent it from being dropped for sharded clusters. We can resolve this by - // re-instantiating the collection with a majority write concern before dropping. - collname := coll.created.Name() - wcm := writeconcern.Majority() - wccoll := t.DB.Collection(collname, options.Collection().SetWriteConcern(wcm)) - _ = wccoll.Drop(context.Background()) + // It's possible that a collection could have an unacknowledged write + // concern, which could prevent it from being dropped for sharded + // clusters. We can resolve this by re-instantiating the collection with + // a majority write concern before dropping. + clonedColl := coll.created.Clone(options.Collection().SetWriteConcern(writeconcern.Majority())) - } + _ = clonedColl.Drop(context.Background()) } } t.createdColls = t.createdColls[:0] diff --git a/internal/integration/sessions_test.go b/internal/integration/sessions_test.go index 0150a21fa2..67448f6974 100644 --- a/internal/integration/sessions_test.go +++ b/internal/integration/sessions_test.go @@ -113,13 +113,16 @@ func TestSessions(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) + var res *mongo.InsertOneResult + err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { - _, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}}) + res, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}}) + return err }) - assert.Equal(mt, err, mongo.ErrUnacknowledgedWrite, - "expected ErrUnacknowledgedWrite on unacknowledged write in session, got %v", err) + assert.NoError(mt, err) + assert.False(mt, res.Acknowledged) }) // Regression test for GODRIVER-2533. Note that this test assumes the race diff --git a/mongo/bulk_write.go b/mongo/bulk_write.go index 96bea7b8e0..cf8127ccc9 100644 --- a/mongo/bulk_write.go +++ b/mongo/bulk_write.go @@ -87,10 +87,14 @@ func (bw *bulkWrite) execute(ctx context.Context) error { } bw.result.MatchedCount -= bw.result.UpsertedCount - if lastErr != nil { - _, lastErr = processWriteError(lastErr) - return lastErr + + rr, err := processWriteError(lastErr) + if err != nil { + return err } + + bw.result.Acknowledged = rr.isAcknowledged() + if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil { return bwErr } diff --git a/mongo/collection.go b/mongo/collection.go index ea75a4a0cd..828dd95065 100644 --- a/mongo/collection.go +++ b/mongo/collection.go @@ -432,10 +432,14 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{}, res, err := coll.insert(ctx, []interface{}{document}, imOpts) rr, err := processWriteError(err) - if rr&rrOne == 0 { + if rr&rrOne == 0 && rr.isAcknowledged() { return nil, err } - return &InsertOneResult{InsertedID: res[0]}, err + + return &InsertOneResult{ + InsertedID: res[0], + Acknowledged: rr.isAcknowledged(), + }, err } // InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur @@ -471,7 +475,10 @@ func (coll *Collection) InsertMany(ctx context.Context, documents interface{}, return nil, err } - imResult := &InsertManyResult{InsertedIDs: result} + imResult := &InsertManyResult{ + InsertedIDs: result, + Acknowledged: rr.isAcknowledged(), + } var writeException WriteException if !errors.As(err, &writeException) { return imResult, err @@ -603,7 +610,10 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn if rr&expectedRr == 0 { return nil, err } - return &DeleteResult{DeletedCount: op.Result().N}, err + return &DeleteResult{ + DeletedCount: op.Result().N, + Acknowledged: rr.isAcknowledged(), + }, err } // DeleteOne executes a delete command to delete at most one document from the collection. @@ -754,6 +764,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc MatchedCount: opRes.N, ModifiedCount: opRes.NModified, UpsertedCount: int64(len(opRes.Upserted)), + Acknowledged: rr.isAcknowledged(), } if len(opRes.Upserted) > 0 { res.UpsertedID = opRes.Upserted[0].ID @@ -1725,16 +1736,17 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd Retry(retry). Crypt(coll.client.cryptFLE) - _, err = processWriteError(op.Execute(ctx)) + rr, err := processWriteError(op.Execute(ctx)) if err != nil { return &SingleResult{err: err} } return &SingleResult{ - ctx: ctx, - rdr: bson.Raw(op.Result().Value), - bsonOpts: coll.bsonOpts, - reg: coll.registry, + ctx: ctx, + rdr: bson.Raw(op.Result().Value), + bsonOpts: coll.bsonOpts, + reg: coll.registry, + Acknowledged: rr.isAcknowledged(), } } diff --git a/mongo/database.go b/mongo/database.go index 36296a11b7..22fffc4821 100644 --- a/mongo/database.go +++ b/mongo/database.go @@ -259,13 +259,14 @@ func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts err = op.Execute(ctx) // RunCommand can be used to run a write, thus execute may return a write error - _, convErr := processWriteError(err) + rr, convErr := processWriteError(err) return &SingleResult{ - ctx: ctx, - err: convErr, - rdr: bson.Raw(op.Result()), - bsonOpts: db.bsonOpts, - reg: db.registry, + ctx: ctx, + err: convErr, + rdr: bson.Raw(op.Result()), + bsonOpts: db.bsonOpts, + reg: db.registry, + Acknowledged: rr.isAcknowledged(), } } diff --git a/mongo/errors.go b/mongo/errors.go index 5b2c039898..91bb5c856a 100644 --- a/mongo/errors.go +++ b/mongo/errors.go @@ -55,6 +55,15 @@ func replaceErrors(err error) error { return nil } + // Do not propagate the acknowledgement sentinel error. For DDL commands, + // (creating indexes, dropping collections, etc) acknowledgement should be + // ignored. For non-DDL write commands (insert, update, etc), acknowledgement + // should be be propagated at the result-level: e.g., + // SingleResult.Acknowledged. + if err == driver.ErrUnacknowledgedWrite { + return nil + } + if errors.Is(err, topology.ErrTopologyClosed) { return ErrClientDisconnected } @@ -616,34 +625,45 @@ const ( rrNone returnResult = 1 << iota // None means do not return the result ever. rrOne // One means return the result if this was called by a *One method. rrMany // Many means return the result is this was called by a *Many method. + rrUnacknowledged - rrAll returnResult = rrOne | rrMany // All means always return the result. + rrAll returnResult = rrOne | rrMany // All means always return the result. + rrAllUnacknowledged returnResult = rrAll | rrUnacknowledged // All + unacknowledged write ) +func (rr returnResult) isAcknowledged() bool { + return rr&rrUnacknowledged == 0 +} + // processWriteError handles processing the result of a write operation. If the retrunResult matches // the calling method's type, it should return the result object in addition to the error. // This function will wrap the errors from other packages and return them as errors from this package. // // WriteConcernError will be returned over WriteErrors if both are present. func processWriteError(err error) (returnResult, error) { - switch { - case errors.Is(err, driver.ErrUnacknowledgedWrite): - return rrAll, ErrUnacknowledgedWrite - case err != nil: - switch tt := err.(type) { - case driver.WriteCommandError: - return rrMany, WriteException{ - WriteConcernError: convertDriverWriteConcernError(tt.WriteConcernError), - WriteErrors: writeErrorsFromDriverWriteErrors(tt.WriteErrors), - Labels: tt.Labels, - Raw: bson.Raw(tt.Raw), - } - default: - return rrNone, replaceErrors(err) - } - default: + if err == nil { return rrAll, nil } + // Do not propagate the acknowledgement sentinel error. For DDL commands, + // (creating indexes, dropping collections, etc) acknowledgement should be + // ignored. For non-DDL write commands (insert, update, etc), acknowledgement + // should be be propagated at the result-level: e.g., + // SingleResult.Acknowledged. + if err == driver.ErrUnacknowledgedWrite { + return rrAllUnacknowledged, nil + } + + wce, ok := err.(driver.WriteCommandError) + if !ok { + return rrNone, replaceErrors(err) + } + + return rrMany, WriteException{ + WriteConcernError: convertDriverWriteConcernError(wce.WriteConcernError), + WriteErrors: writeErrorsFromDriverWriteErrors(wce.WriteErrors), + Labels: wce.Labels, + Raw: bson.Raw(wce.Raw), + } } // batchErrorsTargetLength is the target length of error messages returned by batch operation diff --git a/mongo/index_view.go b/mongo/index_view.go index 59ea8c8e26..e6df212f46 100644 --- a/mongo/index_view.go +++ b/mongo/index_view.go @@ -166,7 +166,11 @@ func (iv IndexView) ListSpecifications(ctx context.Context, opts ...*options.Lis // CreateOne executes a createIndexes command to create an index on the collection and returns the name of the new // index. See the IndexView.CreateMany documentation for more information and an example. -func (iv IndexView) CreateOne(ctx context.Context, model IndexModel, opts ...*options.CreateIndexesOptions) (string, error) { +func (iv IndexView) CreateOne( + ctx context.Context, + model IndexModel, + opts ...*options.CreateIndexesOptions, +) (string, error) { names, err := iv.CreateMany(ctx, []IndexModel{model}, opts...) if err != nil { return "", err @@ -185,7 +189,11 @@ func (iv IndexView) CreateOne(ctx context.Context, model IndexModel, opts ...*op // documentation). // // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/createIndexes/. -func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts ...*options.CreateIndexesOptions) ([]string, error) { +func (iv IndexView) CreateMany( + ctx context.Context, + models []IndexModel, + opts ...*options.CreateIndexesOptions, +) ([]string, error) { names := make([]string, 0, len(models)) var indexes bsoncore.Document @@ -285,9 +293,8 @@ func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts .. op.CommitQuorum(commitQuorum) } - err = op.Execute(ctx) + _, err = processWriteError(op.Execute(ctx)) if err != nil { - _, err = processWriteError(err) return nil, err } diff --git a/mongo/results.go b/mongo/results.go index 8436ab49de..7283a0c131 100644 --- a/mongo/results.go +++ b/mongo/results.go @@ -32,18 +32,30 @@ type BulkWriteResult struct { // A map of operation index to the _id of each upserted document. UpsertedIDs map[int64]interface{} + + // Operation performed with an acknowledged write. Values for other fields may + // not be deterministic if the write operation was unacknowledged. + Acknowledged bool } // InsertOneResult is the result type returned by an InsertOne operation. type InsertOneResult struct { // The _id of the inserted document. A value generated by the driver will be of type bson.ObjectID. InsertedID interface{} + + // Operation performed with an acknowledged write. Values for other fields may + // not be deterministic if the write operation was unacknowledged. + Acknowledged bool } // InsertManyResult is a result type returned by an InsertMany operation. type InsertManyResult struct { // The _id values of the inserted documents. Values generated by the driver will be of type bson.ObjectID. InsertedIDs []interface{} + + // Operation performed with an acknowledged write. Values for other fields may + // not be deterministic if the write operation was unacknowledged. + Acknowledged bool } // TODO(GODRIVER-2367): Remove the BSON struct tags on DeleteResult. @@ -51,6 +63,10 @@ type InsertManyResult struct { // DeleteResult is the result type returned by DeleteOne and DeleteMany operations. type DeleteResult struct { DeletedCount int64 // The number of documents deleted. + + // Operation performed with an acknowledged write. Values for other fields may + // not be deterministic if the write operation was unacknowledged. + Acknowledged bool } // RewrapManyDataKeyResult is the result of the bulk write operation used to update the key vault collection with @@ -95,6 +111,10 @@ type UpdateResult struct { ModifiedCount int64 // The number of documents modified by the operation. UpsertedCount int64 // The number of documents upserted by the operation. UpsertedID interface{} // The _id field of the upserted document, or nil if no upsert was done. + + // Operation performed with an acknowledged write. Values for other fields may + // not be deterministic if the write operation was unacknowledged. + Acknowledged bool } // IndexSpecification represents an index in a database. This type is returned by the IndexView.ListSpecifications diff --git a/mongo/single_result.go b/mongo/single_result.go index 6a0a695685..8998596992 100644 --- a/mongo/single_result.go +++ b/mongo/single_result.go @@ -28,6 +28,11 @@ type SingleResult struct { rdr bson.Raw bsonOpts *options.BSONOptions reg *bson.Registry + + // Operation performed with an acknowledged write. Values returned by + // SingleResult methods may not be deterministic if the write operation was + // unacknowledged and so should not be relied upon. + Acknowledged bool } // NewSingleResultFromDocument creates a SingleResult with the provided error, registry, and an underlying Cursor pre-loaded with