From 276f834c9879327ed0dc8bfc2283f54ae54e11a9 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Wed, 8 Mar 2023 11:46:08 -0800 Subject: [PATCH 01/19] Skunkworks - Import CSV into TimeSeries collection --- mongoimport/mongoimport.go | 13 +++++++++++++ mongoimport/options.go | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 1d8529ed6..903549b22 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -15,8 +15,10 @@ import ( "github.com/mongodb/mongo-tools/common/util" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + moptions "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/tomb.v2" + "context" "fmt" "io" "os" @@ -386,6 +388,17 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 } } + if imp.IngestOptions.TimeSeries != "" { + log.Logvf(log.Always, "creating TimeSeries collection: %v.%v", + imp.ToolOptions.Namespace.DB, + imp.ToolOptions.Namespace.Collection) + timeseriesOptions := moptions.TimeSeries() + timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeries) + collectionOptions := moptions.CreateCollection().SetTimeSeriesOptions(timeseriesOptions) + session.Database(imp.ToolOptions.DB).CreateCollection(context.TODO(), imp.ToolOptions.Namespace.Collection, collectionOptions) + + } + readDocs := make(chan bson.D, workerBufferSize) processingErrChan := make(chan error) ordered := imp.IngestOptions.MaintainInsertionOrder diff --git a/mongoimport/options.go b/mongoimport/options.go index 2ddd32a4c..51e568028 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -102,6 +102,10 @@ type IngestOptions struct { NumDecodingWorkers int `long:"numDecodingWorkers" default:"0" hidden:"true"` BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"` + + // Creates the target collection as a TimeSeries collection using the value given + // as the TimeField. + TimeSeries string `long:"timeseries" value-name:"time-field" description:"Creates target collection as TimeSeries with this field as the timeField e.g. --timeseries='timestamp'"` } // Name returns a description of the IngestOptions struct. From 143ae8c986a9451da060832d7a3c007703cdcb8e Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Wed, 8 Mar 2023 12:44:13 -0800 Subject: [PATCH 02/19] Adds metaField & granularity, along with some basic validation --- mongoimport/mongoimport.go | 26 ++++++++++++++++++++++++-- mongoimport/options.go | 8 +++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 903549b22..ede773940 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -208,6 +208,22 @@ func (imp *MongoImport) validateSettings(args []string) error { imp.IngestOptions.Mode = modeUpsert } + // Validate TimeSeries Options + if imp.IngestOptions.TimeSeriesTimeField == "" { + if (imp.IngestOptions.TimeSeriesMetaField != "" || + imp.IngestOptions.TimeSeriesGranularity != "") { + return fmt.Errorf("can not use --timeseries-metafield nor --timeseries-granularity without --timeseries-timefield") + } + } + + if imp.IngestOptions.TimeSeriesGranularity != "" { + if (imp.IngestOptions.TimeSeriesGranularity != "seconds" || + imp.IngestOptions.TimeSeriesGranularity != "minutes" || + imp.IngestOptions.TimeSeriesGranularity != "hours") { + return fmt.Errorf("--timeseries-granularity must be one of: seconds, minutes, hours") + } + } + // parse UpsertFields, may set default mode to modeUpsert if imp.IngestOptions.UpsertFields != "" { if imp.IngestOptions.Mode == "" { @@ -388,12 +404,18 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 } } - if imp.IngestOptions.TimeSeries != "" { + if imp.IngestOptions.TimeSeriesTimeField != "" { log.Logvf(log.Always, "creating TimeSeries collection: %v.%v", imp.ToolOptions.Namespace.DB, imp.ToolOptions.Namespace.Collection) timeseriesOptions := moptions.TimeSeries() - timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeries) + timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeriesTimeField) + if imp.IngestOptions.TimeSeriesMetaField != "" { + timeseriesOptions.SetMetaField(imp.IngestOptions.TimeSeriesMetaField) + } + if imp.IngestOptions.TimeSeriesGranularity != "" { + timeseriesOptions.SetGranularity(imp.IngestOptions.TimeSeriesGranularity) + } collectionOptions := moptions.CreateCollection().SetTimeSeriesOptions(timeseriesOptions) session.Database(imp.ToolOptions.DB).CreateCollection(context.TODO(), imp.ToolOptions.Namespace.Collection, collectionOptions) diff --git a/mongoimport/options.go b/mongoimport/options.go index 51e568028..a4066df68 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -105,7 +105,13 @@ type IngestOptions struct { // Creates the target collection as a TimeSeries collection using the value given // as the TimeField. - TimeSeries string `long:"timeseries" value-name:"time-field" description:"Creates target collection as TimeSeries with this field as the timeField e.g. --timeseries='timestamp'"` + TimeSeriesTimeField string `long:"timeseries-timefield" value-name:"time-field" description:"Creates target collection as TimeSeries with this field as the timeField e.g. --timeseries-timefield='timestamp'"` + + // Optional. Passed to the creation of a TimeSeries collection. + TimeSeriesMetaField string `long:"timeseries-metafield" value-name:"meta-field" description:"Sets the (optional) metaField of the target TimeSeries collection e.g. --timeseries-metafield='sensor_id'. Requires --timeseries-timefield."` + + // Optional. Passed to the creation of a TimeSeries collection. + TimeSeriesGranularity string `long:"timeseries-granularity" value-name:"granularity" description:"Sets the (optional) granularity of time values on the target TimeSeries collection to optimize how TimeSeries data is stored internally. The type can be one of: seconds (default), minutes, hours. Requires --timeseries-timefield."` } // Name returns a description of the IngestOptions struct. From 40c0702958a4703bc50d126cda705791c89b5e65 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Wed, 8 Mar 2023 15:51:51 -0800 Subject: [PATCH 03/19] Ensure collection doesn't already exist --- mongoimport/mongoimport.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index ede773940..0392ffe79 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -14,6 +14,7 @@ import ( "github.com/mongodb/mongo-tools/common/progress" "github.com/mongodb/mongo-tools/common/util" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" moptions "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/tomb.v2" @@ -404,15 +405,34 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 } } + // Before importing documents, create the target collection as TimeSeries. + // Fail if the collection already exists. if imp.IngestOptions.TimeSeriesTimeField != "" { + if !imp.IngestOptions.Drop { + collectionFilter := bson.D{} + collectionFilter = append(collectionFilter, primitive.E{"name", imp.ToolOptions.Namespace.Collection}) + cursor, err := session.Database(imp.ToolOptions.DB).ListCollections(context.TODO(), collectionFilter) + + if err != nil { + return 0, 0, err + } + + if cursor.Next(context.TODO()) { + cursor.Close(context.TODO()) + return 0, 0, fmt.Errorf("error when inserting to a TimeSeries collection, the collection must not exist, or --drop must be provided") + } + } + log.Logvf(log.Always, "creating TimeSeries collection: %v.%v", imp.ToolOptions.Namespace.DB, imp.ToolOptions.Namespace.Collection) timeseriesOptions := moptions.TimeSeries() timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeriesTimeField) + if imp.IngestOptions.TimeSeriesMetaField != "" { timeseriesOptions.SetMetaField(imp.IngestOptions.TimeSeriesMetaField) } + if imp.IngestOptions.TimeSeriesGranularity != "" { timeseriesOptions.SetGranularity(imp.IngestOptions.TimeSeriesGranularity) } From 1af1e55a4f24519f1472862b3f9046dbd601a275 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Wed, 8 Mar 2023 15:59:43 -0800 Subject: [PATCH 04/19] Disallow TS with JSON import && TimeSeries -> time-series --- mongoimport/mongoimport.go | 10 +++++++--- mongoimport/options.go | 12 ++++++------ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 0392ffe79..9b4e030a1 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -213,7 +213,11 @@ func (imp *MongoImport) validateSettings(args []string) error { if imp.IngestOptions.TimeSeriesTimeField == "" { if (imp.IngestOptions.TimeSeriesMetaField != "" || imp.IngestOptions.TimeSeriesGranularity != "") { - return fmt.Errorf("can not use --timeseries-metafield nor --timeseries-granularity without --timeseries-timefield") + return fmt.Errorf("cannot use --timeseries-metafield nor --timeseries-granularity without --timeseries-timefield") + } + } else { + if imp.InputOptions.Type == JSON { + return fmt.Errorf("cannot import time-series collections with JSON") } } @@ -419,11 +423,11 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 if cursor.Next(context.TODO()) { cursor.Close(context.TODO()) - return 0, 0, fmt.Errorf("error when inserting to a TimeSeries collection, the collection must not exist, or --drop must be provided") + return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided") } } - log.Logvf(log.Always, "creating TimeSeries collection: %v.%v", + log.Logvf(log.Always, "creating time-series collection: %v.%v", imp.ToolOptions.Namespace.DB, imp.ToolOptions.Namespace.Collection) timeseriesOptions := moptions.TimeSeries() diff --git a/mongoimport/options.go b/mongoimport/options.go index a4066df68..c772f8e9a 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -103,15 +103,15 @@ type IngestOptions struct { BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"` - // Creates the target collection as a TimeSeries collection using the value given + // Creates the target collection as a time-series collection using the value given // as the TimeField. - TimeSeriesTimeField string `long:"timeseries-timefield" value-name:"time-field" description:"Creates target collection as TimeSeries with this field as the timeField e.g. --timeseries-timefield='timestamp'"` + TimeSeriesTimeField string `long:"timeseries-timefield" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeseries-timefield='timestamp'"` - // Optional. Passed to the creation of a TimeSeries collection. - TimeSeriesMetaField string `long:"timeseries-metafield" value-name:"meta-field" description:"Sets the (optional) metaField of the target TimeSeries collection e.g. --timeseries-metafield='sensor_id'. Requires --timeseries-timefield."` + // Optional. Passed to the creation of a time-series collection. + TimeSeriesMetaField string `long:"timeseries-metafield" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeseries-metafield='sensor_id'. Requires --timeseries-timefield."` - // Optional. Passed to the creation of a TimeSeries collection. - TimeSeriesGranularity string `long:"timeseries-granularity" value-name:"granularity" description:"Sets the (optional) granularity of time values on the target TimeSeries collection to optimize how TimeSeries data is stored internally. The type can be one of: seconds (default), minutes, hours. Requires --timeseries-timefield."` + // Optional. Passed to the creation of a time-series collection. + TimeSeriesGranularity string `long:"timeseries-granularity" value-name:"granularity" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. The type can be one of: seconds (default), minutes, hours. Requires --timeseries-timefield."` } // Name returns a description of the IngestOptions struct. From e01bdf48e230fec9893096bebe3455f28f24a968 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Wed, 8 Mar 2023 20:18:18 -0800 Subject: [PATCH 05/19] column validation for timeField & metaField --- mongoimport/common.go | 43 +++++++++++++++++++++++++++++++++++--- mongoimport/csv.go | 8 +++---- mongoimport/json.go | 4 ++-- mongoimport/mongoimport.go | 17 +++++++++------ mongoimport/tsv.go | 8 +++---- 5 files changed, 61 insertions(+), 19 deletions(-) diff --git a/mongoimport/common.go b/mongoimport/common.go index cb1a944e7..0164c643e 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -32,6 +32,13 @@ type FieldInfo struct { parts []string } +// All options to mongoimport that need validation +// against the column names provided by user data. +type ColumnsAsOptionFields struct { + timeField string + metaField string +} + const ( pgAutoCast ParseGrace = iota pgSkipField @@ -554,7 +561,10 @@ func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, i // (6). The indexes for an array don't start from 0 (e.g. a.1,a.2) // (7). Array indexes are out of order (e.g. a.0,a.2,a.1) // (8). An array is missing an index (e.g. a.0,a.2) -func validateFields(inputFields []string, useArrayIndexFields bool) error { +// +// In the case that --timeseries-timefield and its optional pair --timeseries-metafield are set: +// (9). A field intended to refer to a column of provided data must match one. +func validateFields(inputFields []string, useArrayIndexFields bool, fields ColumnsAsOptionFields) error { for _, field := range inputFields { // Here we check validity for case (1). @@ -590,6 +600,33 @@ func validateFields(inputFields []string, useArrayIndexFields bool) error { return err } } + + if len(fields.timeField) > 0 { + var timeFieldExists bool + for _, field := range inputFields { + if field == fields.timeField { + timeFieldExists = true + break + } + } + if !timeFieldExists { + return fmt.Errorf("error --timeseries-timefield '%v' doesn't match any provided fields", fields.timeField) + } + } + + if len(fields.metaField) > 0 { + var metaFieldExists bool + for _, field := range inputFields { + if field == fields.metaField { + metaFieldExists = true + break + } + } + if !metaFieldExists { + return fmt.Errorf("error --timeseries-metafield '%v' doesn't match any provided fields", fields.metaField) + } + } + return nil } @@ -780,8 +817,8 @@ func indexError(field string) error { } // validateReaderFields is a helper to validate fields for input readers -func validateReaderFields(fields []string, useArrayIndexFields bool) error { - if err := validateFields(fields, useArrayIndexFields); err != nil { +func validateReaderFields(fields []string, useArrayIndexFields bool, optionsWithFields ColumnsAsOptionFields) error { + if err := validateFields(fields, useArrayIndexFields, optionsWithFields); err != nil { return err } if len(fields) == 1 { diff --git a/mongoimport/csv.go b/mongoimport/csv.go index 17b20409e..bb1abdfb2 100644 --- a/mongoimport/csv.go +++ b/mongoimport/csv.go @@ -78,18 +78,18 @@ func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, n // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *CSVInputReader) ReadAndValidateHeader() (err error) { +func (r *CSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) (err error) { fields, err := r.csvReader.Read() if err != nil { return err } r.colSpecs = ParseAutoHeaders(fields) - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) } // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { +func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) (err error) { fields, err := r.csvReader.Read() if err != nil { return err @@ -98,7 +98,7 @@ func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err if err != nil { return err } - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) } // StreamDocument takes a boolean indicating if the documents should be streamed diff --git a/mongoimport/json.go b/mongoimport/json.go index c007ef82f..6467c0aff 100644 --- a/mongoimport/json.go +++ b/mongoimport/json.go @@ -94,12 +94,12 @@ func NewJSONInputReader(isArray bool, legacyExtJSON bool, in io.Reader, numDecod } // ReadAndValidateHeader is a no-op for JSON imports; always returns nil. -func (r *JSONInputReader) ReadAndValidateHeader() error { +func (r *JSONInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error { return nil } // ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil. -func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error { +func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error { return nil } diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 9b4e030a1..04dd525fc 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -49,6 +49,8 @@ const ( progressBarLength = 24 ) +var optionsWithFields ColumnsAsOptionFields + // MongoImport is a container for the user-specified options and // internal state used for running mongoimport. type MongoImport struct { @@ -93,12 +95,12 @@ type InputReader interface { // ReadAndValidateHeader reads the header line from the InputReader and returns // a non-nil error if the fields from the header line are invalid; returns // nil otherwise. No-op for JSON input readers. - ReadAndValidateHeader() error + ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error // ReadAndValidateTypedHeader is the same as ReadAndValidateHeader, // except it also parses types from the fields of the header. Parse errors // will be handled according parseGrace. - ReadAndValidateTypedHeader(parseGrace ParseGrace) error + ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error // embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar. sizeTracker @@ -219,6 +221,9 @@ func (imp *MongoImport) validateSettings(args []string) error { if imp.InputOptions.Type == JSON { return fmt.Errorf("cannot import time-series collections with JSON") } + + optionsWithFields.timeField = imp.IngestOptions.TimeSeriesTimeField + optionsWithFields.metaField = imp.IngestOptions.TimeSeriesMetaField } if imp.IngestOptions.TimeSeriesGranularity != "" { @@ -237,7 +242,7 @@ func (imp *MongoImport) validateSettings(args []string) error { return fmt.Errorf("can not use --upsertFields with --mode=insert") } imp.upsertFields = strings.Split(imp.IngestOptions.UpsertFields, ",") - if err := validateFields(imp.upsertFields, imp.InputOptions.UseArrayIndexFields); err != nil { + if err := validateFields(imp.upsertFields, imp.InputOptions.UseArrayIndexFields, optionsWithFields); err != nil { return fmt.Errorf("invalid --upsertFields argument: %v", err) } } else if imp.IngestOptions.Mode != modeInsert { @@ -352,9 +357,9 @@ func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) { if imp.InputOptions.HeaderLine { if imp.InputOptions.ColumnsHaveTypes { - err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace)) + err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace), optionsWithFields) } else { - err = inputReader.ReadAndValidateHeader() + err = inputReader.ReadAndValidateHeader(optionsWithFields) } if err != nil { return 0, 0, err @@ -629,7 +634,7 @@ func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) { // header fields validation can only happen once we have an input reader if !imp.InputOptions.HeaderLine { - if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields); err != nil { + if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields, optionsWithFields); err != nil { return nil, err } } diff --git a/mongoimport/tsv.go b/mongoimport/tsv.go index c794d350e..18ffe66f5 100644 --- a/mongoimport/tsv.go +++ b/mongoimport/tsv.go @@ -80,7 +80,7 @@ func NewTSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, n // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *TSVInputReader) ReadAndValidateHeader() (err error) { +func (r *TSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) (err error) { header, err := r.tsvReader.ReadString(entryDelimiter) if err != nil { return err @@ -90,12 +90,12 @@ func (r *TSVInputReader) ReadAndValidateHeader() (err error) { headerFields = append(headerFields, strings.TrimRight(field, "\r\n")) } r.colSpecs = ParseAutoHeaders(headerFields) - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) } // ReadAndValidateTypedHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { +func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) (err error) { header, err := r.tsvReader.ReadString(entryDelimiter) if err != nil { return err @@ -108,7 +108,7 @@ func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err if err != nil { return err } - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) } // StreamDocument takes a boolean indicating if the documents should be streamed From adf1ebe39d2a5fa3fb3e287aa63bf37fed11bccb Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Thu, 9 Mar 2023 10:11:46 -0800 Subject: [PATCH 06/19] Fix granularity condition, metafield and timefield can't be identical --- mongoimport/mongoimport.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 04dd525fc..7789b5b81 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -222,13 +222,17 @@ func (imp *MongoImport) validateSettings(args []string) error { return fmt.Errorf("cannot import time-series collections with JSON") } + if imp.IngestOptions.TimeSeriesTimeField == imp.IngestOptions.TimeSeriesMetaField { + return fmt.Errorf("error the metafield and timefield for time-series collections must be different columns") + } + optionsWithFields.timeField = imp.IngestOptions.TimeSeriesTimeField optionsWithFields.metaField = imp.IngestOptions.TimeSeriesMetaField } if imp.IngestOptions.TimeSeriesGranularity != "" { - if (imp.IngestOptions.TimeSeriesGranularity != "seconds" || - imp.IngestOptions.TimeSeriesGranularity != "minutes" || + if (imp.IngestOptions.TimeSeriesGranularity != "seconds" && + imp.IngestOptions.TimeSeriesGranularity != "minutes" && imp.IngestOptions.TimeSeriesGranularity != "hours") { return fmt.Errorf("--timeseries-granularity must be one of: seconds, minutes, hours") } From b20c0309ab7b78fdc737ac4dcaa68f967f77dec9 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Thu, 9 Mar 2023 10:11:55 -0800 Subject: [PATCH 07/19] Fix tests --- mongoimport/common_test.go | 42 +++++++++++++++++++++++--------------- mongoimport/csv_test.go | 29 +++++++++++++------------- mongoimport/tsv_test.go | 3 ++- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go index 2fd24062f..90db8486f 100644 --- a/mongoimport/common_test.go +++ b/mongoimport/common_test.go @@ -110,36 +110,44 @@ func convertBSONDToRaw(documents []bson.D) []bson.Raw { func TestValidateFields(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) + var optionsWithFields ColumnsAsOptionFields Convey("Given an import input, in validating the headers", t, func() { Convey("if the fields contain '..', an error should be thrown", func() { - So(validateFields([]string{"a..a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a..a"}, false, optionsWithFields), ShouldNotBeNil) }) Convey("if the fields start/end in a '.', an error should be thrown", func() { - So(validateFields([]string{".a"}, false), ShouldNotBeNil) - So(validateFields([]string{"a."}, false), ShouldNotBeNil) + So(validateFields([]string{".a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a."}, false, optionsWithFields), ShouldNotBeNil) }) Convey("if the fields start in a '$', an error should be thrown", func() { - So(validateFields([]string{"$.a"}, false), ShouldNotBeNil) - So(validateFields([]string{"$"}, false), ShouldNotBeNil) - So(validateFields([]string{"$a"}, false), ShouldNotBeNil) - So(validateFields([]string{"a$a"}, false), ShouldBeNil) + So(validateFields([]string{"$.a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"$"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"$a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a$a"}, false, optionsWithFields), ShouldBeNil) }) Convey("if the fields collide, an error should be thrown", func() { - So(validateFields([]string{"a", "a.a"}, false), ShouldNotBeNil) - So(validateFields([]string{"a", "a.ba", "b.a"}, false), ShouldNotBeNil) - So(validateFields([]string{"a", "a.ba", "b.a"}, false), ShouldNotBeNil) - So(validateFields([]string{"a", "a.b.c"}, false), ShouldNotBeNil) + So(validateFields([]string{"a", "a.a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "a.b.c"}, false, optionsWithFields), ShouldNotBeNil) }) Convey("if the fields don't collide, no error should be thrown", func() { - So(validateFields([]string{"a", "aa"}, false), ShouldBeNil) - So(validateFields([]string{"a", "aa", "b.a", "b.c"}, false), ShouldBeNil) - So(validateFields([]string{"a", "ba", "ab", "b.a"}, false), ShouldBeNil) - So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}, false), ShouldBeNil) - So(validateFields([]string{"a", "ab.c"}, false), ShouldBeNil) + So(validateFields([]string{"a", "aa"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "aa", "b.a", "b.c"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "ab.c"}, false, optionsWithFields), ShouldBeNil) }) Convey("if the fields contain the same keys, an error should be thrown", func() { - So(validateFields([]string{"a", "ba", "a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a", "ba", "a"}, false, optionsWithFields), ShouldNotBeNil) + }) + Convey("time-series fields should validate", func() { + optionsWithFields.metaField = "b" + optionsWithFields.timeField = "c" + So(validateFields([]string{"a", "aa"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "aa","b"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "aa","b","c"}, false, optionsWithFields), ShouldBeNil) }) }) } diff --git a/mongoimport/csv_test.go b/mongoimport/csv_test.go index e8e7a6811..103d20b84 100644 --- a/mongoimport/csv_test.go +++ b/mongoimport/csv_test.go @@ -210,12 +210,13 @@ func TestCSVStreamDocument(t *testing.T) { func TestCSVReadAndValidateHeader(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) var err error + var optionsWithFields ColumnsAsOptionFields Convey("With a CSV input reader", t, func() { Convey("setting the header should read the first line of the CSV", func() { contents := "extraHeader1, extraHeader2, extraHeader3" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) }) @@ -223,24 +224,24 @@ func TestCSVReadAndValidateHeader(t *testing.T) { contents := "a, b, c" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b.c, a.b.d, c" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b, ab, a.c" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a, ab, ac, dd" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 4) }) @@ -248,47 +249,47 @@ func TestCSVReadAndValidateHeader(t *testing.T) { contents := "a, a.b, c" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "a.b.c, a.b.d.c, a.b.d" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "a, a, a" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header that ends in a dot should error", func() { contents := "c, a., b" colSpecs := []ColumnSpec{} So(err, ShouldBeNil) - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header that starts in a dot should error", func() { contents := "c, .a, b" colSpecs := []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header that contains multiple consecutive dots should error", func() { contents := "c, a..a, b" colSpecs := []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "c, a.a, b.b...b" colSpecs = []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header using an empty file should return EOF", func() { contents := "" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldEqual, io.EOF) So(len(r.colSpecs), ShouldEqual, 0) }) Convey("setting the header with column specs already set should replace "+ @@ -300,7 +301,7 @@ func TestCSVReadAndValidateHeader(t *testing.T) { {"c", new(FieldAutoParser), pgAutoCast, "auto", []string{"c"}}, } r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) // if ReadAndValidateHeader() is called with column specs already passed // in, the header should be replaced with the read header line So(len(r.colSpecs), ShouldEqual, 3) diff --git a/mongoimport/tsv_test.go b/mongoimport/tsv_test.go index 15b1739bd..fb22d7bc1 100644 --- a/mongoimport/tsv_test.go +++ b/mongoimport/tsv_test.go @@ -201,12 +201,13 @@ func TestTSVStreamDocument(t *testing.T) { func TestTSVReadAndValidateHeader(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) + var optionsWithFields ColumnsAsOptionFields Convey("With a TSV input reader", t, func() { Convey("setting the header should read the first line of the TSV", func() { contents := "extraHeader1\textraHeader2\textraHeader3\n" colSpecs := []ColumnSpec{} r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) }) }) From ff6e44be880d2e4c2c0ec589a153129b8a8cda0c Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Thu, 9 Mar 2023 10:45:43 -0800 Subject: [PATCH 08/19] gofmt -s --- mongoimport/common_test.go | 5 +++-- mongoimport/mongoimport.go | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go index 90db8486f..84b95bd90 100644 --- a/mongoimport/common_test.go +++ b/mongoimport/common_test.go @@ -146,8 +146,9 @@ func TestValidateFields(t *testing.T) { optionsWithFields.metaField = "b" optionsWithFields.timeField = "c" So(validateFields([]string{"a", "aa"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "aa","b"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "aa","b","c"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "aa", "b"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "aa", "c"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "aa", "b", "c"}, false, optionsWithFields), ShouldBeNil) }) }) } diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 7789b5b81..40f2dc476 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -213,8 +213,8 @@ func (imp *MongoImport) validateSettings(args []string) error { // Validate TimeSeries Options if imp.IngestOptions.TimeSeriesTimeField == "" { - if (imp.IngestOptions.TimeSeriesMetaField != "" || - imp.IngestOptions.TimeSeriesGranularity != "") { + if imp.IngestOptions.TimeSeriesMetaField != "" || + imp.IngestOptions.TimeSeriesGranularity != "" { return fmt.Errorf("cannot use --timeseries-metafield nor --timeseries-granularity without --timeseries-timefield") } } else { @@ -231,9 +231,9 @@ func (imp *MongoImport) validateSettings(args []string) error { } if imp.IngestOptions.TimeSeriesGranularity != "" { - if (imp.IngestOptions.TimeSeriesGranularity != "seconds" && + if imp.IngestOptions.TimeSeriesGranularity != "seconds" && imp.IngestOptions.TimeSeriesGranularity != "minutes" && - imp.IngestOptions.TimeSeriesGranularity != "hours") { + imp.IngestOptions.TimeSeriesGranularity != "hours" { return fmt.Errorf("--timeseries-granularity must be one of: seconds, minutes, hours") } } From 18f9845e68a36ccce6a0acb856bf0a450d226b8a Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Thu, 9 Mar 2023 17:44:21 -0800 Subject: [PATCH 09/19] Allow escape hatch to write to pre-existing TS collections --- mongoimport/mongoimport.go | 12 +++--------- mongoimport/options.go | 7 ++++++- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 40f2dc476..0d8fbaa9e 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -230,14 +230,6 @@ func (imp *MongoImport) validateSettings(args []string) error { optionsWithFields.metaField = imp.IngestOptions.TimeSeriesMetaField } - if imp.IngestOptions.TimeSeriesGranularity != "" { - if imp.IngestOptions.TimeSeriesGranularity != "seconds" && - imp.IngestOptions.TimeSeriesGranularity != "minutes" && - imp.IngestOptions.TimeSeriesGranularity != "hours" { - return fmt.Errorf("--timeseries-granularity must be one of: seconds, minutes, hours") - } - } - // parse UpsertFields, may set default mode to modeUpsert if imp.IngestOptions.UpsertFields != "" { if imp.IngestOptions.Mode == "" { @@ -432,7 +424,9 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 if cursor.Next(context.TODO()) { cursor.Close(context.TODO()) - return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided") + if !imp.IngestOptions.TimeSeriesExists { + return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeseries-exists if the time-series collection was already created.") + } } } diff --git a/mongoimport/options.go b/mongoimport/options.go index c772f8e9a..32f45f5d1 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -111,7 +111,12 @@ type IngestOptions struct { TimeSeriesMetaField string `long:"timeseries-metafield" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeseries-metafield='sensor_id'. Requires --timeseries-timefield."` // Optional. Passed to the creation of a time-series collection. - TimeSeriesGranularity string `long:"timeseries-granularity" value-name:"granularity" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. The type can be one of: seconds (default), minutes, hours. Requires --timeseries-timefield."` + TimeSeriesGranularity string `long:"timeseries-granularity" choice:"seconds" choice:"minutes" choice:"hours" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. Requires --timeseries-timefield. (default: seconds)"` + + // If an existing time-series collection exists, allow mongoimport to write to it directly, without creating the collection. + // You must ensure that the target collection was created as a time-series collection, and that the timeField and/or MetaField was set correctly upon creation. + // Mongoimport is not currently able to fully validate what you are doing is correct. + TimeSeriesExists bool `long:"timeseries-exists" description:"Allow mongoimport to write to an existing time-series collection directly, without creating the collection nor validating the options to mongoimport. You must ensure that the target collection was created as a time-series collection, and that the timeField and/or MetaField was set correctly upon creation. Use this option with discretion."` } // Name returns a description of the IngestOptions struct. From 954cf8dc4cdd34b7b115bf313866804e955dd2c6 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Fri, 10 Mar 2023 11:17:41 -0800 Subject: [PATCH 10/19] Put validation code in typed_fields, validate 'date' for timefield --- mongoimport/common.go | 35 +++--------------------------- mongoimport/csv.go | 10 +++++++-- mongoimport/mongoimport.go | 8 +++++-- mongoimport/tsv.go | 10 +++++++-- mongoimport/typed_fields.go | 43 +++++++++++++++++++++++++++++++++++++ 5 files changed, 68 insertions(+), 38 deletions(-) diff --git a/mongoimport/common.go b/mongoimport/common.go index 0164c643e..7988c02e4 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -561,10 +561,7 @@ func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, i // (6). The indexes for an array don't start from 0 (e.g. a.1,a.2) // (7). Array indexes are out of order (e.g. a.0,a.2,a.1) // (8). An array is missing an index (e.g. a.0,a.2) -// -// In the case that --timeseries-timefield and its optional pair --timeseries-metafield are set: -// (9). A field intended to refer to a column of provided data must match one. -func validateFields(inputFields []string, useArrayIndexFields bool, fields ColumnsAsOptionFields) error { +func validateFields(inputFields []string, useArrayIndexFields bool) error { for _, field := range inputFields { // Here we check validity for case (1). @@ -601,32 +598,6 @@ func validateFields(inputFields []string, useArrayIndexFields bool, fields Colum } } - if len(fields.timeField) > 0 { - var timeFieldExists bool - for _, field := range inputFields { - if field == fields.timeField { - timeFieldExists = true - break - } - } - if !timeFieldExists { - return fmt.Errorf("error --timeseries-timefield '%v' doesn't match any provided fields", fields.timeField) - } - } - - if len(fields.metaField) > 0 { - var metaFieldExists bool - for _, field := range inputFields { - if field == fields.metaField { - metaFieldExists = true - break - } - } - if !metaFieldExists { - return fmt.Errorf("error --timeseries-metafield '%v' doesn't match any provided fields", fields.metaField) - } - } - return nil } @@ -817,8 +788,8 @@ func indexError(field string) error { } // validateReaderFields is a helper to validate fields for input readers -func validateReaderFields(fields []string, useArrayIndexFields bool, optionsWithFields ColumnsAsOptionFields) error { - if err := validateFields(fields, useArrayIndexFields, optionsWithFields); err != nil { +func validateReaderFields(fields []string, useArrayIndexFields bool) error { + if err := validateFields(fields, useArrayIndexFields); err != nil { return err } if len(fields) == 1 { diff --git a/mongoimport/csv.go b/mongoimport/csv.go index bb1abdfb2..c9cfb98be 100644 --- a/mongoimport/csv.go +++ b/mongoimport/csv.go @@ -84,7 +84,10 @@ func (r *CSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOption return err } r.colSpecs = ParseAutoHeaders(fields) - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // ReadAndValidateHeader reads the header from the underlying reader and validates @@ -98,7 +101,10 @@ func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optio if err != nil { return err } - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // StreamDocument takes a boolean indicating if the documents should be streamed diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 0d8fbaa9e..c0ebaa6c9 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -238,7 +238,7 @@ func (imp *MongoImport) validateSettings(args []string) error { return fmt.Errorf("can not use --upsertFields with --mode=insert") } imp.upsertFields = strings.Split(imp.IngestOptions.UpsertFields, ",") - if err := validateFields(imp.upsertFields, imp.InputOptions.UseArrayIndexFields, optionsWithFields); err != nil { + if err := validateFields(imp.upsertFields, imp.InputOptions.UseArrayIndexFields); err != nil { return fmt.Errorf("invalid --upsertFields argument: %v", err) } } else if imp.IngestOptions.Mode != modeInsert { @@ -632,7 +632,11 @@ func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) { // header fields validation can only happen once we have an input reader if !imp.InputOptions.HeaderLine { - if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields, optionsWithFields); err != nil { + if err = ValidateOptionDependentFields(colSpecs, optionsWithFields); err != nil { + return nil, err + } + + if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields); err != nil { return nil, err } } diff --git a/mongoimport/tsv.go b/mongoimport/tsv.go index 18ffe66f5..04acdcb84 100644 --- a/mongoimport/tsv.go +++ b/mongoimport/tsv.go @@ -90,7 +90,10 @@ func (r *TSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOption headerFields = append(headerFields, strings.TrimRight(field, "\r\n")) } r.colSpecs = ParseAutoHeaders(headerFields) - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // ReadAndValidateTypedHeader reads the header from the underlying reader and validates @@ -108,7 +111,10 @@ func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optio if err != nil { return err } - return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields, optionsWithFields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // StreamDocument takes a boolean indicating if the documents should be streamed diff --git a/mongoimport/typed_fields.go b/mongoimport/typed_fields.go index 47861167c..a560317c4 100644 --- a/mongoimport/typed_fields.go +++ b/mongoimport/typed_fields.go @@ -134,6 +134,49 @@ func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { return } +// Validate options that are dependent on existence of field names and/or types. +// - Some options refer to field names. +// --timeseries-timefield= must refer to a field from --headerline/--fields/--fieldFile +// --timeseries-metafield= must refer to a field from --headerline/--fields/--fieldFile +// - Some options can require a field type. +// --timeseries-timefield: date +func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsOptionFields) (err error) { + // --timeseries-timefield must match a column name AND be of type date* + if len(optionFields.timeField) > 0 { + var timeFieldExists bool + for _, header := range headers { + if header.Name == optionFields.timeField { + if ! (header.TypeName == "date" || header.TypeName == "date_go" || header.TypeName == "date_ms" || header.TypeName == "date_oracle") { + return fmt.Errorf("error --timeseries-timefield '%v' must be a date type (date, date_go, date_ms, date_oracle)", optionFields.timeField) + } + timeFieldExists = true + break + } + } + + if !timeFieldExists { + return fmt.Errorf("error --timeseries-timefield '%v' doesn't match any provided fields", optionFields.timeField) + } + } + + // --timeseries-metafield must match a column name + if len(optionFields.metaField) > 0 { + var metaFieldExists bool + for _, header := range headers { + if header.Name == optionFields.metaField { + metaFieldExists = true + break + } + } + + if !metaFieldExists { + return fmt.Errorf("error --timeseries-metafield '%v' doesn't match any provided fields", optionFields.metaField) + } + } + + return nil +} + // FieldParser is the interface for any parser of a field item. type FieldParser interface { Parse(in string) (interface{}, error) From 82a00c9d5d480f2b01f7590a71f45f104a8585e1 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Fri, 10 Mar 2023 12:33:50 -0800 Subject: [PATCH 11/19] Make linter happy & unit test the field validation --- mongoimport/common_test.go | 43 +++++++++++++------------------- mongoimport/mongoimport.go | 2 +- mongoimport/typed_fields_test.go | 30 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go index 84b95bd90..2fd24062f 100644 --- a/mongoimport/common_test.go +++ b/mongoimport/common_test.go @@ -110,45 +110,36 @@ func convertBSONDToRaw(documents []bson.D) []bson.Raw { func TestValidateFields(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) - var optionsWithFields ColumnsAsOptionFields Convey("Given an import input, in validating the headers", t, func() { Convey("if the fields contain '..', an error should be thrown", func() { - So(validateFields([]string{"a..a"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a..a"}, false), ShouldNotBeNil) }) Convey("if the fields start/end in a '.', an error should be thrown", func() { - So(validateFields([]string{".a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a."}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{".a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a."}, false), ShouldNotBeNil) }) Convey("if the fields start in a '$', an error should be thrown", func() { - So(validateFields([]string{"$.a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"$"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"$a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a$a"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"$.a"}, false), ShouldNotBeNil) + So(validateFields([]string{"$"}, false), ShouldNotBeNil) + So(validateFields([]string{"$a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a$a"}, false), ShouldBeNil) }) Convey("if the fields collide, an error should be thrown", func() { - So(validateFields([]string{"a", "a.a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "a.ba", "b.a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "a.ba", "b.a"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "a.b.c"}, false, optionsWithFields), ShouldNotBeNil) + So(validateFields([]string{"a", "a.a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}, false), ShouldNotBeNil) + So(validateFields([]string{"a", "a.b.c"}, false), ShouldNotBeNil) }) Convey("if the fields don't collide, no error should be thrown", func() { - So(validateFields([]string{"a", "aa"}, false, optionsWithFields), ShouldBeNil) - So(validateFields([]string{"a", "aa", "b.a", "b.c"}, false, optionsWithFields), ShouldBeNil) - So(validateFields([]string{"a", "ba", "ab", "b.a"}, false, optionsWithFields), ShouldBeNil) - So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}, false, optionsWithFields), ShouldBeNil) - So(validateFields([]string{"a", "ab.c"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "aa"}, false), ShouldBeNil) + So(validateFields([]string{"a", "aa", "b.a", "b.c"}, false), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a"}, false), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}, false), ShouldBeNil) + So(validateFields([]string{"a", "ab.c"}, false), ShouldBeNil) }) Convey("if the fields contain the same keys, an error should be thrown", func() { - So(validateFields([]string{"a", "ba", "a"}, false, optionsWithFields), ShouldNotBeNil) - }) - Convey("time-series fields should validate", func() { - optionsWithFields.metaField = "b" - optionsWithFields.timeField = "c" - So(validateFields([]string{"a", "aa"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "aa", "b"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "aa", "c"}, false, optionsWithFields), ShouldNotBeNil) - So(validateFields([]string{"a", "aa", "b", "c"}, false, optionsWithFields), ShouldBeNil) + So(validateFields([]string{"a", "ba", "a"}, false), ShouldNotBeNil) }) }) } diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index c0ebaa6c9..dc1eada30 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -425,7 +425,7 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 if cursor.Next(context.TODO()) { cursor.Close(context.TODO()) if !imp.IngestOptions.TimeSeriesExists { - return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeseries-exists if the time-series collection was already created.") + return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeseries-exists if the time-series collection was already created") } } } diff --git a/mongoimport/typed_fields_test.go b/mongoimport/typed_fields_test.go index 141029071..d3bc514aa 100644 --- a/mongoimport/typed_fields_test.go +++ b/mongoimport/typed_fields_test.go @@ -414,3 +414,33 @@ func TestFieldParsers(t *testing.T) { }) } + +func TestOptionDependentFieldValidation(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.UnitTestType) + Convey("Using 'timestamp.date(),zip.string(),number.double(),foo.auto()'", t, func() { + colSpecs := []ColumnSpec{ + {"timestamp", &FieldDateParser{"January 2, (2006)"}, pgAutoCast, "date", []string{"timestamp"}}, + {"zip", new(FieldStringParser), pgAutoCast, "string", []string{"zip"}}, + {"number", new(FieldDoubleParser), pgAutoCast, "double", []string{"number"}}, + {"foo", new(FieldAutoParser), pgAutoCast, "auto", []string{"foo"}}, + } + + emptyOptionFields := ColumnsAsOptionFields{} + So(ValidateOptionDependentFields(colSpecs, emptyOptionFields), ShouldBeNil) + + bothNonExistentOptionFields := ColumnsAsOptionFields{"somefield", "otherfield"} + So(ValidateOptionDependentFields(colSpecs, bothNonExistentOptionFields), ShouldNotBeNil) + + timeNonExistentOptionFields := ColumnsAsOptionFields{"somefield", "foo"} + So(ValidateOptionDependentFields(colSpecs, timeNonExistentOptionFields), ShouldNotBeNil) + + metaNonExistentOptionFields := ColumnsAsOptionFields{"timestamp", "somefield"} + So(ValidateOptionDependentFields(colSpecs, metaNonExistentOptionFields), ShouldNotBeNil) + + nonDateTimestamp := ColumnsAsOptionFields{"number", "zip"} + So(ValidateOptionDependentFields(colSpecs, nonDateTimestamp), ShouldNotBeNil) + + optionFields := ColumnsAsOptionFields{"timestamp", "zip"} + So(ValidateOptionDependentFields(colSpecs, optionFields), ShouldBeNil) + }) +} From 4ad1daf3ed1175f8ffc7bb1a0de5bfa8f84c7df3 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 08:50:47 -0700 Subject: [PATCH 12/19] Follow repo coding practice of options naming --- mongoimport/mongoimport.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index dc1eada30..8ed0bb5f1 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -16,7 +16,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - moptions "go.mongodb.org/mongo-driver/mongo/options" + mopt "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/tomb.v2" "context" @@ -433,7 +433,7 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 log.Logvf(log.Always, "creating time-series collection: %v.%v", imp.ToolOptions.Namespace.DB, imp.ToolOptions.Namespace.Collection) - timeseriesOptions := moptions.TimeSeries() + timeseriesOptions := mopt.TimeSeries() timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeriesTimeField) if imp.IngestOptions.TimeSeriesMetaField != "" { @@ -443,7 +443,7 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 if imp.IngestOptions.TimeSeriesGranularity != "" { timeseriesOptions.SetGranularity(imp.IngestOptions.TimeSeriesGranularity) } - collectionOptions := moptions.CreateCollection().SetTimeSeriesOptions(timeseriesOptions) + collectionOptions := mopt.CreateCollection().SetTimeSeriesOptions(timeseriesOptions) session.Database(imp.ToolOptions.DB).CreateCollection(context.TODO(), imp.ToolOptions.Namespace.Collection, collectionOptions) } From cd51c38a4223558c0e3d30fde6f3a9e659db1f21 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:07:14 -0700 Subject: [PATCH 13/19] Add test for time-series --- mongoimport/mongoimport_test.go | 71 +++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index 032de19b7..a7450258a 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -8,6 +8,7 @@ package mongoimport import ( "bufio" + "context" "fmt" "io" "io/ioutil" @@ -18,6 +19,7 @@ import ( "testing" "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/mongodb/mongo-tools/common/testtype" "github.com/mongodb/mongo-tools/common/testutil" @@ -71,13 +73,52 @@ func checkOnlyHasDocuments(sessionProvider *db.SessionProvider, expectedDocument return nil } -func countDocuments(sessionProvider *db.SessionProvider) (int, error) { +func isTimeSeriesCollection(sessionProvider *db.SessionProvider, testCollectionName string) (bool, error) { + session, err := (*sessionProvider).GetSession() + if err != nil { + return false, err + } + + db := session.Database(testDb) + collInfo, err := db.ListCollectionSpecifications(context.TODO(), bson.D{{"name", testCollectionName}}) + if err != nil { + return false, err + } + + log.Logvf(log.Always, "listCollSpecs: %v num:[%d]", collInfo[0], len(collInfo)) + opts := collInfo[0].Options + elems, err := collInfo[0].Options.Elements() + if err != nil { + return false, err + } + + log.Logvf(log.Always, "num elems: %d", len(elems)) + log.Logvf(log.Always, "num elems: %s", elems[0].DebugString()) + + var collSpecs bson.D + err = bson.Unmarshal(opts, &collSpecs) + if err != nil { + return false, err + } + + for _, collSpec := range collSpecs { + log.Logvf(log.Always, " elem: %v", collSpec.Key) + if collSpec.Key == "timeseries" { + log.Logvf(log.Always, " found TS!") + return true, nil + } + } + + return false, nil +} + +func countDocuments(sessionProvider *db.SessionProvider, testCollectionName string) (int, error) { session, err := (*sessionProvider).GetSession() if err != nil { return 0, err } - collection := session.Database(testDb).Collection(testCollection) + collection := session.Database(testDb).Collection(testCollectionName) n, err := collection.CountDocuments(nil, bson.D{}) if err != nil { return 0, err @@ -982,7 +1023,7 @@ func TestImportDocuments(t *testing.T) { So(err, ShouldBeNil) So(numProcessed, ShouldEqual, 1) So(numFailed, ShouldEqual, 0) - n, err := countDocuments(imp.SessionProvider) + n, err := countDocuments(imp.SessionProvider, testCollection) So(err, ShouldBeNil) So(n, ShouldEqual, 1) @@ -998,7 +1039,7 @@ func TestImportDocuments(t *testing.T) { So(err, ShouldBeNil) So(numProcessed, ShouldEqual, 1) So(numFailed, ShouldEqual, 0) - n, err = countDocuments(imp.SessionProvider) + n, err = countDocuments(imp.SessionProvider, testCollection) So(err, ShouldBeNil) So(n, ShouldEqual, 1) }) @@ -1236,6 +1277,28 @@ func TestImportDocuments(t *testing.T) { fmt.Errorf("fields 'a.a.a.a' and 'a.a' are incompatible"), ), ) + Convey("CSV import with --timeSeriesTimeField should succeed in creating a time-series collection and inserting documents", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.IngestOptions.Drop = true + imp.IngestOptions.Mode = modeInsert + imp.IngestOptions.TimeSeriesTimeField = "timestamp" + imp.InputOptions.ColumnsHaveTypes = true + imp.InputOptions.File = "testdata/test_timeseries.csv" + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.ToolOptions.Collection = "timeseries_coll" + numProcessed, numFailed, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numProcessed, ShouldEqual, 5) + So(numFailed, ShouldEqual, 0) + isTimeSeries, err := isTimeSeriesCollection(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(isTimeSeries, ShouldBeTrue) + n, err := countDocuments(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(n, ShouldEqual, 5) + }) }) } From 3f8fd5ee9c9cd3ec221f601300a9a145a8603f62 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:11:16 -0700 Subject: [PATCH 14/19] snake_case to camelCase on new options --- mongoimport/mongoimport.go | 10 +++++++--- mongoimport/options.go | 13 +++++++------ mongoimport/typed_fields.go | 21 +++++++++++---------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 8ed0bb5f1..71e975df0 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -215,7 +215,7 @@ func (imp *MongoImport) validateSettings(args []string) error { if imp.IngestOptions.TimeSeriesTimeField == "" { if imp.IngestOptions.TimeSeriesMetaField != "" || imp.IngestOptions.TimeSeriesGranularity != "" { - return fmt.Errorf("cannot use --timeseries-metafield nor --timeseries-granularity without --timeseries-timefield") + return fmt.Errorf("cannot use --timeSeriesMetaField nor --timeSeriesGranularity without --timeSeriesTimeField") } } else { if imp.InputOptions.Type == JSON { @@ -223,7 +223,11 @@ func (imp *MongoImport) validateSettings(args []string) error { } if imp.IngestOptions.TimeSeriesTimeField == imp.IngestOptions.TimeSeriesMetaField { - return fmt.Errorf("error the metafield and timefield for time-series collections must be different columns") + return fmt.Errorf("error the MetaField and TimeField for time-series collections must be different columns") + } + + if ! imp.InputOptions.ColumnsHaveTypes { + return fmt.Errorf("--timeSeriesTimeFields requires --columnsHaveTypes so mongoimport can validate the date field") } optionsWithFields.timeField = imp.IngestOptions.TimeSeriesTimeField @@ -425,7 +429,7 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 if cursor.Next(context.TODO()) { cursor.Close(context.TODO()) if !imp.IngestOptions.TimeSeriesExists { - return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeseries-exists if the time-series collection was already created") + return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeSeriesExists if the time-series collection was already created") } } } diff --git a/mongoimport/options.go b/mongoimport/options.go index 32f45f5d1..454c4800c 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -104,19 +104,20 @@ type IngestOptions struct { BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"` // Creates the target collection as a time-series collection using the value given - // as the TimeField. - TimeSeriesTimeField string `long:"timeseries-timefield" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeseries-timefield='timestamp'"` + // as the TimeField. --columnsHaveTypes is required so mongoimport can validate + // that a date field exists, and because a date cannot be coerced from auto. + TimeSeriesTimeField string `long:"timeSeriesTimeField" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeSeriesTimeField='timestamp'". Requires --columnsHaveTypes.` // Optional. Passed to the creation of a time-series collection. - TimeSeriesMetaField string `long:"timeseries-metafield" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeseries-metafield='sensor_id'. Requires --timeseries-timefield."` + TimeSeriesMetaField string `long:"timeSeriesMetaField" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeSeriesMetaField='sensor_id'. Requires --timeSeriesTimeField."` // Optional. Passed to the creation of a time-series collection. - TimeSeriesGranularity string `long:"timeseries-granularity" choice:"seconds" choice:"minutes" choice:"hours" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. Requires --timeseries-timefield. (default: seconds)"` + TimeSeriesGranularity string `long:"timeSeriesGranularity" choice:"seconds" choice:"minutes" choice:"hours" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. Requires --timeSeriesTimeField. (default: seconds)"` // If an existing time-series collection exists, allow mongoimport to write to it directly, without creating the collection. - // You must ensure that the target collection was created as a time-series collection, and that the timeField and/or MetaField was set correctly upon creation. + // You must ensure that the target collection was created as a time-series collection, and that the TimeField and/or MetaField was set correctly upon creation. // Mongoimport is not currently able to fully validate what you are doing is correct. - TimeSeriesExists bool `long:"timeseries-exists" description:"Allow mongoimport to write to an existing time-series collection directly, without creating the collection nor validating the options to mongoimport. You must ensure that the target collection was created as a time-series collection, and that the timeField and/or MetaField was set correctly upon creation. Use this option with discretion."` + TimeSeriesExists bool `long:"timeSeriesExists" description:"Allow mongoimport to write to an existing time-series collection directly, without creating the collection nor validating the options to mongoimport. You must ensure that the target collection was created as a time-series collection, and that the TimeField and/or MetaField was set correctly upon creation. Use this option with discretion."` } // Name returns a description of the IngestOptions struct. diff --git a/mongoimport/typed_fields.go b/mongoimport/typed_fields.go index a560317c4..33153c2d7 100644 --- a/mongoimport/typed_fields.go +++ b/mongoimport/typed_fields.go @@ -136,30 +136,31 @@ func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { // Validate options that are dependent on existence of field names and/or types. // - Some options refer to field names. -// --timeseries-timefield= must refer to a field from --headerline/--fields/--fieldFile -// --timeseries-metafield= must refer to a field from --headerline/--fields/--fieldFile +// --timeSeriesTimeField= must refer to a field from --headerline/--fields/--fieldFile +// --timeSeriesMetaField= must refer to a field from --headerline/--fields/--fieldFile // - Some options can require a field type. -// --timeseries-timefield: date +// --timeSeriesTimeField: date func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsOptionFields) (err error) { - // --timeseries-timefield must match a column name AND be of type date* + // --timeSeriesTimeField must match a column name AND be of type date* if len(optionFields.timeField) > 0 { var timeFieldExists bool for _, header := range headers { - if header.Name == optionFields.timeField { + // Check against NameParts[0] because the Name field will contain the "Name.Type" + if header.NameParts[0] == optionFields.timeField { + timeFieldExists = true if ! (header.TypeName == "date" || header.TypeName == "date_go" || header.TypeName == "date_ms" || header.TypeName == "date_oracle") { - return fmt.Errorf("error --timeseries-timefield '%v' must be a date type (date, date_go, date_ms, date_oracle)", optionFields.timeField) + return fmt.Errorf("error --timeSeriesTimeField '%v' must be a date type (date, date_go, date_ms, date_oracle), found type '%v'", optionFields.timeField, header.TypeName) } - timeFieldExists = true break } } if !timeFieldExists { - return fmt.Errorf("error --timeseries-timefield '%v' doesn't match any provided fields", optionFields.timeField) + return fmt.Errorf("error --timeSeriesTimeField '%v' doesn't match any provided fields", optionFields.timeField) } } - // --timeseries-metafield must match a column name + // --timeSeriesMetaField must match a column name if len(optionFields.metaField) > 0 { var metaFieldExists bool for _, header := range headers { @@ -170,7 +171,7 @@ func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsO } if !metaFieldExists { - return fmt.Errorf("error --timeseries-metafield '%v' doesn't match any provided fields", optionFields.metaField) + return fmt.Errorf("error --timeSeriesMetaField '%v' doesn't match any provided fields", optionFields.metaField) } } From 9acb199ca17dc21d99c57b217d5365a206ed0572 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:13:24 -0700 Subject: [PATCH 15/19] Add test time-series CSV data --- mongoimport/testdata/test_timeseries.csv | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 mongoimport/testdata/test_timeseries.csv diff --git a/mongoimport/testdata/test_timeseries.csv b/mongoimport/testdata/test_timeseries.csv new file mode 100644 index 000000000..cd26faf41 --- /dev/null +++ b/mongoimport/testdata/test_timeseries.csv @@ -0,0 +1,6 @@ +timestamp.date(2006-01-02 15:04:05),mac_address.string(),humidity.decimal(),temperature_celsius.decimal() +2023-03-07 13:29:00,48:3F:DA:A3:CE:06,91.800003,7.600000 +2023-03-07 13:29:38,8C:AA:B5:63:03:08,52.400002,16.500000 +2023-03-07 13:32:37,84:CC:A8:B2:D0:6E,67.800003,12.500000 +2023-03-07 13:34:25,8C:AA:B5:63:03:08,52.500000,16.400000 +2023-03-07 13:34:45,48:3F:DA:A3:CE:06,92.199997,7.700000 From 48f4d84c7cf7240b2504a5ae32d7742c492c2b27 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:17:20 -0700 Subject: [PATCH 16/19] gofmt --- mongoimport/mongoimport.go | 2 +- mongoimport/typed_fields.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 71e975df0..d99e960f4 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -226,7 +226,7 @@ func (imp *MongoImport) validateSettings(args []string) error { return fmt.Errorf("error the MetaField and TimeField for time-series collections must be different columns") } - if ! imp.InputOptions.ColumnsHaveTypes { + if !imp.InputOptions.ColumnsHaveTypes { return fmt.Errorf("--timeSeriesTimeFields requires --columnsHaveTypes so mongoimport can validate the date field") } diff --git a/mongoimport/typed_fields.go b/mongoimport/typed_fields.go index 33153c2d7..294f89407 100644 --- a/mongoimport/typed_fields.go +++ b/mongoimport/typed_fields.go @@ -135,11 +135,11 @@ func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { } // Validate options that are dependent on existence of field names and/or types. -// - Some options refer to field names. -// --timeSeriesTimeField= must refer to a field from --headerline/--fields/--fieldFile -// --timeSeriesMetaField= must refer to a field from --headerline/--fields/--fieldFile -// - Some options can require a field type. -// --timeSeriesTimeField: date +// - Some options refer to field names. +// --timeSeriesTimeField= must refer to a field from --headerline/--fields/--fieldFile +// --timeSeriesMetaField= must refer to a field from --headerline/--fields/--fieldFile +// - Some options can require a field type. +// --timeSeriesTimeField: date func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsOptionFields) (err error) { // --timeSeriesTimeField must match a column name AND be of type date* if len(optionFields.timeField) > 0 { @@ -148,7 +148,7 @@ func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsO // Check against NameParts[0] because the Name field will contain the "Name.Type" if header.NameParts[0] == optionFields.timeField { timeFieldExists = true - if ! (header.TypeName == "date" || header.TypeName == "date_go" || header.TypeName == "date_ms" || header.TypeName == "date_oracle") { + if !(header.TypeName == "date" || header.TypeName == "date_go" || header.TypeName == "date_ms" || header.TypeName == "date_oracle") { return fmt.Errorf("error --timeSeriesTimeField '%v' must be a date type (date, date_go, date_ms, date_oracle), found type '%v'", optionFields.timeField, header.TypeName) } break From 5524a47be2ebf34d98b6e2491dce966b17bd3288 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:26:04 -0700 Subject: [PATCH 17/19] Fix quote mark --- mongoimport/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongoimport/options.go b/mongoimport/options.go index 454c4800c..943509beb 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -106,7 +106,7 @@ type IngestOptions struct { // Creates the target collection as a time-series collection using the value given // as the TimeField. --columnsHaveTypes is required so mongoimport can validate // that a date field exists, and because a date cannot be coerced from auto. - TimeSeriesTimeField string `long:"timeSeriesTimeField" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeSeriesTimeField='timestamp'". Requires --columnsHaveTypes.` + TimeSeriesTimeField string `long:"timeSeriesTimeField" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeSeriesTimeField='timestamp'. Requires --columnsHaveTypes."` // Optional. Passed to the creation of a time-series collection. TimeSeriesMetaField string `long:"timeSeriesMetaField" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeSeriesMetaField='sensor_id'. Requires --timeSeriesTimeField."` From 3393e00334ffe1749bdd7d99dd129cf146752b85 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Mon, 13 Mar 2023 15:56:39 -0700 Subject: [PATCH 18/19] More generic collSpec checking for isTimeSeries --- mongoimport/mongoimport_test.go | 34 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index a7450258a..dd9a4bc07 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/mongodb/mongo-tools/common/db" - "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/mongodb/mongo-tools/common/testtype" "github.com/mongodb/mongo-tools/common/testutil" @@ -80,32 +79,23 @@ func isTimeSeriesCollection(sessionProvider *db.SessionProvider, testCollectionN } db := session.Database(testDb) - collInfo, err := db.ListCollectionSpecifications(context.TODO(), bson.D{{"name", testCollectionName}}) + collInfoSpecs, err := db.ListCollectionSpecifications(context.TODO(), bson.D{{"name", testCollectionName}}) if err != nil { return false, err } - log.Logvf(log.Always, "listCollSpecs: %v num:[%d]", collInfo[0], len(collInfo)) - opts := collInfo[0].Options - elems, err := collInfo[0].Options.Elements() - if err != nil { - return false, err - } - - log.Logvf(log.Always, "num elems: %d", len(elems)) - log.Logvf(log.Always, "num elems: %s", elems[0].DebugString()) - - var collSpecs bson.D - err = bson.Unmarshal(opts, &collSpecs) - if err != nil { - return false, err - } + for _, elem := range collInfoSpecs { + opts := elem.Options + var collSpecs bson.D + err = bson.Unmarshal(opts, &collSpecs) + if err != nil { + return false, err + } - for _, collSpec := range collSpecs { - log.Logvf(log.Always, " elem: %v", collSpec.Key) - if collSpec.Key == "timeseries" { - log.Logvf(log.Always, " found TS!") - return true, nil + for _, collSpec := range collSpecs { + if collSpec.Key == "timeseries" { + return true, nil + } } } From 5ebab80824d692247eca4a1a0c3322f343573f37 Mon Sep 17 00:00:00 2001 From: Matt Kneiser Date: Tue, 14 Mar 2023 08:27:54 -0700 Subject: [PATCH 19/19] Pull time-series testing into a separate test and check FCV --- mongoimport/mongoimport_test.go | 75 +++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index dd9a4bc07..bcdae12fb 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -570,6 +570,59 @@ func TestGetInputReader(t *testing.T) { }) } +func TestTimeSeriesImport(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) + + session, err := testutil.GetBareSession() + if err != nil { + t.Fatalf("No server available") + } + + fcv := testutil.GetFCV(session) + if cmp, err := testutil.CompareFCV(fcv, "5.0"); err != nil || cmp < 0 { + t.Skip("Requires server with FCV 5.0 or later") + } + + Convey("With a mongoimport instance", t, func() { + Reset(func() { + sessionProvider, err := db.NewSessionProvider(*getBasicToolOptions()) + if err != nil { + t.Fatalf("error getting session provider session: %v", err) + } + session, err := sessionProvider.GetSession() + if err != nil { + t.Fatalf("error getting session: %v", err) + } + _, err = session.Database(testDb).Collection("timeseries_coll").DeleteMany(nil, bson.D{}) + if err != nil { + t.Fatalf("error dropping collection: %v", err) + } + }) + Convey("CSV import with --timeSeriesTimeField should succeed in creating a time-series collection and inserting documents", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.IngestOptions.Drop = true + imp.IngestOptions.Mode = modeInsert + imp.IngestOptions.TimeSeriesTimeField = "timestamp" + imp.InputOptions.ColumnsHaveTypes = true + imp.InputOptions.File = "testdata/test_timeseries.csv" + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.ToolOptions.Collection = "timeseries_coll" + numProcessed, numFailed, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numProcessed, ShouldEqual, 5) + So(numFailed, ShouldEqual, 0) + isTimeSeries, err := isTimeSeriesCollection(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(isTimeSeries, ShouldBeTrue) + n, err := countDocuments(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(n, ShouldEqual, 5) + }) + }) +} + func TestImportDocuments(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) Convey("With a mongoimport instance", t, func() { @@ -1267,28 +1320,6 @@ func TestImportDocuments(t *testing.T) { fmt.Errorf("fields 'a.a.a.a' and 'a.a' are incompatible"), ), ) - Convey("CSV import with --timeSeriesTimeField should succeed in creating a time-series collection and inserting documents", func() { - imp, err := NewMongoImport() - So(err, ShouldBeNil) - imp.IngestOptions.Drop = true - imp.IngestOptions.Mode = modeInsert - imp.IngestOptions.TimeSeriesTimeField = "timestamp" - imp.InputOptions.ColumnsHaveTypes = true - imp.InputOptions.File = "testdata/test_timeseries.csv" - imp.InputOptions.HeaderLine = true - imp.InputOptions.Type = CSV - imp.ToolOptions.Collection = "timeseries_coll" - numProcessed, numFailed, err := imp.ImportDocuments() - So(err, ShouldBeNil) - So(numProcessed, ShouldEqual, 5) - So(numFailed, ShouldEqual, 0) - isTimeSeries, err := isTimeSeriesCollection(imp.SessionProvider, imp.ToolOptions.Collection) - So(err, ShouldBeNil) - So(isTimeSeries, ShouldBeTrue) - n, err := countDocuments(imp.SessionProvider, imp.ToolOptions.Collection) - So(err, ShouldBeNil) - So(n, ShouldEqual, 5) - }) }) }