From 5fcb147028929da63ecddb2ff029b68f390ce1b0 Mon Sep 17 00:00:00 2001 From: Preston Vasquez <24281431+prestonvasquez@users.noreply.github.com> Date: Mon, 10 Oct 2022 10:15:49 -0600 Subject: [PATCH] GODRIVER-2539 Remove MinWireVersion < 6 Logic (#1084) --- mongo/change_stream.go | 12 +- mongo/integration/operation_legacy_test.go | 301 --------- x/mongo/driver/operation.go | 22 +- x/mongo/driver/operation_legacy.go | 731 --------------------- 4 files changed, 4 insertions(+), 1062 deletions(-) delete mode 100644 mongo/integration/operation_legacy_test.go delete mode 100644 x/mongo/driver/operation_legacy.go diff --git a/mongo/change_stream.go b/mongo/change_stream.go index 503dbdfd9d..82d2b57998 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -287,7 +287,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err // Execute the aggregate, retrying on retryable errors once (1) if retryable reads are enabled and // infinitely (-1) if context is a Timeout context. var retries int - if cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6 { + if cs.client.retryReads { retries = 1 } if internal.IsTimeoutContext(ctx) { @@ -325,11 +325,8 @@ AggregateExecuteLoop: } defer conn.Close() - // If wire version is now < 6, do not retry. + // Update the wire version with data from the new connection. cs.wireVersion = conn.Description().WireVersion - if cs.wireVersion == nil || cs.wireVersion.Max < 6 { - break AggregateExecuteLoop - } // Reset deployment. cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) @@ -435,10 +432,7 @@ func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) { } if cs.options.FullDocument != nil { - // Only append a default "fullDocument" field if wire version is less than 6 (3.6). Otherwise, - // the server will assume users want the default behavior, and "fullDocument" does not need to be - // specified. - if *cs.options.FullDocument != options.Default || (cs.wireVersion != nil && cs.wireVersion.Max < 6) { + if *cs.options.FullDocument != options.Default { plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument)) } } diff --git a/mongo/integration/operation_legacy_test.go b/mongo/integration/operation_legacy_test.go deleted file mode 100644 index 48bf3fb086..0000000000 --- a/mongo/integration/operation_legacy_test.go +++ /dev/null @@ -1,301 +0,0 @@ -// Copyright (C) MongoDB, Inc. 2017-present. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - -package integration - -import ( - "bytes" - "context" - "testing" - "time" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/internal/testutil/assert" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/description" - "go.mongodb.org/mongo-driver/mongo/integration/mtest" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" - "go.mongodb.org/mongo-driver/x/mongo/driver" - "go.mongodb.org/mongo-driver/x/mongo/driver/drivertest" - "go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage" -) - -func TestOperationLegacy(t *testing.T) { - mt := mtest.New(t, noClientOpts) - defer mt.Close() - - mt.RunOpts("verify wiremessage", noClientOpts, func(mt *mtest.T) { - res := bson.D{{"ok", 1}} - resBytes, err := bson.Marshal(res) - assert.Nil(mt, err, "Marshal error: %v", err) - fakeOpReply := drivertest.MakeReply(resBytes) - // mock connection - testConn := &drivertest.ChannelConn{ - Written: make(chan []byte, 5), - ReadResp: make(chan []byte, 10), - Desc: description.Server{ - WireVersion: &description.VersionRange{ - Max: 2, - }, - }, - } - defer func() { - close(testConn.Written) - close(testConn.ReadResp) - }() - for i := 0; i < 10; i++ { - testConn.ReadResp <- fakeOpReply - } - testClientOpts := &options.ClientOptions{Deployment: driver.SingleConnectionDeployment{C: testConn}} - - // test cases for commands that will generate an OP_QUERY - cases := []struct { - name string - cmdFn func(*mtest.T) opQuery // runs a command and returns the expected wire message - }{ - {"find", runFindWithOptions}, - {"list collections", runListCollectionsWithOptions}, - {"list indexes", runListIndexesWithOptions}, - } - for _, tc := range cases { - mt.RunOpts(tc.name, mtest.NewOptions().ClientOptions(testClientOpts), func(mt *mtest.T) { - // Clear any messages written during test setup. Each message written consumed one of the pre-loaded - // replies, so add a fakeOpReply to the ReadResp channel for each one. - for len(testConn.Written) > 0 { - <-testConn.Written - testConn.ReadResp <- fakeOpReply - } - expectedQuery := tc.cmdFn(mt) - - assert.NotEqual(mt, 0, len(testConn.Written), "no message written to connection") - validateQueryWiremessage(mt, <-testConn.Written, expectedQuery) - }) - } - }) - mt.RunOpts("verify results", noClientOpts, func(mt *mtest.T) { - mt.RunOpts("find", mtest.NewOptions().MaxServerVersion("3.0"), func(mt *mtest.T) { - initCollection(mt, mt.Coll) - cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetSort(bson.D{{"x", 1}})) - assert.Nil(mt, err, "Find error: %v", err) - - for i := 1; i <= 5; i++ { - assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i) - got := cursor.Current.Lookup("x").Int32() - assert.Equal(mt, int32(i), got, "expected x value %v, got %v", i, got) - } - assert.False(mt, cursor.Next(context.Background()), "found extra document %v", cursor.Current) - err = cursor.Err() - assert.Nil(mt, err, "cursor error: %v", err) - }) - mt.RunOpts("list collections", mtest.NewOptions().MaxServerVersion("2.7.6").DatabaseName("test_legacy"), func(mt *mtest.T) { - // run on a separate database to avoid finding other collections if we run these tests in parallel - cursor, err := mt.DB.ListCollections(context.Background(), bson.D{}) - assert.Nil(mt, err, "ListCollections error: %v", err) - - for i := 0; i < 2; i++ { - assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i) - collName := cursor.Current.Lookup("name").StringValue() - assert.True(mt, collName == mt.Coll.Name() || collName == "system.indexes", - "unexpected collection %v", collName) - } - assert.False(mt, cursor.Next(context.Background()), "found extra document %v", cursor.Current) - err = cursor.Err() - assert.Nil(mt, err, "cursor error: %v", err) - }) - mt.RunOpts("list indexes", mtest.NewOptions().MaxServerVersion("2.7.6"), func(mt *mtest.T) { - // create index so an index besides _id is found - iv := mt.Coll.Indexes() - indexName, err := iv.CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{{"x", 1}}, - }) - assert.Nil(mt, err, "CreateOne error: %v", err) - - cursor, err := iv.List(context.Background()) - expectedNs := fullCollName(mt, mt.Coll.Name()) - assert.Nil(mt, err, "List error: %v", err) - for i := 0; i < 2; i++ { - assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i) - ns := cursor.Current.Lookup("ns").StringValue() - assert.Equal(mt, expectedNs, ns, "expected ns %v, got %v", expectedNs, ns) - - name := cursor.Current.Lookup("name").StringValue() - assert.True(mt, name == "_id_" || name == indexName, "unexpected index %v", name) - } - assert.False(mt, cursor.Next(context.Background()), "found extra document %v", cursor.Current) - err = cursor.Err() - assert.Nil(mt, err, "cursor error: %v", err) - }) - mt.RunOpts("find and killCursors", mtest.NewOptions().MaxServerVersion("3.0"), func(mt *mtest.T) { - initCollection(mt, mt.Coll) - // set batch size to force multiple batches - cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2)) - assert.Nil(mt, err, "Find error: %v", err) - // close cursor to force a killCursors to be sent - mt.ClearEvents() - err = cursor.Close(context.Background()) - assert.Nil(mt, err, "Close error: %v", err) - evt := mt.GetStartedEvent() - assert.NotNil(mt, evt, "expected CommandStartedEvent, got nil") - assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got '%v'", evt.CommandName) - }) - }) -} - -type opQuery struct { - flags wiremessage.QueryFlag - fullCollectionName string - numToSkip, numToReturn int32 - query, returnFieldsSelector bson.D -} - -func fullCollName(mt *mtest.T, coll string) string { - return mt.DB.Name() + "." + coll -} - -func runFindWithOptions(mt *mtest.T) opQuery { - maxDoc := bson.D{{"indexBounds", bson.D{{"x", 50}}}} - minDoc := bson.D{{"indexBounds", bson.D{{"x", 50}}}} - projection := bson.D{{"y", 0}} - sort := bson.D{{"x", 1}} - filter := bson.D{{"x", 1}} - - opts := options.Find(). - SetAllowPartialResults(true). - SetBatchSize(2). - SetComment("hello"). - SetCursorType(options.Tailable). - SetHint("hintFoo"). - SetLimit(5). - SetMax(maxDoc). - SetMaxTime(10000 * time.Millisecond). - SetMin(minDoc). - SetNoCursorTimeout(true). - SetOplogReplay(true). - SetProjection(projection). - SetReturnKey(false). - SetShowRecordID(false). - SetSkip(1). - SetSnapshot(false). - SetSort(sort) - _, _ = mt.Coll.Find(context.Background(), filter, opts) - - // find expectations - findQueryDoc := bson.D{ - {"$query", filter}, - {"$comment", "hello"}, - {"$hint", "hintFoo"}, - {"$max", maxDoc}, - {"$min", minDoc}, - {"$returnKey", false}, - {"$showDiskLoc", false}, - {"$snapshot", false}, - {"$orderby", sort}, - {"$maxTimeMS", int64(10000)}, - } - return opQuery{ - flags: wiremessage.Partial | wiremessage.TailableCursor | wiremessage.NoCursorTimeout | wiremessage.OplogReplay | wiremessage.SecondaryOK, - fullCollectionName: fullCollName(mt, mt.Coll.Name()), - numToSkip: 1, - numToReturn: 2, - query: findQueryDoc, - returnFieldsSelector: projection, - } -} - -func runListCollectionsWithOptions(mt *mtest.T) opQuery { - _, _ = mt.DB.ListCollections(context.Background(), bson.D{{"name", "foo"}}) - - regexDoc := bson.D{{"name", primitive.Regex{Pattern: "^[^$]*$"}}} - modifiedFilterDoc := bson.D{{"name", fullCollName(mt, "foo")}} - listCollDoc := bson.D{ - {"$and", bson.A{regexDoc, modifiedFilterDoc}}, - } - return opQuery{ - flags: wiremessage.SecondaryOK, - fullCollectionName: fullCollName(mt, "system.namespaces"), - query: listCollDoc, - } -} - -func runListIndexesWithOptions(mt *mtest.T) opQuery { - _, _ = mt.Coll.Indexes().List(context.Background(), options.ListIndexes().SetBatchSize(2).SetMaxTime(10000*time.Millisecond)) - - listIndexesDoc := bson.D{ - {"$query", bson.D{{"ns", fullCollName(mt, mt.Coll.Name())}}}, - {"$maxTimeMS", int64(10000)}, - } - return opQuery{ - flags: wiremessage.SecondaryOK, - fullCollectionName: fullCollName(mt, "system.indexes"), - numToReturn: 2, - query: listIndexesDoc, - } -} - -func validateHeader(mt *mtest.T, wm []byte, expectedOpcode wiremessage.OpCode) []byte { - mt.Helper() - - actualLen := len(wm) - var readLen int32 - var opcode wiremessage.OpCode - var ok bool - - readLen, _, _, opcode, wm, ok = wiremessage.ReadHeader(wm) - assert.True(mt, ok, "could not read header") - assert.Equal(mt, int32(actualLen), readLen, "expected header length %v, got %v", actualLen, readLen) - assert.Equal(mt, expectedOpcode, opcode, "expected opcode %v, got %v", expectedOpcode, opcode) - return wm -} - -func validateQueryWiremessage(mt *mtest.T, wm []byte, expected opQuery) { - mt.Helper() - - var numToSkip, numToReturn int32 - var flags wiremessage.QueryFlag - var fullCollName string - var query, returnFieldsSelector bsoncore.Document - var ok bool - - wm = validateHeader(mt, wm, wiremessage.OpQuery) - - flags, wm, ok = wiremessage.ReadQueryFlags(wm) - assert.True(mt, ok, "could not read flags") - assert.Equal(mt, expected.flags, flags, "expected query flags %v, got %v", expected.flags, flags) - - fullCollName, wm, ok = wiremessage.ReadQueryFullCollectionName(wm) - assert.True(mt, ok, "could not read fullCollectionName") - assert.Equal(mt, expected.fullCollectionName, fullCollName, "expected namespace %v, got %v", expected.fullCollectionName, fullCollName) - - numToSkip, wm, ok = wiremessage.ReadQueryNumberToSkip(wm) - assert.True(mt, ok, "could not read numToSkip") - assert.Equal(mt, expected.numToSkip, numToSkip, "expected skip %v, got %v", expected.numToSkip, numToSkip) - - numToReturn, wm, ok = wiremessage.ReadQueryNumberToReturn(wm) - assert.True(mt, ok, "could not read numToReturn") - assert.Equal(mt, expected.numToReturn, numToReturn, "expected num to return %v, got %v", expected.numToReturn, numToReturn) - - query, wm, ok = wiremessage.ReadQueryQuery(wm) - assert.True(mt, ok, "could not read query document") - expectedQueryBytes, err := bson.Marshal(expected.query) - assert.Nil(mt, err, "Marshal error for query: %v", err) - assert.True(mt, bytes.Equal(query, expectedQueryBytes), "expected query %v, got %v", bsoncore.Document(expectedQueryBytes), query) - - if len(expected.returnFieldsSelector) == 0 { - assert.Equal(mt, 0, len(wm), "wire message had extraneous bytes") - return - } - - returnFieldsSelector, wm, ok = wiremessage.ReadQueryReturnFieldsSelector(wm) - assert.True(mt, ok, "could not read returnFieldsSelector") - assert.Equal(mt, 0, len(wm), "wire message had extraneous bytes after return fields selector") - - expectedRfsBytes, err := bson.Marshal(expected.returnFieldsSelector) - assert.Nil(mt, err, "Marshal error for return fields selector: %v", err) - assert.True(mt, bytes.Equal(returnFieldsSelector, expectedRfsBytes), - "expected return fields selector %v, got %v", bsoncore.Document(expectedRfsBytes), returnFieldsSelector) -} diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index 10b10a9c1d..6324e95119 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -505,24 +505,6 @@ func (op Operation) Execute(ctx context.Context) error { } desc := description.SelectedServer{Server: conn.Description(), Kind: op.Deployment.Kind()} - if desc.WireVersion == nil || desc.WireVersion.Max < 4 { - switch op.Legacy { - case LegacyFind: - return op.legacyFind(ctx, (*wm)[:0], srvr, conn, desc, maxTimeMS) - case LegacyGetMore: - return op.legacyGetMore(ctx, (*wm)[:0], srvr, conn, desc) - case LegacyKillCursors: - return op.legacyKillCursors(ctx, (*wm)[:0], srvr, conn, desc) - } - } - if desc.WireVersion == nil || desc.WireVersion.Max < 3 { - switch op.Legacy { - case LegacyListCollections: - return op.legacyListCollections(ctx, (*wm)[:0], srvr, conn, desc) - case LegacyListIndexes: - return op.legacyListIndexes(ctx, (*wm)[:0], srvr, conn, desc, maxTimeMS) - } - } if batching { targetBatchSize := desc.MaxDocumentSize @@ -830,7 +812,6 @@ func (op Operation) retryable(desc description.Server) bool { return true } if retryWritesSupported(desc) && - desc.WireVersion != nil && desc.WireVersion.Max >= 6 && op.Client != nil && !(op.Client.TransactionInProgress() || op.Client.TransactionStarting()) && writeconcern.AckWrite(op.WriteConcern) { return true @@ -839,8 +820,7 @@ func (op Operation) retryable(desc description.Server) bool { if op.Client != nil && (op.Client.Committing || op.Client.Aborting) { return true } - if desc.WireVersion != nil && desc.WireVersion.Max >= 6 && - (op.Client == nil || !(op.Client.TransactionInProgress() || op.Client.TransactionStarting())) { + if op.Client == nil || !(op.Client.TransactionInProgress() || op.Client.TransactionStarting()) { return true } } diff --git a/x/mongo/driver/operation_legacy.go b/x/mongo/driver/operation_legacy.go deleted file mode 100644 index bbb0cb054f..0000000000 --- a/x/mongo/driver/operation_legacy.go +++ /dev/null @@ -1,731 +0,0 @@ -// Copyright (C) MongoDB, Inc. 2017-present. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - -package driver - -import ( - "context" - "errors" - "strconv" - "time" - - "go.mongodb.org/mongo-driver/bson/bsontype" - "go.mongodb.org/mongo-driver/mongo/description" - "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" - "go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage" -) - -var ( - firstBatchIdentifier = "firstBatch" - nextBatchIdentifier = "nextBatch" - listCollectionsNamespace = "system.namespaces" - listIndexesNamespace = "system.indexes" - - // ErrFilterType is returned when the filter for a legacy list collections operation is of the wrong type. - ErrFilterType = errors.New("filter for list collections operation must be a string") -) - -func (op Operation) getFullCollectionName(coll string) string { - return op.Database + "." + coll -} - -func (op Operation) legacyFind(ctx context.Context, dst []byte, srvr Server, conn Connection, - desc description.SelectedServer, maxTimeMS uint64) error { - wm, startedInfo, collName, err := op.createLegacyFindWireMessage(dst, desc, maxTimeMS) - if err != nil { - return err - } - startedInfo.connID = conn.ID() - op.publishStartedEvent(ctx, startedInfo) - - finishedInfo := finishedInformation{ - cmdName: startedInfo.cmdName, - requestID: startedInfo.requestID, - startTime: time.Now(), - connID: startedInfo.connID, - } - - finishedInfo.response, finishedInfo.cmdErr = op.roundTripLegacyCursor(ctx, wm, srvr, conn, collName, firstBatchIdentifier) - op.publishFinishedEvent(ctx, finishedInfo) - - if finishedInfo.cmdErr != nil { - return finishedInfo.cmdErr - } - - if op.ProcessResponseFn != nil { - // CurrentIndex is always 0 in this mode. - info := ResponseInfo{ - ServerResponse: finishedInfo.response, - Server: srvr, - Connection: conn, - ConnectionDescription: desc.Server, - } - return op.ProcessResponseFn(info) - } - return nil -} - -// returns wire message, collection name, error -func (op Operation) createLegacyFindWireMessage(dst []byte, desc description.SelectedServer, maxTimeMS uint64) ([]byte, startedInformation, string, error) { - info := startedInformation{ - requestID: wiremessage.NextRequestID(), - cmdName: "find", - } - - // call CommandFn on an empty slice rather than dst because the options will need to be converted to legacy - var cmdDoc bsoncore.Document - var cmdIndex int32 - var err error - - cmdIndex, cmdDoc = bsoncore.AppendDocumentStart(cmdDoc) - cmdDoc, err = op.CommandFn(cmdDoc, desc) - if err != nil { - return dst, info, "", err - } - // If maxTimeMS is greater than 0 append it to wire message. A maxTimeMS value of 0 only explicitly - // specifies the default behavior of no timeout server-side. - if maxTimeMS > 0 { - cmdDoc = bsoncore.AppendInt64Element(cmdDoc, "maxTimeMS", int64(maxTimeMS)) - } - cmdDoc, _ = bsoncore.AppendDocumentEnd(cmdDoc, cmdIndex) - // for monitoring legacy events, the upconverted document should be captured rather than the legacy one - info.cmd = cmdDoc - - cmdElems, err := cmdDoc.Elements() - if err != nil { - return dst, info, "", err - } - - // take each option from the non-legacy command and convert it - // build options as a byte slice of elements rather than a bsoncore.Document because they will be appended - // to another document with $query - var optsElems []byte - flags := op.secondaryOK(desc) - var numToSkip, numToReturn, batchSize, limit int32 // numToReturn calculated from batchSize and limit - var filter, returnFieldsSelector bsoncore.Document - var collName string - var singleBatch bool - for _, elem := range cmdElems { - switch elem.Key() { - case "find": - collName = elem.Value().StringValue() - case "filter": - filter = elem.Value().Data - case "sort": - optsElems = bsoncore.AppendValueElement(optsElems, "$orderby", elem.Value()) - case "hint": - optsElems = bsoncore.AppendValueElement(optsElems, "$hint", elem.Value()) - case "comment": - optsElems = bsoncore.AppendValueElement(optsElems, "$comment", elem.Value()) - case "max": - optsElems = bsoncore.AppendValueElement(optsElems, "$max", elem.Value()) - case "min": - optsElems = bsoncore.AppendValueElement(optsElems, "$min", elem.Value()) - case "returnKey": - optsElems = bsoncore.AppendValueElement(optsElems, "$returnKey", elem.Value()) - case "showRecordId": - optsElems = bsoncore.AppendValueElement(optsElems, "$showDiskLoc", elem.Value()) - case "maxTimeMS": - optsElems = bsoncore.AppendValueElement(optsElems, "$maxTimeMS", elem.Value()) - case "snapshot": - optsElems = bsoncore.AppendValueElement(optsElems, "$snapshot", elem.Value()) - case "projection": - returnFieldsSelector = elem.Value().Data - case "skip": - // CRUD spec declares skip as int64 but numToSkip is int32 in OP_QUERY - numToSkip = int32(elem.Value().Int64()) - case "batchSize": - batchSize = elem.Value().Int32() - // Not possible to use batchSize = 1 because cursor will be closed on first batch - if batchSize == 1 { - batchSize = 2 - } - case "limit": - // CRUD spec declares limit as int64 but numToReturn is int32 in OP_QUERY - limit = int32(elem.Value().Int64()) - case "singleBatch": - singleBatch = elem.Value().Boolean() - case "tailable": - flags |= wiremessage.TailableCursor - case "awaitData": - flags |= wiremessage.AwaitData - case "oplogReplay": - flags |= wiremessage.OplogReplay - case "noCursorTimeout": - flags |= wiremessage.NoCursorTimeout - case "allowPartialResults": - flags |= wiremessage.Partial - } - } - - // for non-legacy servers, a negative limit is implemented as a positive limit + singleBatch = true - if singleBatch { - limit = limit * -1 - } - numToReturn = op.calculateNumberToReturn(limit, batchSize) - - // add read preference if needed - rp, err := op.createReadPref(desc, true) - if err != nil { - return dst, info, "", err - } - if len(rp) > 0 { - optsElems = bsoncore.AppendDocumentElement(optsElems, "$readPreference", rp) - } - - if len(filter) == 0 { - var fidx int32 - fidx, filter = bsoncore.AppendDocumentStart(filter) - filter, _ = bsoncore.AppendDocumentEnd(filter, fidx) - } - - var wmIdx int32 - wmIdx, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery) - dst = wiremessage.AppendQueryFlags(dst, flags) - dst = wiremessage.AppendQueryFullCollectionName(dst, op.getFullCollectionName(collName)) - dst = wiremessage.AppendQueryNumberToSkip(dst, numToSkip) - dst = wiremessage.AppendQueryNumberToReturn(dst, numToReturn) - dst = op.appendLegacyQueryDocument(dst, filter, optsElems) - if len(returnFieldsSelector) != 0 { - // returnFieldsSelector is optional - dst = append(dst, returnFieldsSelector...) - } - - return bsoncore.UpdateLength(dst, wmIdx, int32(len(dst[wmIdx:]))), info, collName, nil -} - -func (op Operation) calculateNumberToReturn(limit, batchSize int32) int32 { - var numToReturn int32 - - if limit < 0 { - numToReturn = limit - } else if limit == 0 { - numToReturn = batchSize - } else if batchSize == 0 { - numToReturn = limit - } else if limit < batchSize { - numToReturn = limit - } else { - numToReturn = batchSize - } - - return numToReturn -} - -func (op Operation) legacyGetMore(ctx context.Context, dst []byte, srvr Server, conn Connection, desc description.SelectedServer) error { - wm, startedInfo, collName, err := op.createLegacyGetMoreWiremessage(dst, desc) - if err != nil { - return err - } - - startedInfo.connID = conn.ID() - op.publishStartedEvent(ctx, startedInfo) - - finishedInfo := finishedInformation{ - cmdName: startedInfo.cmdName, - requestID: startedInfo.requestID, - startTime: time.Now(), - connID: startedInfo.connID, - } - finishedInfo.response, finishedInfo.cmdErr = op.roundTripLegacyCursor(ctx, wm, srvr, conn, collName, nextBatchIdentifier) - op.publishFinishedEvent(ctx, finishedInfo) - - if finishedInfo.cmdErr != nil { - return finishedInfo.cmdErr - } - - if op.ProcessResponseFn != nil { - // CurrentIndex is always 0 in this mode. - info := ResponseInfo{ - ServerResponse: finishedInfo.response, - Server: srvr, - Connection: conn, - ConnectionDescription: desc.Server, - } - return op.ProcessResponseFn(info) - } - return nil -} - -func (op Operation) createLegacyGetMoreWiremessage(dst []byte, desc description.SelectedServer) ([]byte, startedInformation, string, error) { - info := startedInformation{ - requestID: wiremessage.NextRequestID(), - cmdName: "getMore", - } - - var cmdDoc bsoncore.Document - var cmdIdx int32 - var err error - - cmdIdx, cmdDoc = bsoncore.AppendDocumentStart(cmdDoc) - cmdDoc, err = op.CommandFn(cmdDoc, desc) - if err != nil { - return dst, info, "", err - } - cmdDoc, _ = bsoncore.AppendDocumentEnd(cmdDoc, cmdIdx) - info.cmd = cmdDoc - - cmdElems, err := cmdDoc.Elements() - if err != nil { - return dst, info, "", err - } - - var cursorID int64 - var numToReturn int32 - var collName string - for _, elem := range cmdElems { - switch elem.Key() { - case "getMore": - cursorID = elem.Value().Int64() - case "collection": - collName = elem.Value().StringValue() - case "batchSize": - numToReturn = elem.Value().Int32() - } - } - - var wmIdx int32 - wmIdx, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpGetMore) - dst = wiremessage.AppendGetMoreZero(dst) - dst = wiremessage.AppendGetMoreFullCollectionName(dst, op.getFullCollectionName(collName)) - dst = wiremessage.AppendGetMoreNumberToReturn(dst, numToReturn) - dst = wiremessage.AppendGetMoreCursorID(dst, cursorID) - - return bsoncore.UpdateLength(dst, wmIdx, int32(len(dst[wmIdx:]))), info, collName, nil -} - -func (op Operation) legacyKillCursors(ctx context.Context, dst []byte, srvr Server, conn Connection, desc description.SelectedServer) error { - wm, startedInfo, _, err := op.createLegacyKillCursorsWiremessage(dst, desc) - if err != nil { - return err - } - - startedInfo.connID = conn.ID() - op.publishStartedEvent(ctx, startedInfo) - - // skip startTime because OP_KILL_CURSORS does not return a response - finishedInfo := finishedInformation{ - cmdName: "killCursors", - requestID: startedInfo.requestID, - connID: startedInfo.connID, - } - - err = conn.WriteWireMessage(ctx, wm) - if err != nil { - err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}} - if ep, ok := srvr.(ErrorProcessor); ok { - _ = ep.ProcessError(err, conn) - } - - finishedInfo.cmdErr = err - op.publishFinishedEvent(ctx, finishedInfo) - return err - } - - ridx, response := bsoncore.AppendDocumentStart(nil) - response = bsoncore.AppendInt32Element(response, "ok", 1) - response = bsoncore.AppendArrayElement(response, "cursorsUnknown", startedInfo.cmd.Lookup("cursors").Array()) - response, _ = bsoncore.AppendDocumentEnd(response, ridx) - - finishedInfo.response = response - op.publishFinishedEvent(ctx, finishedInfo) - return nil -} - -func (op Operation) createLegacyKillCursorsWiremessage(dst []byte, desc description.SelectedServer) ([]byte, startedInformation, string, error) { - info := startedInformation{ - cmdName: "killCursors", - requestID: wiremessage.NextRequestID(), - } - - var cmdDoc bsoncore.Document - var cmdIdx int32 - var err error - - cmdIdx, cmdDoc = bsoncore.AppendDocumentStart(cmdDoc) - cmdDoc, err = op.CommandFn(cmdDoc, desc) - if err != nil { - return nil, info, "", err - } - cmdDoc, _ = bsoncore.AppendDocumentEnd(cmdDoc, cmdIdx) - info.cmd = cmdDoc - - cmdElems, err := cmdDoc.Elements() - if err != nil { - return nil, info, "", err - } - - var collName string - var cursors bsoncore.Array - for _, elem := range cmdElems { - switch elem.Key() { - case "killCursors": - collName = elem.Value().StringValue() - case "cursors": - cursors = elem.Value().Array() - } - } - - var cursorIDs []int64 - if cursors != nil { - cursorValues, err := cursors.Values() - if err != nil { - return nil, info, "", err - } - - for _, cursorVal := range cursorValues { - cursorIDs = append(cursorIDs, cursorVal.Int64()) - } - } - - var wmIdx int32 - wmIdx, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpKillCursors) - dst = wiremessage.AppendKillCursorsZero(dst) - dst = wiremessage.AppendKillCursorsNumberIDs(dst, int32(len(cursorIDs))) - dst = wiremessage.AppendKillCursorsCursorIDs(dst, cursorIDs) - - return bsoncore.UpdateLength(dst, wmIdx, int32(len(dst[wmIdx:]))), info, collName, nil -} - -func (op Operation) legacyListCollections(ctx context.Context, dst []byte, srvr Server, conn Connection, desc description.SelectedServer) error { - wm, startedInfo, collName, err := op.createLegacyListCollectionsWiremessage(dst, desc) - if err != nil { - return err - } - startedInfo.connID = conn.ID() - op.publishStartedEvent(ctx, startedInfo) - - finishedInfo := finishedInformation{ - cmdName: startedInfo.cmdName, - requestID: startedInfo.requestID, - startTime: time.Now(), - connID: startedInfo.connID, - } - - finishedInfo.response, finishedInfo.cmdErr = op.roundTripLegacyCursor(ctx, wm, srvr, conn, collName, firstBatchIdentifier) - op.publishFinishedEvent(ctx, finishedInfo) - - if finishedInfo.cmdErr != nil { - return finishedInfo.cmdErr - } - - if op.ProcessResponseFn != nil { - // CurrentIndex is always 0 in this mode. - info := ResponseInfo{ - ServerResponse: finishedInfo.response, - Server: srvr, - Connection: conn, - ConnectionDescription: desc.Server, - } - return op.ProcessResponseFn(info) - } - return nil -} - -func (op Operation) createLegacyListCollectionsWiremessage(dst []byte, desc description.SelectedServer) ([]byte, startedInformation, string, error) { - info := startedInformation{ - cmdName: "find", - requestID: wiremessage.NextRequestID(), - } - - var cmdDoc bsoncore.Document - var cmdIdx int32 - var err error - - cmdIdx, cmdDoc = bsoncore.AppendDocumentStart(cmdDoc) - if cmdDoc, err = op.CommandFn(cmdDoc, desc); err != nil { - return dst, info, "", err - } - cmdDoc, _ = bsoncore.AppendDocumentEnd(cmdDoc, cmdIdx) - info.cmd, err = op.convertCommandToFind(cmdDoc, listCollectionsNamespace) - if err != nil { - return nil, info, "", err - } - - // lookup filter directly instead of calling cmdDoc.Elements() because the only listCollections option is nameOnly, - // which doesn't apply to legacy servers - var originalFilter bsoncore.Document - if filterVal, err := cmdDoc.LookupErr("filter"); err == nil { - originalFilter = filterVal.Document() - } - - var optsElems []byte - filter, err := op.transformListCollectionsFilter(originalFilter) - if err != nil { - return dst, info, "", err - } - rp, err := op.createReadPref(desc, true) - if err != nil { - return dst, info, "", err - } - if len(rp) > 0 { - optsElems = bsoncore.AppendDocumentElement(optsElems, "$readPreference", rp) - } - - var batchSize int32 - if val, ok := cmdDoc.Lookup("cursor", "batchSize").AsInt32OK(); ok { - batchSize = val - } - - var wmIdx int32 - wmIdx, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery) - dst = wiremessage.AppendQueryFlags(dst, op.secondaryOK(desc)) - dst = wiremessage.AppendQueryFullCollectionName(dst, op.getFullCollectionName(listCollectionsNamespace)) - dst = wiremessage.AppendQueryNumberToSkip(dst, 0) - dst = wiremessage.AppendQueryNumberToReturn(dst, batchSize) - dst = op.appendLegacyQueryDocument(dst, filter, optsElems) - // leave out returnFieldsSelector because it is optional - - return bsoncore.UpdateLength(dst, wmIdx, int32(len(dst[wmIdx:]))), info, listCollectionsNamespace, nil -} - -func (op Operation) transformListCollectionsFilter(filter bsoncore.Document) (bsoncore.Document, error) { - // filter out results containing $ because those represent indexes - var regexFilter bsoncore.Document - var ridx int32 - ridx, regexFilter = bsoncore.AppendDocumentStart(regexFilter) - regexFilter = bsoncore.AppendRegexElement(regexFilter, "name", "^[^$]*$", "") - regexFilter, _ = bsoncore.AppendDocumentEnd(regexFilter, ridx) - - if len(filter) == 0 { - return regexFilter, nil - } - - convertedIdx, convertedFilter := bsoncore.AppendDocumentStart(nil) - elems, err := filter.Elements() - if err != nil { - return nil, err - } - - for _, elem := range elems { - if elem.Key() != "name" { - convertedFilter = append(convertedFilter, elem...) - continue - } - - // the name value in a filter for legacy list collections must be a string and has to be prepended - // with the database name - nameVal := elem.Value() - if nameVal.Type != bsontype.String { - return nil, ErrFilterType - } - convertedFilter = bsoncore.AppendStringElement(convertedFilter, "name", op.getFullCollectionName(nameVal.StringValue())) - } - convertedFilter, _ = bsoncore.AppendDocumentEnd(convertedFilter, convertedIdx) - - // combine regexFilter and convertedFilter with $and - var combinedFilter bsoncore.Document - var cidx, aidx int32 - cidx, combinedFilter = bsoncore.AppendDocumentStart(combinedFilter) - aidx, combinedFilter = bsoncore.AppendArrayElementStart(combinedFilter, "$and") - combinedFilter = bsoncore.AppendDocumentElement(combinedFilter, "0", regexFilter) - combinedFilter = bsoncore.AppendDocumentElement(combinedFilter, "1", convertedFilter) - combinedFilter, _ = bsoncore.AppendArrayEnd(combinedFilter, aidx) - combinedFilter, _ = bsoncore.AppendDocumentEnd(combinedFilter, cidx) - - return combinedFilter, nil -} - -func (op Operation) legacyListIndexes(ctx context.Context, dst []byte, srvr Server, conn Connection, - desc description.SelectedServer, maxTimeMS uint64) error { - wm, startedInfo, collName, err := op.createLegacyListIndexesWiremessage(dst, desc, maxTimeMS) - if err != nil { - return err - } - startedInfo.connID = conn.ID() - op.publishStartedEvent(ctx, startedInfo) - - finishedInfo := finishedInformation{ - cmdName: startedInfo.cmdName, - requestID: startedInfo.requestID, - startTime: time.Now(), - connID: startedInfo.connID, - } - - finishedInfo.response, finishedInfo.cmdErr = op.roundTripLegacyCursor(ctx, wm, srvr, conn, collName, firstBatchIdentifier) - op.publishFinishedEvent(ctx, finishedInfo) - - if finishedInfo.cmdErr != nil { - return finishedInfo.cmdErr - } - - if op.ProcessResponseFn != nil { - // CurrentIndex is always 0 in this mode. - info := ResponseInfo{ - ServerResponse: finishedInfo.response, - Server: srvr, - Connection: conn, - ConnectionDescription: desc.Server, - } - return op.ProcessResponseFn(info) - } - return nil -} - -func (op Operation) createLegacyListIndexesWiremessage(dst []byte, desc description.SelectedServer, maxTimeMS uint64) ([]byte, startedInformation, string, error) { - info := startedInformation{ - cmdName: "find", - requestID: wiremessage.NextRequestID(), - } - - var cmdDoc bsoncore.Document - var cmdIndex int32 - var err error - - cmdIndex, cmdDoc = bsoncore.AppendDocumentStart(cmdDoc) - cmdDoc, err = op.CommandFn(cmdDoc, desc) - if err != nil { - return dst, info, "", err - } - // If maxTimeMS is greater than 0 append it to wire message. A maxTimeMS value of 0 only explicitly - // specifies the default behavior of no timeout server-side. - if maxTimeMS > 0 { - cmdDoc = bsoncore.AppendInt64Element(cmdDoc, "maxTimeMS", int64(maxTimeMS)) - } - cmdDoc, _ = bsoncore.AppendDocumentEnd(cmdDoc, cmdIndex) - info.cmd, err = op.convertCommandToFind(cmdDoc, listIndexesNamespace) - if err != nil { - return nil, info, "", err - } - - cmdElems, err := cmdDoc.Elements() - if err != nil { - return nil, info, "", err - } - - var filterCollName string - var batchSize int32 - var optsElems []byte // options elements - for _, elem := range cmdElems { - switch elem.Key() { - case "listIndexes": - filterCollName = elem.Value().StringValue() - case "cursor": - // the batchSize option is embedded in a cursor subdocument - cursorDoc := elem.Value().Document() - if val, err := cursorDoc.LookupErr("batchSize"); err == nil { - batchSize = val.Int32() - } - case "maxTimeMS": - optsElems = bsoncore.AppendValueElement(optsElems, "$maxTimeMS", elem.Value()) - } - } - - // always filter with {ns: db.collection} - fidx, filter := bsoncore.AppendDocumentStart(nil) - filter = bsoncore.AppendStringElement(filter, "ns", op.getFullCollectionName(filterCollName)) - filter, _ = bsoncore.AppendDocumentEnd(filter, fidx) - - rp, err := op.createReadPref(desc, true) - if err != nil { - return dst, info, "", err - } - if len(rp) > 0 { - optsElems = bsoncore.AppendDocumentElement(optsElems, "$readPreference", rp) - } - - var wmIdx int32 - wmIdx, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery) - dst = wiremessage.AppendQueryFlags(dst, op.secondaryOK(desc)) - dst = wiremessage.AppendQueryFullCollectionName(dst, op.getFullCollectionName(listIndexesNamespace)) - dst = wiremessage.AppendQueryNumberToSkip(dst, 0) - dst = wiremessage.AppendQueryNumberToReturn(dst, batchSize) - dst = op.appendLegacyQueryDocument(dst, filter, optsElems) - // leave out returnFieldsSelector because it is optional - - return bsoncore.UpdateLength(dst, wmIdx, int32(len(dst[wmIdx:]))), info, listIndexesNamespace, nil -} - -// convertCommandToFind takes a non-legacy command document for a command that needs to be run as a find on legacy -// servers and converts it to a find command document for APM. -func (op Operation) convertCommandToFind(cmdDoc bsoncore.Document, collName string) (bsoncore.Document, error) { - cidx, converted := bsoncore.AppendDocumentStart(nil) - elems, err := cmdDoc.Elements() - if err != nil { - return nil, err - } - - converted = bsoncore.AppendStringElement(converted, "find", collName) - // skip the first element because that will have the old command name - for i := 1; i < len(elems); i++ { - converted = bsoncore.AppendValueElement(converted, elems[i].Key(), elems[i].Value()) - } - - converted, _ = bsoncore.AppendDocumentEnd(converted, cidx) - return converted, nil -} - -// appendLegacyQueryDocument takes a filter and a list of options elements for a legacy find operation, creates -// a query document, and appends it to dst. -func (op Operation) appendLegacyQueryDocument(dst []byte, filter bsoncore.Document, opts []byte) []byte { - if len(opts) == 0 { - dst = append(dst, filter...) - return dst - } - - // filter must be wrapped in $query if other $-modifiers are used - var qidx int32 - qidx, dst = bsoncore.AppendDocumentStart(dst) - dst = bsoncore.AppendDocumentElement(dst, "$query", filter) - dst = append(dst, opts...) - dst, _ = bsoncore.AppendDocumentEnd(dst, qidx) - return dst -} - -// roundTripLegacyCursor sends a wiremessage for an operation expecting a cursor result and converts the legacy -// document sequence into a cursor document. -func (op Operation) roundTripLegacyCursor(ctx context.Context, wm []byte, srvr Server, conn Connection, collName, identifier string) (bsoncore.Document, error) { - wm, err := op.roundTripLegacy(ctx, conn, wm) - if ep, ok := srvr.(ErrorProcessor); ok { - _ = ep.ProcessError(err, conn) - } - if err != nil { - return nil, err - } - - return op.upconvertCursorResponse(wm, identifier, collName) -} - -// roundTripLegacy handles writing a wire message and reading the response. -func (op Operation) roundTripLegacy(ctx context.Context, conn Connection, wm []byte) ([]byte, error) { - err := conn.WriteWireMessage(ctx, wm) - if err != nil { - return nil, Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}, Wrapped: err} - } - - wm, err = conn.ReadWireMessage(ctx, wm[:0]) - if err != nil { - err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}, Wrapped: err} - } - return wm, err -} - -func (op Operation) upconvertCursorResponse(wm []byte, batchIdentifier string, collName string) (bsoncore.Document, error) { - reply := op.decodeOpReply(wm, true) - if reply.err != nil { - return nil, reply.err - } - - cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil) - // convert reply documents to BSON array - var arrIdx int32 - arrIdx, cursorDoc = bsoncore.AppendArrayElementStart(cursorDoc, batchIdentifier) - for i, doc := range reply.documents { - cursorDoc = bsoncore.AppendDocumentElement(cursorDoc, strconv.Itoa(i), doc) - } - cursorDoc, _ = bsoncore.AppendArrayEnd(cursorDoc, arrIdx) - - cursorDoc = bsoncore.AppendInt64Element(cursorDoc, "id", reply.cursorID) - cursorDoc = bsoncore.AppendStringElement(cursorDoc, "ns", op.getFullCollectionName(collName)) - cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx) - - resIdx, resDoc := bsoncore.AppendDocumentStart(nil) - resDoc = bsoncore.AppendInt32Element(resDoc, "ok", 1) - resDoc = bsoncore.AppendDocumentElement(resDoc, "cursor", cursorDoc) - resDoc, _ = bsoncore.AppendDocumentEnd(resDoc, resIdx) - - return resDoc, nil -}