Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TOOLS-3271 Import Time-Series Collections via mongoimport #535

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions mongoimport/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type FieldInfo struct {
parts []string
}

// All options to mongoimport that need validation
// against the column names provided by user data.
type ColumnsAsOptionFields struct {
timeField string
metaField string
}

const (
pgAutoCast ParseGrace = iota
pgSkipField
Expand Down Expand Up @@ -590,6 +597,7 @@ func validateFields(inputFields []string, useArrayIndexFields bool) error {
return err
}
}

return nil
}

Expand Down
10 changes: 8 additions & 2 deletions mongoimport/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,21 @@ func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, n

// ReadAndValidateHeader reads the header from the underlying reader and validates
// the header fields. It sets err if the read/validation fails.
func (r *CSVInputReader) ReadAndValidateHeader() (err error) {
func (r *CSVInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) (err error) {
fields, err := r.csvReader.Read()
if err != nil {
return err
}
r.colSpecs = ParseAutoHeaders(fields)
if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil {
return err
}
return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields)
}

// ReadAndValidateHeader reads the header from the underlying reader and validates
// the header fields. It sets err if the read/validation fails.
func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) {
func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) (err error) {
fields, err := r.csvReader.Read()
if err != nil {
return err
Expand All @@ -98,6 +101,9 @@ func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err
if err != nil {
return err
}
if err = ValidateOptionDependentFields(r.colSpecs, optionsWithFields); err != nil {
return err
}
return validateReaderFields(ColumnNames(r.colSpecs), r.useArrayIndexFields)
}

Expand Down
29 changes: 15 additions & 14 deletions mongoimport/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,85 +210,86 @@ func TestCSVStreamDocument(t *testing.T) {
func TestCSVReadAndValidateHeader(t *testing.T) {
testtype.SkipUnlessTestType(t, testtype.UnitTestType)
var err error
var optionsWithFields ColumnsAsOptionFields
Convey("With a CSV input reader", t, func() {
Convey("setting the header should read the first line of the CSV", func() {
contents := "extraHeader1, extraHeader2, extraHeader3"
colSpecs := []ColumnSpec{}
r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
So(len(r.colSpecs), ShouldEqual, 3)
})

Convey("setting non-colliding nested CSV headers should not raise an error", func() {
contents := "a, b, c"
colSpecs := []ColumnSpec{}
r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
So(len(r.colSpecs), ShouldEqual, 3)
contents = "a.b.c, a.b.d, c"
colSpecs = []ColumnSpec{}
r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
So(len(r.colSpecs), ShouldEqual, 3)

contents = "a.b, ab, a.c"
colSpecs = []ColumnSpec{}
r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
So(len(r.colSpecs), ShouldEqual, 3)

contents = "a, ab, ac, dd"
colSpecs = []ColumnSpec{}
r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
So(len(r.colSpecs), ShouldEqual, 4)
})

Convey("setting colliding nested CSV headers should raise an error", func() {
contents := "a, a.b, c"
colSpecs := []ColumnSpec{}
r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldNotBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)

contents = "a.b.c, a.b.d.c, a.b.d"
colSpecs = []ColumnSpec{}
r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldNotBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)

contents = "a, a, a"
colSpecs = []ColumnSpec{}
r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldNotBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)
})

Convey("setting the header that ends in a dot should error", func() {
contents := "c, a., b"
colSpecs := []ColumnSpec{}
So(err, ShouldBeNil)
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil)
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)
})

Convey("setting the header that starts in a dot should error", func() {
contents := "c, .a, b"
colSpecs := []ColumnSpec{}
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil)
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)
})

Convey("setting the header that contains multiple consecutive dots should error", func() {
contents := "c, a..a, b"
colSpecs := []ColumnSpec{}
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil)
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)

contents = "c, a.a, b.b...b"
colSpecs = []ColumnSpec{}
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(), ShouldNotBeNil)
So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false).ReadAndValidateHeader(optionsWithFields), ShouldNotBeNil)
})

Convey("setting the header using an empty file should return EOF", func() {
contents := ""
colSpecs := []ColumnSpec{}
r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldEqual, io.EOF)
So(len(r.colSpecs), ShouldEqual, 0)
})
Convey("setting the header with column specs already set should replace "+
Expand All @@ -300,7 +301,7 @@ func TestCSVReadAndValidateHeader(t *testing.T) {
{"c", new(FieldAutoParser), pgAutoCast, "auto", []string{"c"}},
}
r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false, false)
So(r.ReadAndValidateHeader(), ShouldBeNil)
So(r.ReadAndValidateHeader(optionsWithFields), ShouldBeNil)
// if ReadAndValidateHeader() is called with column specs already passed
// in, the header should be replaced with the read header line
So(len(r.colSpecs), ShouldEqual, 3)
Expand Down
4 changes: 2 additions & 2 deletions mongoimport/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func NewJSONInputReader(isArray bool, legacyExtJSON bool, in io.Reader, numDecod
}

// ReadAndValidateHeader is a no-op for JSON imports; always returns nil.
func (r *JSONInputReader) ReadAndValidateHeader() error {
func (r *JSONInputReader) ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error {
return nil
}

// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil.
func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error {
func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error {
return nil
}

Expand Down
78 changes: 74 additions & 4 deletions mongoimport/mongoimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
"github.com/mongodb/mongo-tools/common/progress"
"github.com/mongodb/mongo-tools/common/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
mopt "go.mongodb.org/mongo-driver/mongo/options"
"gopkg.in/tomb.v2"

"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -46,6 +49,8 @@ const (
progressBarLength = 24
)

var optionsWithFields ColumnsAsOptionFields

// MongoImport is a container for the user-specified options and
// internal state used for running mongoimport.
type MongoImport struct {
Expand Down Expand Up @@ -90,12 +95,12 @@ type InputReader interface {
// ReadAndValidateHeader reads the header line from the InputReader and returns
// a non-nil error if the fields from the header line are invalid; returns
// nil otherwise. No-op for JSON input readers.
ReadAndValidateHeader() error
ReadAndValidateHeader(optionsWithFields ColumnsAsOptionFields) error

// ReadAndValidateTypedHeader is the same as ReadAndValidateHeader,
// except it also parses types from the fields of the header. Parse errors
// will be handled according parseGrace.
ReadAndValidateTypedHeader(parseGrace ParseGrace) error
ReadAndValidateTypedHeader(parseGrace ParseGrace, optionsWithFields ColumnsAsOptionFields) error

// embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar.
sizeTracker
Expand Down Expand Up @@ -206,6 +211,29 @@ func (imp *MongoImport) validateSettings(args []string) error {
imp.IngestOptions.Mode = modeUpsert
}

// Validate TimeSeries Options
if imp.IngestOptions.TimeSeriesTimeField == "" {
if imp.IngestOptions.TimeSeriesMetaField != "" ||
imp.IngestOptions.TimeSeriesGranularity != "" {
return fmt.Errorf("cannot use --timeSeriesMetaField nor --timeSeriesGranularity without --timeSeriesTimeField")
}
} else {
if imp.InputOptions.Type == JSON {
return fmt.Errorf("cannot import time-series collections with JSON")
}

if imp.IngestOptions.TimeSeriesTimeField == imp.IngestOptions.TimeSeriesMetaField {
return fmt.Errorf("error the MetaField and TimeField for time-series collections must be different columns")
}

if !imp.InputOptions.ColumnsHaveTypes {
return fmt.Errorf("--timeSeriesTimeFields requires --columnsHaveTypes so mongoimport can validate the date field")
}

optionsWithFields.timeField = imp.IngestOptions.TimeSeriesTimeField
optionsWithFields.metaField = imp.IngestOptions.TimeSeriesMetaField
}

// parse UpsertFields, may set default mode to modeUpsert
if imp.IngestOptions.UpsertFields != "" {
if imp.IngestOptions.Mode == "" {
Expand Down Expand Up @@ -329,9 +357,9 @@ func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) {

if imp.InputOptions.HeaderLine {
if imp.InputOptions.ColumnsHaveTypes {
err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace))
err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace), optionsWithFields)
} else {
err = inputReader.ReadAndValidateHeader()
err = inputReader.ReadAndValidateHeader(optionsWithFields)
}
if err != nil {
return 0, 0, err
Expand Down Expand Up @@ -386,6 +414,44 @@ func (imp *MongoImport) importDocuments(inputReader InputReader) (uint64, uint64
}
}

// Before importing documents, create the target collection as TimeSeries.
// Fail if the collection already exists.
if imp.IngestOptions.TimeSeriesTimeField != "" {
if !imp.IngestOptions.Drop {
collectionFilter := bson.D{}
collectionFilter = append(collectionFilter, primitive.E{"name", imp.ToolOptions.Namespace.Collection})
cursor, err := session.Database(imp.ToolOptions.DB).ListCollections(context.TODO(), collectionFilter)

if err != nil {
return 0, 0, err
}

if cursor.Next(context.TODO()) {
cursor.Close(context.TODO())
if !imp.IngestOptions.TimeSeriesExists {
return 0, 0, fmt.Errorf("error when inserting to a time-series collection, the collection must not exist, or --drop must be provided. Consider using --timeSeriesExists if the time-series collection was already created")
}
}
}

log.Logvf(log.Always, "creating time-series collection: %v.%v",
imp.ToolOptions.Namespace.DB,
imp.ToolOptions.Namespace.Collection)
timeseriesOptions := mopt.TimeSeries()
timeseriesOptions.SetTimeField(imp.IngestOptions.TimeSeriesTimeField)

if imp.IngestOptions.TimeSeriesMetaField != "" {
timeseriesOptions.SetMetaField(imp.IngestOptions.TimeSeriesMetaField)
}

if imp.IngestOptions.TimeSeriesGranularity != "" {
timeseriesOptions.SetGranularity(imp.IngestOptions.TimeSeriesGranularity)
}
collectionOptions := mopt.CreateCollection().SetTimeSeriesOptions(timeseriesOptions)
session.Database(imp.ToolOptions.DB).CreateCollection(context.TODO(), imp.ToolOptions.Namespace.Collection, collectionOptions)

}

readDocs := make(chan bson.D, workerBufferSize)
processingErrChan := make(chan error)
ordered := imp.IngestOptions.MaintainInsertionOrder
Expand Down Expand Up @@ -570,6 +636,10 @@ func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) {

// header fields validation can only happen once we have an input reader
if !imp.InputOptions.HeaderLine {
if err = ValidateOptionDependentFields(colSpecs, optionsWithFields); err != nil {
return nil, err
}

if err = validateReaderFields(ColumnNames(colSpecs), imp.InputOptions.UseArrayIndexFields); err != nil {
return nil, err
}
Expand Down
Loading