diff --git a/mongoimport/common.go b/mongoimport/common.go index cb1a944e7..7988c02e4 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -32,6 +32,13 @@ type FieldInfo struct { parts []string } +// ColumnsAsOptionFields holds all mongoimport options that need validation +// against the column names provided by user data. +type ColumnsAsOptionFields struct { + timeField string + metaField string +} + const ( pgAutoCast ParseGrace = iota pgSkipField @@ -590,6 +597,7 @@ func validateFields(inputFields []string, useArrayIndexFields bool) error { return err } } + return nil } diff --git a/mongoimport/csv.go b/mongoimport/csv.go index 17b20409e..c9cfb98be 100644 --- a/mongoimport/csv.go +++ b/mongoimport/csv.go @@ -78,18 +78,21 @@ func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, n // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *CSVInputReader) ReadAndValidateHeader() (err error) { +func (r *CSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) (err error) { fields, err := r.csvReader.Read() if err != nil { return err } r.colSpecs = ParseAutoHeaders(fields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. 
-func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { +func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) (err error) { fields, err := r.csvReader.Read() if err != nil { return err @@ -98,6 +101,9 @@ func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err if err != nil { return err } + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } diff --git a/mongoimport/csv_test.go b/mongoimport/csv_test.go index e8e7a6811..103d20b84 100644 --- a/mongoimport/csv_test.go +++ b/mongoimport/csv_test.go @@ -210,12 +210,13 @@ func TestCSVStreamDocument(t *testing.T) { func TestCSVReadAndValidateHeader(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) var err error + var optionsWithFields ColumnsAsOptionFields Convey("With a CSV input reader", t, func() { Convey("setting the header should read the first line of the CSV", func() { contents := "extraHeader1, extraHeader2, extraHeader3" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) }) @@ -223,24 +224,24 @@ func TestCSVReadAndValidateHeader(t *testing.T) { contents := "a, b, c" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b.c, a.b.d, c" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + 
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b, ab, a.c" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) contents = "a, ab, ac, dd" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 4) }) @@ -248,47 +249,47 @@ func TestCSVReadAndValidateHeader(t *testing.T) { contents := "a, a.b, c" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "a.b.c, a.b.d.c, a.b.d" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "a, a, a" colSpecs = []ColumnSpec{} r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldNotBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header that ends in a dot should error", func() { contents := "c, a., b" colSpecs := []ColumnSpec{} So(err, ShouldBeNil) - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the 
header that starts in a dot should error", func() { contents := "c, .a, b" colSpecs := []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header that contains multiple consecutive dots should error", func() { contents := "c, a..a, b" colSpecs := []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) contents = "c, a.a, b.b...b" colSpecs = []ColumnSpec{} - So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil) }) Convey("setting the header using an empty file should return EOF", func() { contents := "" colSpecs := []ColumnSpec{} r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldEqual, io.EOF) So(len(r.colSpecs), ShouldEqual, 0) }) Convey("setting the header with column specs already set should replace "+ @@ -300,7 +301,7 @@ func TestCSVReadAndValidateHeader(t *testing.T) { {"c", new(FieldAutoParser), pgAutoCast, "auto", []string{"c"}}, } r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) // if ReadAndValidateHeader() 
is called with column specs already passed // in, the header should be replaced with the read header line So(len(r.colSpecs), ShouldEqual, 3) diff --git a/mongoimport/json.go b/mongoimport/json.go index c007ef82f..6467c0aff 100644 --- a/mongoimport/json.go +++ b/mongoimport/json.go @@ -94,12 +94,12 @@ func NewJSONInputReader(isArray bool, legacyExtJSON bool, in io.Reader, numDecod } // ReadAndValidateHeader is a no-op for JSON imports; always returns nil. -func (r *JSONInputReader) ReadAndValidateHeader() error { +func (r *JSONInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error { return nil } // ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil. -func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error { +func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error { return nil } diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 1d8529ed6..d99e960f4 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -14,9 +14,12 @@ import ( "github.com/mongodb/mongo-tools/common/progress" "github.com/mongodb/mongo-tools/common/util" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + mopt "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/tomb.v2" + "context" "fmt" "io" "os" @@ -46,6 +49,8 @@ const ( progressBarLength = 24 ) +var optionsWithFields ColumnsAsOptionFields + // MongoImport is a container for the user-specified options and // internal state used for running mongoimport. type MongoImport struct { @@ -90,12 +95,12 @@ type InputReader interface { // ReadAndValidateHeader reads the header line from the InputReader and returns // a non-nil error if the fields from the header line are invalid; returns // nil otherwise. No-op for JSON input readers. 
- ReadAndValidateHeader() error + ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error // ReadAndValidateTypedHeader is the same as ReadAndValidateHeader, // except it also parses types from the fields of the header. Parse errors // will be handled according parseGrace. - ReadAndValidateTypedHeader(parseGrace ParseGrace) error + ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error // embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar. sizeTracker @@ -206,6 +211,29 @@ func (imp *MongoImport) validateSettings(args []string) error { imp.IngestOptions.Mode = modeUpsert } + // Validate TimeSeries Options + if imp.IngestOptions.TimeSeriesTimeField == "" { + if imp.IngestOptions.TimeSeriesMetaField != "" || + imp.IngestOptions.TimeSeriesGranularity != "" { + return fmt.Errorf("cannot use --timeSeriesMetaField or --timeSeriesGranularity without --timeSeriesTimeField") + } + } else { + if imp.InputOptions.Type == JSON { + return fmt.Errorf("cannot import time-series collections with JSON") + } + + if imp.IngestOptions.TimeSeriesTimeField == imp.IngestOptions.TimeSeriesMetaField { + return fmt.Errorf("the metaField and timeField for time-series collections must be different columns") + } + + if !imp.InputOptions.ColumnsHaveTypes { + return fmt.Errorf("--timeSeriesTimeField requires --columnsHaveTypes so mongoimport can validate the date field") + } + + optionsWithFields.timeField = imp.IngestOptions.TimeSeriesTimeField + optionsWithFields.metaField = imp.IngestOptions.TimeSeriesMetaField + } + // parse UpsertFields, may set default mode to modeUpsert if imp.IngestOptions.UpsertFields != "" { if imp.IngestOptions.Mode == "" { @@ -329,9 +357,9 @@ func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) { if imp.InputOptions.HeaderLine { if imp.InputOptions.ColumnsHaveTypes { - err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace)) + err
= inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace), optionsWithFields) } else { - err = inputReader.ReadAndValidateHeader() + err = inputReader.ReadAndValidateHeader(optionsWithFields) } if err != nil { return 0, 0, err } @@ -386,6 +414,46 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64 } } + // Before importing documents, create the target collection as TimeSeries. + // Fail if the collection already exists. + if imp.IngestOptions.TimeSeriesTimeField != "" { + if !imp.IngestOptions.Drop { + collectionFilter := bson.D{} + collectionFilter = append(collectionFilter, primitive.E{"name", imp.ToolOptions.Namespace.Collection}) + cursor, err := session.Database(imp.ToolOptions.DB).ListCollections(context.TODO(), collectionFilter) + + if err != nil { + return 0, 0, err + } + + collectionExists := cursor.Next(context.TODO()) + cursor.Close(context.TODO()) + + if collectionExists && !imp.IngestOptions.TimeSeriesExists { + return 0, 0, fmt.Errorf("when inserting to a time-series collection, the collection must not exist, or --drop must be provided. 
Consider using --timeSeriesExists if the time-series collection was already created") + } + } + + log.Logvf(log.Always, "creating time-series collection: %v.%v", + imp.ToolOptions.Namespace.DB, + imp.ToolOptions.Namespace.Collection) + timeseriesOptions := mopt.TimeSeries() + timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeriesTimeField) + + if imp.IngestOptions.TimeSeriesMetaField != "" { + timeseriesOptions.SetMetaField(imp.IngestOptions.TimeSeriesMetaField) + } + + if imp.IngestOptions.TimeSeriesGranularity != "" { + timeseriesOptions.SetGranularity(imp.IngestOptions.TimeSeriesGranularity) + } + collectionOptions := mopt.CreateCollection().SetTimeSeriesOptions(timeseriesOptions) + if err := session.Database(imp.ToolOptions.DB).CreateCollection(context.TODO(), imp.ToolOptions.Namespace.Collection, collectionOptions); err != nil { + return 0, 0, err + } + + } + readDocs := make(chan bson.D, workerBufferSize) processingErrChan := make(chan error) ordered := imp.IngestOptions.MaintainInsertionOrder @@ -570,6 +638,10 @@ func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) { // header fields validation can only happen once we have an input reader if !imp.InputOptions.HeaderLine { + if err = ValidateOptionDependentFields(colSpecs, optionsWithFields); err != nil { + return nil, err + } + if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields); err != nil { return nil, err } diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index 032de19b7..bcdae12fb 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -8,6 +8,7 @@ package mongoimport import ( "bufio" + "context" "fmt" "io" "io/ioutil" @@ -71,13 +72,43 @@ func checkOnlyHasDocuments(sessionProvider *db.SessionProvider, expectedDocument return nil } -func countDocuments(sessionProvider *db.SessionProvider) (int, error) { +func isTimeSeriesCollection(sessionProvider *db.SessionProvider, testCollectionName string) (bool, error) { + session, err
:= (*sessionProvider).GetSession() + if err != nil { + return false, err + } + + db := session.Database(testDb) + collInfoSpecs, err := db.ListCollectionSpecifications(context.TODO(), bson.D{{"name", testCollectionName}}) + if err != nil { + return false, err + } + + for _, elem := range collInfoSpecs { + opts := elem.Options + var collSpecs bson.D + err = bson.Unmarshal(opts, &collSpecs) + if err != nil { + return false, err + } + + for _, collSpec := range collSpecs { + if collSpec.Key == "timeseries" { + return true, nil + } + } + } + + return false, nil +} + +func countDocuments(sessionProvider *db.SessionProvider, testCollectionName string) (int, error) { session, err := (*sessionProvider).GetSession() if err != nil { return 0, err } - collection := session.Database(testDb).Collection(testCollection) + collection := session.Database(testDb).Collection(testCollectionName) n, err := collection.CountDocuments(nil, bson.D{}) if err != nil { return 0, err @@ -539,6 +570,59 @@ func TestGetInputReader(t *testing.T) { }) } +func TestTimeSeriesImport(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) + + session, err := testutil.GetBareSession() + if err != nil { + t.Fatalf("No server available") + } + + fcv := testutil.GetFCV(session) + if cmp, err := testutil.CompareFCV(fcv, "5.0"); err != nil || cmp < 0 { + t.Skip("Requires server with FCV 5.0 or later") + } + + Convey("With a mongoimport instance", t, func() { + Reset(func() { + sessionProvider, err := db.NewSessionProvider(*getBasicToolOptions()) + if err != nil { + t.Fatalf("error getting session provider session: %v", err) + } + session, err := sessionProvider.GetSession() + if err != nil { + t.Fatalf("error getting session: %v", err) + } + _, err = session.Database(testDb).Collection("timeseries_coll").DeleteMany(nil, bson.D{}) + if err != nil { + t.Fatalf("error dropping collection: %v", err) + } + }) + Convey("CSV import with --timeSeriesTimeField should succeed in creating a 
time-series collection and inserting documents", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.IngestOptions.Drop = true + imp.IngestOptions.Mode = modeInsert + imp.IngestOptions.TimeSeriesTimeField = "timestamp" + imp.InputOptions.ColumnsHaveTypes = true + imp.InputOptions.File = "testdata/test_timeseries.csv" + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.ToolOptions.Collection = "timeseries_coll" + numProcessed, numFailed, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numProcessed, ShouldEqual, 5) + So(numFailed, ShouldEqual, 0) + isTimeSeries, err := isTimeSeriesCollection(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(isTimeSeries, ShouldBeTrue) + n, err := countDocuments(imp.SessionProvider, imp.ToolOptions.Collection) + So(err, ShouldBeNil) + So(n, ShouldEqual, 5) + }) + }) +} + func TestImportDocuments(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) Convey("With a mongoimport instance", t, func() { @@ -982,7 +1066,7 @@ func TestImportDocuments(t *testing.T) { So(err, ShouldBeNil) So(numProcessed, ShouldEqual, 1) So(numFailed, ShouldEqual, 0) - n, err := countDocuments(imp.SessionProvider) + n, err := countDocuments(imp.SessionProvider, testCollection) So(err, ShouldBeNil) So(n, ShouldEqual, 1) @@ -998,7 +1082,7 @@ func TestImportDocuments(t *testing.T) { So(err, ShouldBeNil) So(numProcessed, ShouldEqual, 1) So(numFailed, ShouldEqual, 0) - n, err = countDocuments(imp.SessionProvider) + n, err = countDocuments(imp.SessionProvider, testCollection) So(err, ShouldBeNil) So(n, ShouldEqual, 1) }) diff --git a/mongoimport/options.go b/mongoimport/options.go index 2ddd32a4c..943509beb 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -102,6 +102,22 @@ type IngestOptions struct { NumDecodingWorkers int `long:"numDecodingWorkers" default:"0" hidden:"true"` BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"` + + // 
Creates the target collection as a time-series collection using the value given + // as the TimeField. --columnsHaveTypes is required so mongoimport can validate + // that a date field exists, and because a date cannot be coerced from auto. + TimeSeriesTimeField string `long:"timeSeriesTimeField" value-name:"time-field" description:"Creates target collection as time-series with this field as the timeField e.g. --timeSeriesTimeField='timestamp'. Requires --columnsHaveTypes."` + + // Optional. Passed to the creation of a time-series collection. + TimeSeriesMetaField string `long:"timeSeriesMetaField" value-name:"meta-field" description:"Sets the (optional) metaField of the target time-series collection e.g. --timeSeriesMetaField='sensor_id'. Requires --timeSeriesTimeField."` + + // Optional. Passed to the creation of a time-series collection. + TimeSeriesGranularity string `long:"timeSeriesGranularity" choice:"seconds" choice:"minutes" choice:"hours" description:"Sets the (optional) granularity of time values on the target time-series collection to optimize how time-series data is stored internally. Requires --timeSeriesTimeField. (default: seconds)"` + + // If an existing time-series collection exists, allow mongoimport to write to it directly, without creating the collection. + // You must ensure that the target collection was created as a time-series collection, and that the TimeField and/or MetaField was set correctly upon creation. + // Mongoimport is not currently able to fully validate what you are doing is correct. + TimeSeriesExists bool `long:"timeSeriesExists" description:"Allow mongoimport to write to an existing time-series collection directly, without creating the collection nor validating the options to mongoimport. You must ensure that the target collection was created as a time-series collection, and that the TimeField and/or MetaField was set correctly upon creation. 
Use this option with discretion."` } // Name returns a description of the IngestOptions struct. diff --git a/mongoimport/testdata/test_timeseries.csv b/mongoimport/testdata/test_timeseries.csv new file mode 100644 index 000000000..cd26faf41 --- /dev/null +++ b/mongoimport/testdata/test_timeseries.csv @@ -0,0 +1,6 @@ +timestamp.date(2006-01-02 15:04:05),mac_address.string(),humidity.decimal(),temperature_celsius.decimal() +2023-03-07 13:29:00,48:3F:DA:A3:CE:06,91.800003,7.600000 +2023-03-07 13:29:38,8C:AA:B5:63:03:08,52.400002,16.500000 +2023-03-07 13:32:37,84:CC:A8:B2:D0:6E,67.800003,12.500000 +2023-03-07 13:34:25,8C:AA:B5:63:03:08,52.500000,16.400000 +2023-03-07 13:34:45,48:3F:DA:A3:CE:06,92.199997,7.700000 diff --git a/mongoimport/tsv.go b/mongoimport/tsv.go index c794d350e..04acdcb84 100644 --- a/mongoimport/tsv.go +++ b/mongoimport/tsv.go @@ -80,7 +80,7 @@ func NewTSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, n // ReadAndValidateHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. -func (r *TSVInputReader) ReadAndValidateHeader() (err error) { +func (r *TSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) (err error) { header, err := r.tsvReader.ReadString(entryDelimiter) if err != nil { return err @@ -90,12 +90,15 @@ func (r *TSVInputReader) ReadAndValidateHeader() (err error) { headerFields = append(headerFields, strings.TrimRight(field, "\r\n")) } r.colSpecs = ParseAutoHeaders(headerFields) + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } // ReadAndValidateTypedHeader reads the header from the underlying reader and validates // the header fields. It sets err if the read/validation fails. 
-func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { +func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) (err error) { header, err := r.tsvReader.ReadString(entryDelimiter) if err != nil { return err @@ -108,6 +111,9 @@ func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err if err != nil { return err } + if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil { + return err + } return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields) } diff --git a/mongoimport/tsv_test.go b/mongoimport/tsv_test.go index 15b1739bd..fb22d7bc1 100644 --- a/mongoimport/tsv_test.go +++ b/mongoimport/tsv_test.go @@ -201,12 +201,13 @@ func TestTSVStreamDocument(t *testing.T) { func TestTSVReadAndValidateHeader(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.UnitTestType) + var optionsWithFields ColumnsAsOptionFields Convey("With a TSV input reader", t, func() { Convey("setting the header should read the first line of the TSV", func() { contents := "extraHeader1\textraHeader2\textraHeader3\n" colSpecs := []ColumnSpec{} r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false) - So(r.ReadAndValidateHeader(), ShouldBeNil) + So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil) So(len(r.colSpecs), ShouldEqual, 3) }) }) diff --git a/mongoimport/typed_fields.go b/mongoimport/typed_fields.go index 47861167c..294f89407 100644 --- a/mongoimport/typed_fields.go +++ b/mongoimport/typed_fields.go @@ -134,6 +134,50 @@ func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { return } +// ValidateOptionDependentFields validates options that depend on the existence of field names and/or types. +// - Some options refer to field names. 
+// --timeSeriesTimeField= must refer to a field from --headerline/--fields/--fieldFile +// --timeSeriesMetaField= must refer to a field from --headerline/--fields/--fieldFile +// - Some options can require a field type. +// --timeSeriesTimeField: date +func ValidateOptionDependentFields(headers []ColumnSpec, optionFields ColumnsAsOptionFields) (err error) { + // --timeSeriesTimeField must match a column name AND be of type date* + if len(optionFields.timeField) > 0 { + var timeFieldExists bool + for _, header := range headers { + // Check against NameParts[0] because the Name field will contain the "Name.Type" + if header.NameParts[0] == optionFields.timeField { + timeFieldExists = true + if !(header.TypeName == "date" || header.TypeName == "date_go" || header.TypeName == "date_ms" || header.TypeName == "date_oracle") { + return fmt.Errorf("--timeSeriesTimeField '%v' must be a date type (date, date_go, date_ms, date_oracle), found type '%v'", optionFields.timeField, header.TypeName) + } + break + } + } + + if !timeFieldExists { + return fmt.Errorf("--timeSeriesTimeField '%v' doesn't match any provided fields", optionFields.timeField) + } + } + + // --timeSeriesMetaField must match a column name + if len(optionFields.metaField) > 0 { + var metaFieldExists bool + for _, header := range headers { + if header.Name == optionFields.metaField { + metaFieldExists = true + break + } + } + + if !metaFieldExists { + return fmt.Errorf("--timeSeriesMetaField '%v' doesn't match any provided fields", optionFields.metaField) + } + } + + return nil +} + // FieldParser is the interface for any parser of a field item. 
type FieldParser interface { Parse(in string) (interface{}, error) diff --git a/mongoimport/typed_fields_test.go b/mongoimport/typed_fields_test.go index 141029071..d3bc514aa 100644 --- a/mongoimport/typed_fields_test.go +++ b/mongoimport/typed_fields_test.go @@ -414,3 +414,33 @@ func TestFieldParsers(t *testing.T) { }) } + +func TestOptionDependentFieldValidation(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.UnitTestType) + Convey("Using 'timestamp.date(),zip.string(),number.double(),foo.auto()'", t, func() { + colSpecs := []ColumnSpec{ + {"timestamp", &FieldDateParser{"January 2, (2006)"}, pgAutoCast, "date", []string{"timestamp"}}, + {"zip", new(FieldStringParser), pgAutoCast, "string", []string{"zip"}}, + {"number", new(FieldDoubleParser), pgAutoCast, "double", []string{"number"}}, + {"foo", new(FieldAutoParser), pgAutoCast, "auto", []string{"foo"}}, + } + + emptyOptionFields := ColumnsAsOptionFields{} + So(ValidateOptionDependentFields(colSpecs, emptyOptionFields), ShouldBeNil) + + bothNonExistentOptionFields := ColumnsAsOptionFields{"somefield", "otherfield"} + So(ValidateOptionDependentFields(colSpecs, bothNonExistentOptionFields), ShouldNotBeNil) + + timeNonExistentOptionFields := ColumnsAsOptionFields{"somefield", "foo"} + So(ValidateOptionDependentFields(colSpecs, timeNonExistentOptionFields), ShouldNotBeNil) + + metaNonExistentOptionFields := ColumnsAsOptionFields{"timestamp", "somefield"} + So(ValidateOptionDependentFields(colSpecs, metaNonExistentOptionFields), ShouldNotBeNil) + + nonDateTimestamp := ColumnsAsOptionFields{"number", "zip"} + So(ValidateOptionDependentFields(colSpecs, nonDateTimestamp), ShouldNotBeNil) + + optionFields := ColumnsAsOptionFields{"timestamp", "zip"} + So(ValidateOptionDependentFields(colSpecs, optionFields), ShouldBeNil) + }) +}