From 559ec99daa2f9c5e7aff8ed15f5a2576f0cfc22f Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:31:34 +0100 Subject: [PATCH 01/17] move CopyOptions to an Option --- pkg/csvcopy/csvcopy.go | 7 ------- pkg/csvcopy/options.go | 25 ++++++++++++++++++++++--- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index b910109..32990d8 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -26,8 +26,6 @@ type Result struct { RowRate float64 } -var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions") - type Copier struct { connString string schemaName string @@ -53,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - copyOptions string, splitCharacter string, quoteCharacter string, escapeCharacter string, @@ -68,9 +65,6 @@ func NewCopier( verbose bool, options ...Option, ) (*Copier, error) { - if strings.Contains(strings.ToUpper(copyOptions), "HEADER") { - return nil, HeaderInCopyOptionsError - } if len(quoteCharacter) > 1 { return nil, errors.New("provided --quote must be a single-byte character") @@ -96,7 +90,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - copyOptions: copyOptions, splitCharacter: splitCharacter, quoteCharacter: quoteCharacter, escapeCharacter: escapeCharacter, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 779dfbc..7d5fdaf 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -1,6 +1,11 @@ package csvcopy -type Option func(c *Copier) +import ( + "errors" + "strings" +) + +type Option func(c *Copier) error type Logger interface { Infof(msg string, args ...interface{}) @@ -12,15 +17,29 @@ func (l *noopLogger) Infof(msg string, args ...interface{}) {} // WithLogger sets the logger where the application will print debug messages func WithLogger(logger Logger) Option { - return func(c *Copier) { + return func(c *Copier) error { c.logger = logger + return nil } } // WithReportingFunction sets the function that will be called at // reportingPeriod with information about the copy progress func WithReportingFunction(f ReportFunc) Option { - return func(c *Copier) { + return func(c *Copier) error { c.reportingFunction = f + return nil + } +} + +var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions") + +func WithCopyOptions(opt string) Option { + return func(c *Copier) error { + if strings.Contains(strings.ToUpper(opt), "HEADER") { + return HeaderInCopyOptionsError + } + c.copyOptions = opt + return nil } } From 2720350d56314b223c03d9cf9bd632f2c22fa34e Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:33:11 +0100 Subject: [PATCH 02/17] Move split character --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 32990d8..9f06cb4 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - splitCharacter string, quoteCharacter string, escapeCharacter string, columns string, @@ -90,7 +89,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - splitCharacter: splitCharacter, quoteCharacter: quoteCharacter, escapeCharacter: escapeCharacter, columns: columns, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 7d5fdaf..c46369b 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -43,3 +43,13 @@ func WithCopyOptions(opt string) Option { return nil } } + +func WithSplitCharacter(splitCharacter string) Option { + return func(c *Copier) error { + if len(splitCharacter) != 1 { + return errors.New("split character must be a single-byte character") + } + c.splitCharacter = splitCharacter + return nil + } +} From 49b0a3d00c32f90da82d72fb4694b79dec9e5862 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:34:15 +0100 Subject: [PATCH 03/17] Move quote character --- pkg/csvcopy/csvcopy.go | 7 ------- pkg/csvcopy/options.go | 11 +++++++++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 9f06cb4..8957ef7 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - quoteCharacter string, escapeCharacter string, columns string, skipHeader bool, @@ -64,11 +63,6 @@ func NewCopier( verbose bool, options ...Option, ) (*Copier, error) { - - if len(quoteCharacter) > 1 { - return nil, errors.New("provided --quote must be a single-byte character") - } - if len(escapeCharacter) > 1 { return nil, errors.New("provided --escape must be a single-byte character") } @@ -89,7 +83,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - quoteCharacter: quoteCharacter, escapeCharacter: escapeCharacter, columns: columns, workers: workers, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index c46369b..920ef8d 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -53,3 +53,14 @@ func WithSplitCharacter(splitCharacter string) Option { return nil } } + +func WithQuoteCharacter(quoteCharacter string) Option { + return func(c *Copier) error { + if len(quoteCharacter) != 1 { + return errors.New("quote character must be a single-byte character") + } + + c.quoteCharacter = quoteCharacter + return nil + } +} From 40f1401646a2fdf2ae3d4967b7df5b1c3be44022 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:35:13 +0100 Subject: [PATCH 04/17] move escapeCharacter --- pkg/csvcopy/csvcopy.go | 6 ------ pkg/csvcopy/options.go | 11 +++++++++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 8957ef7..0649dfb 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - escapeCharacter string, columns string, skipHeader bool, headerLinesCnt int, @@ -63,10 +62,6 @@ func NewCopier( verbose bool, options ...Option, ) (*Copier, error) { - if len(escapeCharacter) > 1 { - return nil, errors.New("provided --escape must be a single-byte character") - } - if headerLinesCnt <= 0 { return nil, fmt.Errorf( "provided --header-line-count (%d) must be greater than 0\n", @@ -83,7 +78,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - escapeCharacter: escapeCharacter, columns: columns, workers: workers, limit: limit, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 920ef8d..90921a1 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -64,3 +64,14 @@ func WithQuoteCharacter(quoteCharacter string) Option { return nil } } + +func WithEscapeCharacter(escapeCharacter string) Option { + return func(c *Copier) error { + if len(escapeCharacter) > 1 { + return errors.New("provided escape character must be a single-byte character") + } + + c.escapeCharacter = escapeCharacter + return nil + } +} From 33a2682f998bf9678fb02b0ad82e9ff5c8e010fc Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:36:02 +0100 Subject: [PATCH 05/17] move columns --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 7 +++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 0649dfb..1ec5091 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - columns string, skipHeader bool, headerLinesCnt int, workers int, @@ -78,7 +77,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - columns: columns, workers: workers, limit: limit, batchSize: batchSize, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 90921a1..55ca581 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -75,3 +75,10 @@ func WithEscapeCharacter(escapeCharacter string) Option { return nil } } + +func WithColumns(columns string) Option { + return func(c *Copier) error { + c.columns = columns + return nil + } +} From 0c8d7a9829281b377e127fff8d65cb2bb3f529c2 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:40:28 +0100 Subject: [PATCH 06/17] move skip header --- pkg/csvcopy/csvcopy.go | 19 ++----------------- pkg/csvcopy/options.go | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 1ec5091..3555d69 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,8 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - skipHeader bool, - headerLinesCnt int, workers int, limit int64, batchSize int, @@ -61,18 +59,6 @@ func NewCopier( verbose bool, options ...Option, ) (*Copier, error) { - if headerLinesCnt <= 0 { - return nil, fmt.Errorf( - "provided --header-line-count (%d) must be greater than 0\n", - headerLinesCnt, - ) - } - - skip := 0 - if skipHeader { - skip = headerLinesCnt - } - copier := &Copier{ connString: connString, schemaName: schemaName, @@ -82,7 +68,6 @@ func NewCopier( batchSize: batchSize, logBatches: logBatches, verbose: verbose, - skip: skip, logger: &noopLogger{}, rowCount: 0, reportingPeriod: reportingPeriod, @@ -92,8 +77,8 @@ func NewCopier( o(copier) } - if skip > 0 && verbose { - copier.logger.Infof("Skipping the first %d lines of the input.", headerLinesCnt) + if copier.skip > 0 && verbose { + copier.logger.Infof("Skipping the first %d lines of the input.", copier.skip) } if copier.reportingFunction == nil { diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 55ca581..d574e55 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -82,3 +82,26 @@ func WithColumns(columns string) Option { return nil } } + +func WithSkipHeader(skipHeader bool) Option { + return func(c *Copier) error { + if c.skip != 0 { + return errors.New("skip is already set. Use SkipHeader or SkipHeaderCount") + } + c.skip = 1 + return nil + } +} + +func WithSkipHeaderCount(headerLineCount int) Option { + return func(c *Copier) error { + if c.skip != 0 { + return errors.New("skip is already set. Use SkipHeader or SkipHeaderCount") + } + if headerLineCount == 0 { + return errors.New("header line count must be greater than zero") + } + c.skip = headerLineCount + return nil + } +} From 3c19ca82ca4bc395c763d1e939b3115b14bdd0c2 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:41:30 +0100 Subject: [PATCH 07/17] move workers --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 3555d69..4832a4e 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - workers int, limit int64, batchSize int, logBatches bool, @@ -63,7 +62,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - workers: workers, limit: limit, batchSize: batchSize, logBatches: logBatches, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index d574e55..d2cad6b 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -105,3 +105,13 @@ func WithSkipHeaderCount(headerLineCount int) Option { return nil } } + +func WithWorkers(workers int) Option { + return func(c *Copier) error { + if workers <= 0 { + return errors.New("workers must be greater than zero") + } + c.workers = workers + return nil + } +} From 2b21e5b6bda38b38f4369bbc5b0a01521227d653 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:42:31 +0100 Subject: [PATCH 08/17] move limit --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 4832a4e..48a38a6 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - limit int64, batchSize int, logBatches bool, reportingPeriod time.Duration, @@ -62,7 +61,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - limit: limit, batchSize: batchSize, logBatches: logBatches, verbose: verbose, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index d2cad6b..1eb563b 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -115,3 +115,13 @@ func WithWorkers(workers int) Option { return nil } } + +func WithLimit(limit int64) Option { + return func(c *Copier) error { + if limit < 0 { + return errors.New("limit must be greater than zero") + } + c.limit = limit + return nil + } +} From 8c559bc1858243455839fc4e3c5b709cc0af084f Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:43:29 +0100 Subject: [PATCH 09/17] move batch size --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 48a38a6..7fa442c 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - batchSize int, logBatches bool, reportingPeriod time.Duration, verbose bool, @@ -61,7 +60,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - batchSize: batchSize, logBatches: logBatches, verbose: verbose, logger: &noopLogger{}, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 1eb563b..da6b847 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -125,3 +125,13 @@ func WithLimit(limit int64) Option { return nil } } + +func WithBatchSize(batchSize int) Option { + return func(c *Copier) error { + if batchSize < 0 { + return errors.New("batch size must be greater than zero") + } + c.batchSize = batchSize + return nil + } +} From 597cb9c19e0cadfcfa1c0ea9e5244b0c123b9a4a Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:44:15 +0100 Subject: [PATCH 10/17] move log batches --- pkg/csvcopy/csvcopy.go | 2 -- pkg/csvcopy/options.go | 7 +++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 7fa442c..bfe4ddf 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,7 +51,6 @@ func NewCopier( connString string, schemaName string, tableName string, - logBatches bool, reportingPeriod time.Duration, verbose bool, options ...Option, @@ -60,7 +59,6 @@ func NewCopier( connString: connString, schemaName: schemaName, tableName: tableName, - logBatches: logBatches, verbose: verbose, logger: &noopLogger{}, rowCount: 0, diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index da6b847..5f0d05d 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -135,3 +135,10 @@ func WithBatchSize(batchSize int) Option { return nil } } + +func WithLogBatches(logBatches bool) Option { + return func(c *Copier) error { + c.logBatches = logBatches + return nil + } +} From 7a723364d2da7bb3dd470d0e4097d2994e9d0fed Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:46:16 +0100 Subject: [PATCH 11/17] move reporting period --- pkg/csvcopy/csvcopy.go | 14 ++++++-------- pkg/csvcopy/options.go | 12 ++++++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index bfe4ddf..ddc870f 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -51,18 +51,16 @@ func NewCopier( connString string, schemaName string, tableName string, - reportingPeriod time.Duration, verbose bool, options ...Option, ) (*Copier, error) { copier := &Copier{ - connString: connString, - schemaName: schemaName, - tableName: tableName, - verbose: verbose, - logger: &noopLogger{}, - rowCount: 0, - reportingPeriod: reportingPeriod, + connString: connString, + schemaName: schemaName, + tableName: tableName, + verbose: verbose, + logger: &noopLogger{}, + rowCount: 0, } for _, o := range options { diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 5f0d05d..2119ded 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -2,7 +2,9 @@ package csvcopy import ( "errors" + "fmt" "strings" + "time" ) type Option func(c *Copier) error @@ -32,6 +34,16 @@ func WithReportingFunction(f ReportFunc) Option { } } +func WithReportingPeriod(reportingPeriod time.Duration) Option { + return func(c *Copier) error { + if reportingPeriod <= 0 { + return fmt.Errorf("reporting period must be greater than zero") + } + c.reportingPeriod = reportingPeriod + return nil + } +} + var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions") func WithCopyOptions(opt string) Option { From e74ad3bbe870db0cc84d818c01bddaad16f5752e Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:49:19 +0100 Subject: [PATCH 12/17] move schema name --- pkg/csvcopy/csvcopy.go | 6 +----- pkg/csvcopy/options.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index ddc870f..48ff1a5 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -49,16 +49,12 @@ type Copier struct { func NewCopier( connString string, - schemaName string, tableName string, - verbose bool, options ...Option, ) (*Copier, error) { copier := &Copier{ connString: connString, - schemaName: schemaName, tableName: tableName, - verbose: verbose, logger: &noopLogger{}, rowCount: 0, } @@ -67,7 +63,7 @@ func NewCopier( o(copier) } - if copier.skip > 0 && verbose { + if copier.skip > 0 && copier.verbose { copier.logger.Infof("Skipping the first %d lines of the input.", copier.skip) } diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 2119ded..3a47060 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -154,3 +154,17 @@ func WithLogBatches(logBatches bool) Option { return nil } } + +func WithVerbose(verbose bool) Option { + return func(c *Copier) error { + c.verbose = verbose + return nil + } +} + +func WithSchemaName(schema string) Option { + return func(c *Copier) error { + c.schemaName = schema + return nil + } +} From 844eba8c7e23ee4cd9dde65048b1c717850a60fe Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 15:04:19 +0100 Subject: [PATCH 13/17] set defaults and cleanup --- cmd/timescaledb-parallel-copy/main.go | 36 ++++++++++++++++----------- pkg/csvcopy/csvcopy.go | 28 ++++++++++++++++----- pkg/csvcopy/csvcopy_test.go | 6 ++--- pkg/csvcopy/options.go | 4 +-- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/cmd/timescaledb-parallel-copy/main.go b/cmd/timescaledb-parallel-copy/main.go index 9b0f5ca..6dd12bd 100644 --- a/cmd/timescaledb-parallel-copy/main.go +++ b/cmd/timescaledb-parallel-copy/main.go @@ -94,25 +94,31 @@ func main() { if dbName != "" { log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database") } + opts := []csvcopy.Option{ + csvcopy.WithLogger(&csvCopierLogger{}), + csvcopy.WithSchemaName(schemaName), + csvcopy.WithCopyOptions(copyOptions), + csvcopy.WithSplitCharacter(splitCharacter), + csvcopy.WithQuoteCharacter(quoteCharacter), + csvcopy.WithEscapeCharacter(escapeCharacter), + csvcopy.WithColumns(columns), + csvcopy.WithWorkers(workers), + csvcopy.WithLimit(limit), + csvcopy.WithBatchSize(batchSize), + csvcopy.WithLogBatches(logBatches), + csvcopy.WithReportingPeriod(reportingPeriod), + csvcopy.WithVerbose(verbose), + } + if headerLinesCnt > 1 { + opts = append(opts, csvcopy.WithSkipHeaderCount(headerLinesCnt)) + } else if skipHeader { + opts = append(opts, csvcopy.WithSkipHeader(skipHeader)) + } copier, err := csvcopy.NewCopier( postgresConnect, - schemaName, tableName, - copyOptions, - splitCharacter, - quoteCharacter, - escapeCharacter, - columns, - skipHeader, - headerLinesCnt, - workers, - limit, - batchSize, - logBatches, - reportingPeriod, - verbose, - csvcopy.WithLogger(&csvCopierLogger{}), + opts..., ) if err != nil { if errors.Is(err, csvcopy.HeaderInCopyOptionsError) { diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 48ff1a5..e5e5c2b 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -27,10 +27,13 @@ type Result struct { } type Copier struct { - connString string + connString string + tableName string + + copyOptions string + schemaName string - tableName string - copyOptions string + logger Logger splitCharacter string quoteCharacter string escapeCharacter string @@ -43,7 +46,6 @@ type Copier struct { reportingFunction ReportFunc verbose bool skip int - logger Logger rowCount int64 } @@ -55,8 +57,22 @@ func NewCopier( copier := &Copier{ connString: connString, tableName: tableName, - logger: &noopLogger{}, - rowCount: 0, + + // Defaults + schemaName: "public", + logger: &noopLogger{}, + copyOptions: "CSV", + splitCharacter: ",", + quoteCharacter: "", + escapeCharacter: "", + columns: "", + workers: 1, + limit: 0, + batchSize: 5000, + logBatches: false, + reportingPeriod: 0, + verbose: false, + skip: 0, } for _, o := range options { diff --git a/pkg/csvcopy/csvcopy_test.go b/pkg/csvcopy/csvcopy_test.go index 972c000..04d4aee 100644 --- a/pkg/csvcopy/csvcopy_test.go +++ b/pkg/csvcopy/csvcopy_test.go @@ -67,7 +67,7 @@ func TestWriteDataToCSV(t *testing.T) { writer.Flush() - copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 0, false) + copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value")) require.NoError(t, err) reader, err := os.Open(tmpfile.Name()) @@ -154,7 +154,7 @@ func TestErrorAtRow(t *testing.T) { writer.Flush() - copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 2, true, 0, false) + copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value")) require.NoError(t, err) reader, err := os.Open(tmpfile.Name()) require.NoError(t, err) @@ -223,7 +223,7 @@ func TestWriteReportProgress(t *testing.T) { require.LessOrEqual(t, r.RowCount, int64(2)) } - copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 100*time.Millisecond, false, WithReportingFunction(reportF)) + copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value"), WithReportingPeriod(100*time.Millisecond), WithReportingFunction(reportF)) require.NoError(t, err) reader, err := os.Open(tmpfile.Name()) diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 3a47060..c2b6b40 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -58,7 +58,7 @@ func WithCopyOptions(opt string) Option { func WithSplitCharacter(splitCharacter string) Option { return func(c *Copier) error { - if len(splitCharacter) != 1 { + if len(splitCharacter) > 1 { return errors.New("split character must be a single-byte character") } c.splitCharacter = splitCharacter @@ -68,7 +68,7 @@ func WithSplitCharacter(splitCharacter string) Option { func WithQuoteCharacter(quoteCharacter string) Option { return func(c *Copier) error { - if len(quoteCharacter) != 1 { + if len(quoteCharacter) > 1 { return errors.New("quote character must be a single-byte character") } From ce85138d89ef823fc30e37373e760c6897386b5a Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 15:04:58 +0100 Subject: [PATCH 14/17] lint --- pkg/csvcopy/csvcopy.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index e5e5c2b..a3c06f4 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -76,7 +76,10 @@ func NewCopier( } for _, o := range options { - o(copier) + err := o(copier) + if err != nil { + return nil, fmt.Errorf("Error processing option, %T, %w", o, err) + } } if copier.skip > 0 && copier.verbose { From 4a0a9a7c28b35c6de2c49c4cc963c4d84cec48f6 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 15:34:36 +0100 Subject: [PATCH 15/17] documentation --- pkg/csvcopy/csvcopy.go | 1 + pkg/csvcopy/options.go | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index a3c06f4..b9ad4fa 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -261,6 +261,7 @@ func (c *Copier) processBatches(ctx context.Context, ch chan batch.Batch) (err e } else { copyCmd = fmt.Sprintf("COPY %s FROM STDIN WITH DELIMITER %s %s %s", c.getFullTableName(), delimStr, quotes, c.copyOptions) } + c.logger.Infof("Copy command: %s", copyCmd) for { if ctx.Err() != nil { diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index c2b6b40..2d14fee 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -26,14 +26,18 @@ func WithLogger(logger Logger) Option { } // WithReportingFunction sets the function that will be called at -// reportingPeriod with information about the copy progress +// ReportingPeriod with information about the copy progress func WithReportingFunction(f ReportFunc) Option { return func(c *Copier) error { + if c.reportingPeriod == 0 { + return fmt.Errorf("reporting period must be set before the reporting function") + } c.reportingFunction = f return nil } } +// WithReportingPeriod sets how often the reporting function will be called. func WithReportingPeriod(reportingPeriod time.Duration) Option { return func(c *Copier) error { if reportingPeriod <= 0 { @@ -46,6 +50,8 @@ func WithReportingPeriod(reportingPeriod time.Duration) Option { var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions") +// WithCopyOptions appends the COPY options for the COPY operation. +// By default is 'CSV' func WithCopyOptions(opt string) Option { return func(c *Copier) error { if strings.Contains(strings.ToUpper(opt), "HEADER") { @@ -56,6 +62,7 @@ func WithCopyOptions(opt string) Option { } } +// WithSplitCharacter sets the COPY option DELIMITER func WithSplitCharacter(splitCharacter string) Option { return func(c *Copier) error { if len(splitCharacter) > 1 { @@ -66,6 +73,7 @@ func WithSplitCharacter(splitCharacter string) Option { } } +// WithQuoteCharacter sets the COPY option QUOTE func WithQuoteCharacter(quoteCharacter string) Option { return func(c *Copier) error { if len(quoteCharacter) > 1 { @@ -77,6 +85,7 @@ func WithQuoteCharacter(quoteCharacter string) Option { } } +// WithEscapeCharacter sets the COPY option ESCAPE func WithEscapeCharacter(escapeCharacter string) Option { return func(c *Copier) error { if len(escapeCharacter) > 1 { @@ -88,6 +97,7 @@ func WithEscapeCharacter(escapeCharacter string) Option { } } +// WithColumns accepts a list of comma separated values for the csv columns func WithColumns(columns string) Option { return func(c *Copier) error { c.columns = columns @@ -95,6 +105,7 @@ func WithColumns(columns string) Option { } } +// WithSkipHeader is set, skips the first row of the csv file func WithSkipHeader(skipHeader bool) Option { return func(c *Copier) error { if c.skip != 0 { @@ -105,6 +116,7 @@ func WithSkipHeader(skipHeader bool) Option { } } +// WithSkipHeaderCount sets the number of lines to skip at the beginning of the file func WithSkipHeaderCount(headerLineCount int) Option { return func(c *Copier) error { if c.skip != 0 { @@ -118,6 +130,7 @@ func WithSkipHeaderCount(headerLineCount int) Option { } } +// WithWorkers sets the number of workers to use while processing the file func WithWorkers(workers int) Option { return func(c *Copier) error { if workers <= 0 { @@ -128,6 +141,7 @@ func WithWorkers(workers int) Option { } } +// WithLimit limits the number of imported rows func WithLimit(limit int64) Option { return func(c *Copier) error { if limit < 0 { @@ -138,6 +152,7 @@ func WithLimit(limit int64) Option { } } +// WithBatchSize sets the rows processed on each batch func WithBatchSize(batchSize int) Option { return func(c *Copier) error { if batchSize < 0 { @@ -148,6 +163,7 @@ func WithBatchSize(batchSize int) Option { } } +// WithLogBatches prints a line for every processed batch func WithLogBatches(logBatches bool) Option { return func(c *Copier) error { c.logBatches = logBatches @@ -155,6 +171,7 @@ func WithLogBatches(logBatches bool) Option { } } +// WithVerbose enables logging func WithVerbose(verbose bool) Option { return func(c *Copier) error { c.verbose = verbose @@ -162,6 +179,7 @@ func WithVerbose(verbose bool) Option { } } +// WithSchemaName sets the schema name func WithSchemaName(schema string) Option { return func(c *Copier) error { c.schemaName = schema From c278668adf1327d9016e2d4ae317d67ae25bf71a Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Thu, 5 Dec 2024 17:39:24 +0100 Subject: [PATCH 16/17] allow 0 value --- pkg/csvcopy/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 2d14fee..5648853 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -40,8 +40,8 @@ func WithReportingFunction(f ReportFunc) Option { // WithReportingPeriod sets how often the reporting function will be called. func WithReportingPeriod(reportingPeriod time.Duration) Option { return func(c *Copier) error { - if reportingPeriod <= 0 { - return fmt.Errorf("reporting period must be greater than zero") + if reportingPeriod < 0 { + return fmt.Errorf("reporting period must be equal or greater than zero") } c.reportingPeriod = reportingPeriod return nil From e312fa8ca3c4f604b18b0a531dc6a67d500343dd Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 10 Dec 2024 18:54:07 +0100 Subject: [PATCH 17/17] keep header flags behaviour --- cmd/timescaledb-parallel-copy/main.go | 9 +++++---- pkg/csvcopy/options.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/timescaledb-parallel-copy/main.go b/cmd/timescaledb-parallel-copy/main.go index 6dd12bd..4495d7f 100644 --- a/cmd/timescaledb-parallel-copy/main.go +++ b/cmd/timescaledb-parallel-copy/main.go @@ -109,10 +109,11 @@ func main() { csvcopy.WithReportingPeriod(reportingPeriod), csvcopy.WithVerbose(verbose), } - if headerLinesCnt > 1 { - opts = append(opts, csvcopy.WithSkipHeaderCount(headerLinesCnt)) - } else if skipHeader { - opts = append(opts, csvcopy.WithSkipHeader(skipHeader)) + + if skipHeader { + opts = append(opts, + csvcopy.WithSkipHeaderCount(headerLinesCnt), + ) } copier, err := csvcopy.NewCopier( diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go index 5648853..bc6328e 100644 --- a/pkg/csvcopy/options.go +++ b/pkg/csvcopy/options.go @@ -122,7 +122,7 @@ func WithSkipHeaderCount(headerLineCount int) Option { if c.skip != 0 { return errors.New("skip is already set. Use SkipHeader or SkipHeaderCount") } - if headerLineCount == 0 { + if headerLineCount <= 0 { return errors.New("header line count must be greater than zero") } c.skip = headerLineCount