diff --git a/agent/agents/mongodb/internal/profiler/aggregator/aggregator.go b/agent/agents/mongodb/internal/profiler/aggregator/aggregator.go index 9b80c9d728..dc0f4cc39d 100644 --- a/agent/agents/mongodb/internal/profiler/aggregator/aggregator.go +++ b/agent/agents/mongodb/internal/profiler/aggregator/aggregator.go @@ -22,11 +22,11 @@ import ( "sync" "time" - "github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter" "github.com/percona/percona-toolkit/src/go/mongolib/proto" mongostats "github.com/percona/percona-toolkit/src/go/mongolib/stats" "github.com/sirupsen/logrus" + "github.com/percona/pmm/agent/agents/mongodb/internal/profiler/fingerprinter" "github.com/percona/pmm/agent/agents/mongodb/internal/report" "github.com/percona/pmm/agent/utils/truncate" agentv1 "github.com/percona/pmm/api/agent/v1" diff --git a/agent/agents/mongodb/internal/profiler/aggregator/aggregator_test.go b/agent/agents/mongodb/internal/profiler/aggregator/aggregator_test.go index 4cf8980c58..f9f5c974b2 100644 --- a/agent/agents/mongodb/internal/profiler/aggregator/aggregator_test.go +++ b/agent/agents/mongodb/internal/profiler/aggregator/aggregator_test.go @@ -66,7 +66,7 @@ func TestAggregator(t *testing.T) { { Common: &agentv1.MetricsBucket_Common{ Queryid: result.Buckets[0].Common.Queryid, - Fingerprint: "INSERT people", + Fingerprint: "db.people.insert(?)", Database: "collection", Tables: []string{"people"}, AgentId: agentID, @@ -129,7 +129,7 @@ func TestAggregator(t *testing.T) { { Common: &agentv1.MetricsBucket_Common{ Queryid: result.Buckets[0].Common.Queryid, - Fingerprint: "FIND people name_\ufffd", + Fingerprint: "db.people.find({\"name_\\ufffd\":\"?\"})", Database: "collection", Tables: []string{"people"}, AgentId: agentID, diff --git a/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter.go b/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter.go new file mode 100644 index 0000000000..2e479ce20c --- /dev/null +++ 
b/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter.go
@@ -0,0 +1,249 @@
+// Package fingerprinter turns MongoDB profiler documents into mongosh-style
+// command strings in which literal values are masked.
+package fingerprinter
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
+	"github.com/percona/percona-toolkit/src/go/mongolib/proto"
+	"go.mongodb.org/mongo-driver/bson"
+)
+
+// ProfilerFingerprinter builds fingerprints from system.profile documents.
+type ProfilerFingerprinter struct {
+	// keyFilters is stored by NewFingerprinter; the fingerprinting code in
+	// this file does not consult it.
+	keyFilters []string
+}
+
+// NewFingerprinter creates a new instance of ProfilerFingerprinter.
+func NewFingerprinter(keyFilters []string) *ProfilerFingerprinter {
+	return &ProfilerFingerprinter{
+		keyFilters: keyFilters,
+	}
+}
+
+// Fingerprint generates a MongoDB command fingerprint from one profiler
+// document.
+//
+// The namespace doc.Ns is split into database and collection, and doc.Op
+// selects how the command is rendered. An error is returned when the namespace
+// contains no dot; in that case the returned value only carries Namespace and
+// Operation.
+func (pf *ProfilerFingerprinter) Fingerprint(doc proto.SystemProfile) (fingerprinter.Fingerprint, error) {
+	fp := fingerprinter.Fingerprint{
+		Namespace: doc.Ns,
+		Operation: doc.Op,
+	}
+
+	// Split on the first dot only: a database name cannot contain a dot, but a
+	// collection name can (for example "system.profile"), and everything after
+	// the first dot belongs to the collection.
+	parts := strings.SplitN(doc.Ns, ".", 2)
+	if len(parts) < 2 {
+		return fp, errors.New("invalid namespace format")
+	}
+	fp.Database = parts[0]
+	fp.Collection = parts[1]
+
+	// Select operation type and build command with optional fields
+	switch doc.Op {
+	case "query":
+		return pf.fingerprintFind(fp, doc)
+	case "insert":
+		return pf.fingerprintInsert(fp)
+	case "update":
+		return pf.fingerprintUpdate(fp, doc)
+	case "delete", "remove":
+		return pf.fingerprintDelete(fp, doc)
+	default:
+		// "command" and every other operation share the generic rendering.
+		return pf.fingerprintCommand(fp, doc)
+	}
+}
+
+// Helper for find operations with optional parameters.
+// It fills in Fingerprint and Keys on fp. Filter values are masked, limit and
+// skip are rendered as "?", while projection, sort and batchSize keep their
+// actual values.
+func (pf *ProfilerFingerprinter) fingerprintFind(fp fingerprinter.Fingerprint, doc proto.SystemProfile) (fingerprinter.Fingerprint, error) {
+	command := doc.Command.Map()
+
+	filter := ""
+	if f, ok := command["filter"]; ok {
+		// maskValues only yields maps, slices and "?" strings, so Marshal cannot fail here.
+		filterJSON, _ := json.Marshal(maskValues(f, make(map[string]maskOption)))
+		filter = string(filterJSON)
+	}
+	fp.Keys = filter
+
+	// The find command calls this field "projection"; the previously checked
+	// "project" key is still honoured as a fallback.
+	projection := command["projection"]
+	if projection == nil {
+		projection = command["project"]
+	}
+
+	args := filter
+	if projection != nil {
+		if args == "" {
+			// A projection is the second argument, so an absent filter needs a placeholder.
+			args = "{}"
+		}
+		args += ", " + jsonInKeyOrder(projection)
+	}
+	fp.Fingerprint = fmt.Sprintf(`db.%s.find(%s)`, fp.Collection, args)
+
+	if sort, ok := command["sort"]; ok {
+		// Key order decides the sort priority, so it must survive rendering.
+		fp.Fingerprint += fmt.Sprintf(`.sort(%s)`, jsonInKeyOrder(sort))
+	}
+	if _, ok := command["limit"]; ok {
+		fp.Fingerprint += `.limit(?)`
+	}
+	if _, ok := command["skip"]; ok {
+		fp.Fingerprint += `.skip(?)`
+	}
+	if batchSize, ok := command["batchSize"]; ok {
+		// %v prints integers exactly like %d and stays readable for other numeric types.
+		fp.Fingerprint += fmt.Sprintf(`.batchSize(%v)`, batchSize)
+	}
+
+	return fp, nil
+}
+
+// fingerprintInsert renders an insert; the inserted documents are always
+// represented by a single "?".
+func (pf *ProfilerFingerprinter) fingerprintInsert(fp fingerprinter.Fingerprint) (fingerprinter.Fingerprint, error) {
+	fp.Fingerprint = fmt.Sprintf(`db.%s.insert(?)`, fp.Collection)
+	return fp, nil
+}
+
+// fingerprintUpdate renders an update from the "q" (filter) and "u" (update)
+// fields of the command, masking all values. The upsert and multi flags are
+// appended as an options document when they are true.
+func (pf *ProfilerFingerprinter) fingerprintUpdate(fp fingerprinter.Fingerprint, doc proto.SystemProfile) (fingerprinter.Fingerprint, error) {
+	command := doc.Command.Map()
+
+	// maskValues accepts any value, so the fields are passed as they are: a
+	// type assertion would panic when a field is missing or is not a document.
+	filterJSON, _ := json.Marshal(maskValues(command["q"], make(map[string]maskOption)))
+	updateJSON, _ := json.Marshal(maskValues(command["u"], make(map[string]maskOption)))
+
+	fp.Fingerprint = fmt.Sprintf(`db.%s.update(%s, %s`, fp.Collection, filterJSON, updateJSON)
+	fp.Keys = string(filterJSON)
+
+	options := map[string]interface{}{}
+	if command["upsert"] == true {
+		options["upsert"] = true
+	}
+	if command["multi"] == true {
+		options["multi"] = true
+	}
+	if len(options) > 0 {
+		optionsJSON, _ := json.Marshal(options) // a map of booleans always marshals
+		fp.Fingerprint += fmt.Sprintf(`, %s`, optionsJSON)
+	}
+	fp.Fingerprint += ")"
+
+	return fp, nil
+}
+
+// fingerprintDelete renders a delete from the "q" field of the command. A
+// limit of 1 yields deleteOne, anything else deleteMany.
+func (pf *ProfilerFingerprinter) fingerprintDelete(fp fingerprinter.Fingerprint, doc proto.SystemProfile) (fingerprinter.Fingerprint, error) {
+	command := doc.Command.Map()
+
+	method := "deleteMany"
+	if isOne(command["limit"]) {
+		method = "deleteOne"
+	}
+
+	filterJSON, _ := json.Marshal(maskValues(command["q"], make(map[string]maskOption)))
+	fp.Fingerprint = fmt.Sprintf(`db.%s.%s(%s)`, fp.Collection, method, filterJSON)
+	fp.Keys = string(filterJSON)
+	return fp, nil
+}
+
+// fingerprintCommand renders every other operation. Commands that carry an
+// "aggregate" key become db.<collection>.aggregate([...]); all remaining
+// commands become db.runCommand(...) with their values masked.
+func (pf *ProfilerFingerprinter) fingerprintCommand(fp fingerprinter.Fingerprint, doc proto.SystemProfile) (fingerprinter.Fingerprint, error) {
+	command := doc.Command.Map()
+
+	maskOptions := map[string]maskOption{
+		"$db":                      {remove: true},
+		"$readPreference":          {remove: true},
+		"$readConcern":             {remove: true},
+		"$writeConcern":            {remove: true},
+		"$clusterTime":             {remove: true},
+		"$oplogQueryData":          {remove: true},
+		"$replData":                {remove: true},
+		"lastKnownCommittedOpTime": {remove: true},
+		"lsid":                     {remove: true},
+		"findAndModify":            {skipMask: true},
+		"remove":                   {skipMask: true},
+	}
+
+	if _, exists := command["aggregate"]; !exists {
+		// Handle other commands generically
+		commandMasked, _ := json.Marshal(maskValues(doc.Command, maskOptions))
+		fp.Fingerprint = fmt.Sprintf(`db.runCommand(%s)`, commandMasked)
+		fp.Keys = string(commandMasked)
+		return fp, nil
+	}
+
+	// A pipeline that is missing or is not an array contributes no stages.
+	pipelineStages, _ := command["pipeline"].(bson.A)
+	stageStrings := make([]string, 0, len(pipelineStages))
+	for _, stage := range pipelineStages {
+		stageStrings = append(stageStrings, renderStage(stage, maskOptions))
+	}
+	stages := strings.Join(stageStrings, ", ")
+
+	fp.Fingerprint = fmt.Sprintf(`db.%s.aggregate([%s])`, fp.Collection, stages)
+	if collation, exists := command["collation"]; exists {
+		collationMasked, _ := json.Marshal(maskValues(collation, maskOptions))
+		fp.Fingerprint += fmt.Sprintf(`, collation: %s`, collationMasked)
+	}
+
+	// Build a descriptive Keys field
+	fp.Keys = stages
+
+	return fp, nil
+}
+
+// renderStage renders one aggregation stage. $match stages have their values
+// masked; other stages are rendered as relaxed Extended JSON with their values
+// intact. A stage that is not a document is rendered as "?".
+func renderStage(stage interface{}, maskOptions map[string]maskOption) string {
+	var stageMap bson.M
+	switch s := stage.(type) {
+	case bson.D:
+		stageMap = s.Map()
+	case bson.M:
+		stageMap = s
+	default:
+		return `"?"`
+	}
+
+	var stageJSON []byte
+	if stageMap["$match"] != nil {
+		stageJSON, _ = json.Marshal(maskValues(stageMap, maskOptions))
+	} else {
+		// On a marshalling error the stage is rendered as an empty string.
+		stageJSON, _ = bson.MarshalExtJSON(stageMap, false, false)
+	}
+	return string(stageJSON)
+}
+
+// jsonInKeyOrder renders v as compact JSON and keeps the keys of every bson.D
+// in document order. Values that cannot be marshalled are rendered as "?".
+func jsonInKeyOrder(v interface{}) string {
+	switch t := v.(type) {
+	case bson.D:
+		fields := make([]string, 0, len(t))
+		for _, e := range t {
+			key, _ := json.Marshal(e.Key) // a string always marshals
+			fields = append(fields, string(key)+":"+jsonInKeyOrder(e.Value))
+		}
+		return "{" + strings.Join(fields, ",") + "}"
+	case bson.A:
+		items := make([]string, 0, len(t))
+		for _, item := range t {
+			items = append(items, jsonInKeyOrder(item))
+		}
+		return "[" + strings.Join(items, ",") + "]"
+	default:
+		raw, err := json.Marshal(t)
+		if err != nil {
+			return `"?"`
+		}
+		return string(raw)
+	}
+}
+
+// isOne reports whether v is the number 1 in any of the numeric types a
+// decoded command field may have.
+func isOne(v interface{}) bool {
+	switch n := v.(type) {
+	case int:
+		return n == 1
+	case int32:
+		return n == 1
+	case int64:
+		return n == 1
+	case float64:
+		return n == 1
+	default:
+		return false
+	}
+}
+
+// maskOption tells maskValues how to treat the value stored under one key.
+type maskOption struct {
+	remove   bool // drop the key from the output
+	skipMask bool // keep the original value instead of masking it
+}
+
+// maskValues replaces all values within a map or slice with "?" recursively and removes keys in the filter.
+//
+// Documents (bson.D, bson.M) become a new bson.M and arrays a new bson.A, so
+// the value passed in is left untouched. Keys whose option has remove set are
+// dropped, keys with skipMask keep their original value, and every other
+// non-container value is replaced by the string "?".
+func maskValues(data interface{}, options map[string]maskOption) interface{} {
+	switch v := data.(type) {
+	case bson.D:
+		masked := make(bson.M, len(v))
+		for _, elem := range v {
+			maskField(masked, elem.Key, elem.Value, options)
+		}
+		return masked
+	case bson.M:
+		masked := make(bson.M, len(v))
+		for key, value := range v {
+			maskField(masked, key, value, options)
+		}
+		return masked
+	case bson.A:
+		// Build a new slice: writing into v would overwrite the caller's document.
+		masked := make(bson.A, len(v))
+		for i := range v {
+			masked[i] = maskValues(v[i], options)
+		}
+		return masked
+	default:
+		return "?"
+	}
+}
+
+// maskField stores the masked form of one document field in dst, honouring the
+// option registered for key.
+func maskField(dst bson.M, key string, value interface{}, options map[string]maskOption) {
+	option := options[key] // the zero maskOption means "mask normally"
+	switch {
+	case option.remove:
+		// dropped from the output
+	case option.skipMask:
+		dst[key] = value
+	default:
+		dst[key] = maskValues(value, options)
+	}
+}
+
+// DefaultKeyFilters returns the default list of key filters, which is empty.
+func DefaultKeyFilters() []string {
+	return []string{}
+}
diff --git a/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter_test.go b/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter_test.go
new file mode 100644
index 0000000000..dc302665d6
--- /dev/null
+++ b/agent/agents/mongodb/internal/profiler/fingerprinter/fingerprinter_test.go
@@ -0,0 +1,283 @@
+package fingerprinter
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"testing"
+	"time"
+
+	"github.com/percona/percona-toolkit/src/go/mongolib/fingerprinter"
+	"github.com/percona/percona-toolkit/src/go/mongolib/proto"
+	"github.com/percona/pmm/agent/utils/mongo_fix"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
+const (
+	MgoTimeoutDialInfo      = 5 * time.Second
+	MgoTimeoutSessionSync   = 5 * time.Second
+	MgoTimeoutSessionSocket = 5 * time.Second
+)
+
+// createQuery selects profiler entries newer than startTime whose namespace is not dbName's system.profile.
+func createQuery(dbName string, startTime time.Time) bson.M {
+	return bson.M{
+		"ns": bson.M{"$ne": dbName + ".system.profile"},
+		"ts": bson.M{"$gt": startTime},
+	}
+}
+
+// createIterator opens a tailable-await cursor over collection, sorted by $natural order.
+func createIterator(ctx context.Context, collection *mongo.Collection, query bson.M) (*mongo.Cursor, error) {
+	opts := options.Find().SetSort(bson.M{"$natural": 1}).SetCursorType(options.TailableAwait)
+	return collection.Find(ctx, query, opts)
+}
+
+// ProfilerStatus receives the decoded reply of the {profile: -1} command.
+type ProfilerStatus struct {
+	Was      int64 `bson:"was"`
+	SlowMs   int64 `bson:"slowms"`
+	GleStats struct {
+		ElectionID string `bson:"electionId"`
+		LastOpTime int64  `bson:"lastOpTime"`
+	} `bson:"$gleStats"`
+}
+
+// createSession builds a direct-connection client for dsn with agentID embedded in its application name.
+func createSession(dsn string, agentID string) (*mongo.Client, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), MgoTimeoutDialInfo)
+	defer cancel()
+
+	opts, err := mongo_fix.ClientOptionsForDSN(dsn)
+	if err != nil {
+		return nil, err
+	}
+
+	opts = opts.
+		SetDirect(true).
+		SetReadPreference(readpref.Nearest()).
+		SetSocketTimeout(MgoTimeoutSessionSocket).
+		SetAppName(fmt.Sprintf("QAN-mongodb-profiler-%s", agentID))
+
+	client, err := mongo.Connect(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	return client, nil
+}
+
+func TestProfilerFingerprinter(t *testing.T) {
+	t.Run("CheckWithRealDB", func(t *testing.T) {
+		url := "mongodb://root:root-password@127.0.0.1:27017"
+		dbName := "test_fingerprint"
+
+		client, err := createSession(url, "pmm-agent")
+		if err != nil {
+			t.Skipf("MongoDB is not available: %v", err)
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), MgoTimeoutSessionSync)
+		defer cancel()
+		_ = client.Database(dbName).Drop(ctx)
+		defer client.Database(dbName).Drop(context.TODO()) //nolint:errcheck
+
+		// Best effort: when the status cannot be read, the zero value is what gets restored.
+		ps := ProfilerStatus{}
+		_ = client.Database("admin").RunCommand(ctx, primitive.M{"profile": -1}).Decode(&ps)
+		defer func() { // restore profiler status
+			client.Database("admin").RunCommand(ctx, primitive.D{{Key: "profile", Value: ps.Was}, {Key: "slowms", Value: ps.SlowMs}})
+		}()
+
+		// Enable profiling of all queries (2, slowms = 0)
+		res := client.Database("admin").RunCommand(ctx, primitive.D{{Key: "profile", Value: 2}, {Key: "slowms", Value: 0}})
+		if res.Err() != nil {
+			t.Skipf("cannot enable the profiler: %v", res.Err())
+		}
+
+		database := client.Database(dbName)
+		_, err = database.Collection("test").InsertOne(ctx, bson.M{"id": 0, "name": "test", "value": 1, "time": time.Now()})
+		assert.NoError(t, err)
+		_, err = database.Collection("secondcollection").InsertOne(ctx, bson.M{"id": 0, "name": "sec", "value": 2})
+		assert.NoError(t, err)
+		database.Collection("test").FindOne(ctx, bson.M{"id": 0})
+		database.Collection("test").FindOne(ctx, bson.M{"id": 1, "name": "test", "time": time.Now()})
+		database.Collection("test").FindOneAndUpdate(ctx, bson.M{"id": 0}, bson.M{"$set": bson.M{"name": "new"}})
+		database.Collection("test").FindOneAndDelete(ctx, bson.M{"id": 1})
+		database.Collection("secondcollection").Find(ctx, bson.M{"name": "sec"}, options.Find().SetLimit(1).SetSort(bson.M{"id": -1}))
+		database.Collection("test").Aggregate(ctx,
+			[]bson.M{
+				{
+					"$match": bson.M{"id": 0, "time": bson.M{"$gt": time.Now().Add(-time.Hour)}},
+				},
+				{
+					"$group": bson.M{"_id": "$id", "count": bson.M{"$sum": 1}},
+				},
+				{
+					"$sort": bson.M{"_id": 1},
+				},
+			},
+		)
+		database.Collection("secondcollection").Aggregate(ctx, mongo.Pipeline{
+			bson.D{
+				{
+					Key: "$collStats", Value: bson.M{
+						// TODO: PMM-9568 : Add support to handle histogram metrics
+						"latencyStats": bson.M{"histograms": false},
+						"storageStats": bson.M{"scale": 1},
+					},
+				},
+			}, bson.D{
+				{
+					Key: "$project", Value: bson.M{
+						"storageStats.wiredTiger":   0,
+						"storageStats.indexDetails": 0,
+					},
+				},
+			}})
+		database.Collection("secondcollection").DeleteOne(ctx, bson.M{"id": 0})
+		database.Collection("test").DeleteMany(ctx, bson.M{"name": "test"})
+		profilerCollection := database.Collection("system.profile")
+		query := createQuery(dbName, time.Now().Add(-10*time.Minute))
+
+		cursor, err := createIterator(ctx, profilerCollection, query)
+		require.NoError(t, err)
+		// do not cancel cursor closing when ctx is canceled
+		defer cursor.Close(context.Background()) //nolint:errcheck
+
+		pf := &ProfilerFingerprinter{}
+
+		fingerprints := make([]string, 0)
+		for cursor.TryNext(ctx) {
+			doc := proto.SystemProfile{}
+			e := cursor.Decode(&doc)
+			require.NoError(t, e)
+
+			b := bson.M{}
+			e = cursor.Decode(&b)
+			require.NoError(t, e)
+
+			marshal, e := json.Marshal(b)
+			require.NoError(t, e)
+			log.Println(string(marshal))
+
+			fingerprint, err := pf.Fingerprint(doc)
+			require.NoError(t, err)
+			require.NotNil(t, fingerprint)
+			fingerprints = append(fingerprints, fingerprint.Fingerprint)
+		}
+		assert.NotEmpty(t, fingerprints)
+		expectedFingerprints := []string{
+			`db.test.insert(?)`,
+			`db.secondcollection.insert(?)`,
+			`db.test.find({"id":"?"}).limit(?)`,
+			`db.test.find({"id":"?","name":"?","time":"?"}).limit(?)`,
+			`db.runCommand({"findAndModify":"test","query":{"id":"?"},"update":{"$set":{"name":"?"}}})`,
+			`db.runCommand({"findAndModify":"test","query":{"id":"?"},"remove":true})`,
+			`db.secondcollection.find({"name":"?"}).sort({"id":-1}).limit(?)`,
+			`db.test.aggregate([{"$match":{"id":"?","time":{"$gt":"?"}}}, {"$group":{"_id":"$id","count":{"$sum":1}}}, {"$sort":{"_id":1}}])`,
+			`db.secondcollection.aggregate([{"$collStats":{"latencyStats":{"histograms":false},"storageStats":{"scale":1}}}, {"$project":{"storageStats.wiredTiger":0,"storageStats.indexDetails":0}}])`,
+			`db.secondcollection.deleteOne({"id":"?"})`,
+			`db.test.deleteMany({"name":"?"})`,
+		}
+		assert.EqualValues(t, expectedFingerprints, fingerprints)
+	})
+
+	type testCase struct {
+		name string
+		doc  proto.SystemProfile
+		want fingerprinter.Fingerprint
+	}
+	tests := []testCase{
+		{
+			name: "find",
+			doc: proto.SystemProfile{
+				Ns:      "test.collection",
+				Op:      "query",
+				Command: bson.D{{Key: "filter", Value: bson.D{{Key: "name", Value: "test"}}}, {Key: "sort", Value: bson.D{{Key: "_id", Value: 1}}}, {Key: "limit", Value: 4}, {Key: "skip", Value: 5}},
+			},
+			want: fingerprinter.Fingerprint{
+				Fingerprint: `db.collection.find({"name":"?"}).sort({"_id":1}).limit(?).skip(?)`,
+				Namespace:   "test.collection",
+				Database:    "test",
+				Collection:  "collection",
+				Operation:   "query",
+			},
+		},
+		{
+			name: "insert",
+			doc: proto.SystemProfile{
+				Ns:      "test.insert_collection",
+				Op:      "insert",
+				Command: bson.D{},
+			},
+			want: fingerprinter.Fingerprint{
+				Fingerprint: `db.insert_collection.insert(?)`,
+				Namespace:   "test.insert_collection",
+				Database:    "test",
+				Collection:  "insert_collection",
+				Operation:   "insert",
+			},
+		},
+		{
+			name: "update",
+			doc: proto.SystemProfile{
+				Ns:      "test.update_collection",
+				Op:      "update",
+				Command: bson.D{{Key: "q", Value: bson.D{{Key: "name", Value: "test"}}}, {Key: "u", Value: bson.D{{Key: "$set", Value: bson.D{{Key: "name", Value: "new"}}}}}},
+			},
+			want: fingerprinter.Fingerprint{
+				Fingerprint: `db.update_collection.update({"name":"?"}, {"$set":{"name":"?"}})`,
+				Namespace:   "test.update_collection",
+				Database:    "test",
+				Collection:  "update_collection",
+				Operation:   "update",
+			},
+		},
+		{
+			name: "delete",
+			doc: proto.SystemProfile{
+				Ns:      "test.delete_collection",
+				Op:      "remove",
+				Command: bson.D{{Key: "q", Value: bson.D{{Key: "name", Value: "test"}}}},
+			},
+			want: fingerprinter.Fingerprint{
+				Fingerprint: `db.delete_collection.deleteMany({"name":"?"})`,
+				Namespace:   "test.delete_collection",
+				Database:    "test",
+				Collection:  "delete_collection",
+				Operation:   "remove",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			pf := &ProfilerFingerprinter{}
+			fingerprint, err := pf.Fingerprint(tt.doc)
+			require.NoError(t, err)
+			require.NotNil(t, fingerprint)
+			assert.Equal(t, tt.want.Fingerprint, fingerprint.Fingerprint)
+			assert.Equal(t, tt.want.Namespace, fingerprint.Namespace)
+			assert.Equal(t, tt.want.Database, fingerprint.Database)
+			assert.Equal(t, tt.want.Collection, fingerprint.Collection)
+			assert.Equal(t, tt.want.Operation, fingerprint.Operation)
+		})
+	}
+}
+
+// TestMaskValuesKeepsInput checks that masking builds new values instead of overwriting the caller's document.
+func TestMaskValuesKeepsInput(t *testing.T) {
+	input := bson.D{{Key: "id", Value: bson.D{{Key: "$in", Value: bson.A{1, 2}}}}}
+
+	masked := maskValues(input, map[string]maskOption{})
+
+	assert.Equal(t, bson.M{"id": bson.M{"$in": bson.A{"?", "?"}}}, masked)
+	assert.Equal(t, bson.A{1, 2}, input[0].Value.(bson.D)[0].Value)
+}
diff --git a/agent/agents/mongodb/internal/profiler/profiler_test.go
b/agent/agents/mongodb/internal/profiler/profiler_test.go index c4c4802243..717b0a9b02 100644 --- a/agent/agents/mongodb/internal/profiler/profiler_test.go +++ b/agent/agents/mongodb/internal/profiler/profiler_test.go @@ -143,7 +143,7 @@ func testProfiler(t *testing.T, url string) { for _, r := range ms.reports { for _, bucket := range r.Buckets { switch bucket.Common.Fingerprint { - case "INSERT people": + case "db.people.insert(?)": key := fmt.Sprintf("%s:%s", bucket.Common.Database, bucket.Common.Fingerprint) if b, ok := bucketsMap[key]; ok { b.Mongodb.MDocsReturnedCnt += bucket.Mongodb.MDocsReturnedCnt @@ -153,24 +153,15 @@ func testProfiler(t *testing.T, url string) { } else { bucketsMap[key] = bucket } - case "FIND people name_00\ufffd": + case `db.people.find({"name_00\ufffd":"?"})`: findBucket = bucket + default: + t.Logf("unknown fingerprint: %s", bucket.Common.Fingerprint) } } } - version, err := GetMongoVersion(context.TODO(), sess) - require.NoError(t, err) - - var responseLength float32 - switch version { - case "3.4": - responseLength = 44 - case "3.6": - responseLength = 29 - default: - responseLength = 45 - } + responseLength := float32(45) assert.Equal(t, dbsCount, len(bucketsMap)) // 300 sample docs / 10 = different database names var buckets []*agentv1.MetricsBucket @@ -182,7 +173,7 @@ func testProfiler(t *testing.T, url string) { }) for i, bucket := range buckets { assert.Equal(t, bucket.Common.Database, fmt.Sprintf("test_%02d", i)) - assert.Equal(t, "INSERT people", bucket.Common.Fingerprint) + assert.Equal(t, "db.people.insert(?)", bucket.Common.Fingerprint) assert.Equal(t, []string{"people"}, bucket.Common.Tables) assert.Equal(t, "test-id", bucket.Common.AgentId) assert.Equal(t, inventoryv1.AgentType(10), bucket.Common.AgentType) @@ -205,7 +196,7 @@ func testProfiler(t *testing.T, url string) { assert.Equalf(t, expected.MDocsScannedCnt, bucket.Mongodb.MDocsScannedCnt, "wrong metrics for db %s", bucket.Common.Database) } require.NotNil(t, 
findBucket) - assert.Equal(t, "FIND people name_00\ufffd", findBucket.Common.Fingerprint) + assert.Equal(t, `db.people.find({"name_00\ufffd":"?"})`, findBucket.Common.Fingerprint) assert.Equal(t, docsCount, findBucket.Mongodb.MDocsReturnedSum) }