Skip to content

Commit

Permalink
Merge pull request #101 from timescale/vperez/use-options-pattern
Browse files Browse the repository at this point in the history
Use the options pattern
  • Loading branch information
MetalBlueberry authored Dec 10, 2024
2 parents 0357e2e + e312fa8 commit 464a7be
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 85 deletions.
37 changes: 22 additions & 15 deletions cmd/timescaledb-parallel-copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,32 @@ func main() {
if dbName != "" {
log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database")
}
opts := []csvcopy.Option{
csvcopy.WithLogger(&csvCopierLogger{}),
csvcopy.WithSchemaName(schemaName),
csvcopy.WithCopyOptions(copyOptions),
csvcopy.WithSplitCharacter(splitCharacter),
csvcopy.WithQuoteCharacter(quoteCharacter),
csvcopy.WithEscapeCharacter(escapeCharacter),
csvcopy.WithColumns(columns),
csvcopy.WithWorkers(workers),
csvcopy.WithLimit(limit),
csvcopy.WithBatchSize(batchSize),
csvcopy.WithLogBatches(logBatches),
csvcopy.WithReportingPeriod(reportingPeriod),
csvcopy.WithVerbose(verbose),
}

if skipHeader {
opts = append(opts,
csvcopy.WithSkipHeaderCount(headerLinesCnt),
)
}

copier, err := csvcopy.NewCopier(
postgresConnect,
schemaName,
tableName,
copyOptions,
splitCharacter,
quoteCharacter,
escapeCharacter,
columns,
skipHeader,
headerLinesCnt,
workers,
limit,
batchSize,
logBatches,
reportingPeriod,
verbose,
csvcopy.WithLogger(&csvCopierLogger{}),
opts...,
)
if err != nil {
if errors.Is(err, csvcopy.HeaderInCopyOptionsError) {
Expand Down
93 changes: 30 additions & 63 deletions pkg/csvcopy/csvcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type Result struct {
RowRate float64
}

var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions")

type Copier struct {
connString string
connString string
tableName string

copyOptions string

schemaName string
tableName string
copyOptions string
logger Logger
splitCharacter string
quoteCharacter string
escapeCharacter string
Expand All @@ -45,79 +46,44 @@ type Copier struct {
reportingFunction ReportFunc
verbose bool
skip int
logger Logger
rowCount int64
}

func NewCopier(
connString string,
schemaName string,
tableName string,
copyOptions string,
splitCharacter string,
quoteCharacter string,
escapeCharacter string,
columns string,
skipHeader bool,
headerLinesCnt int,
workers int,
limit int64,
batchSize int,
logBatches bool,
reportingPeriod time.Duration,
verbose bool,
options ...Option,
) (*Copier, error) {
if strings.Contains(strings.ToUpper(copyOptions), "HEADER") {
return nil, HeaderInCopyOptionsError
}

if len(quoteCharacter) > 1 {
return nil, errors.New("provided --quote must be a single-byte character")
}

if len(escapeCharacter) > 1 {
return nil, errors.New("provided --escape must be a single-byte character")
}

if headerLinesCnt <= 0 {
return nil, fmt.Errorf(
"provided --header-line-count (%d) must be greater than 0\n",
headerLinesCnt,
)
}

skip := 0
if skipHeader {
skip = headerLinesCnt
}

copier := &Copier{
connString: connString,
schemaName: schemaName,
tableName: tableName,
copyOptions: copyOptions,
splitCharacter: splitCharacter,
quoteCharacter: quoteCharacter,
escapeCharacter: escapeCharacter,
columns: columns,
workers: workers,
limit: limit,
batchSize: batchSize,
logBatches: logBatches,
verbose: verbose,
skip: skip,
connString: connString,
tableName: tableName,

// Defaults
schemaName: "public",
logger: &noopLogger{},
rowCount: 0,
reportingPeriod: reportingPeriod,
copyOptions: "CSV",
splitCharacter: ",",
quoteCharacter: "",
escapeCharacter: "",
columns: "",
workers: 1,
limit: 0,
batchSize: 5000,
logBatches: false,
reportingPeriod: 0,
verbose: false,
skip: 0,
}

for _, o := range options {
o(copier)
err := o(copier)
if err != nil {
return nil, fmt.Errorf("Error processing option, %T, %w", o, err)
}
}

if skip > 0 && verbose {
copier.logger.Infof("Skipping the first %d lines of the input.", headerLinesCnt)
if copier.skip > 0 && copier.verbose {
copier.logger.Infof("Skipping the first %d lines of the input.", copier.skip)
}

if copier.reportingFunction == nil {
Expand Down Expand Up @@ -295,6 +261,7 @@ func (c *Copier) processBatches(ctx context.Context, ch chan batch.Batch) (err e
} else {
copyCmd = fmt.Sprintf("COPY %s FROM STDIN WITH DELIMITER %s %s %s", c.getFullTableName(), delimStr, quotes, c.copyOptions)
}
c.logger.Infof("Copy command: %s", copyCmd)

for {
if ctx.Err() != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/csvcopy/csvcopy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestWriteDataToCSV(t *testing.T) {

writer.Flush()

copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 0, false)
copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value"))
require.NoError(t, err)

reader, err := os.Open(tmpfile.Name())
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestErrorAtRow(t *testing.T) {

writer.Flush()

copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 2, true, 0, false)
copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value"))
require.NoError(t, err)
reader, err := os.Open(tmpfile.Name())
require.NoError(t, err)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestWriteReportProgress(t *testing.T) {
require.LessOrEqual(t, r.RowCount, int64(2))
}

copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 100*time.Millisecond, false, WithReportingFunction(reportF))
copier, err := NewCopier(connStr, "metrics", WithColumns("device_id,label,value"), WithReportingPeriod(100*time.Millisecond), WithReportingFunction(reportF))
require.NoError(t, err)

reader, err := os.Open(tmpfile.Name())
Expand Down
170 changes: 166 additions & 4 deletions pkg/csvcopy/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package csvcopy

type Option func(c *Copier)
import (
"errors"
"fmt"
"strings"
"time"
)

type Option func(c *Copier) error

type Logger interface {
Infof(msg string, args ...interface{})
Expand All @@ -12,15 +19,170 @@ func (l *noopLogger) Infof(msg string, args ...interface{}) {}

// WithLogger sets the logger where the application will print debug messages
func WithLogger(logger Logger) Option {
return func(c *Copier) {
return func(c *Copier) error {
c.logger = logger
return nil
}
}

// WithReportingFunction sets the function that will be called at
// reportingPeriod with information about the copy progress
// ReportingPeriod with information about the copy progress
func WithReportingFunction(f ReportFunc) Option {
return func(c *Copier) {
return func(c *Copier) error {
if c.reportingPeriod == 0 {
return fmt.Errorf("reporting period must be set before the reporting function")
}
c.reportingFunction = f
return nil
}
}

// WithReportingPeriod sets how often the reporting function will be called.
func WithReportingPeriod(reportingPeriod time.Duration) Option {
return func(c *Copier) error {
if reportingPeriod < 0 {
return fmt.Errorf("reporting period must be equal or greater than zero")
}
c.reportingPeriod = reportingPeriod
return nil
}
}

var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions")

// WithCopyOptions appends the COPY options for the COPY operation.
// By default is 'CSV'
func WithCopyOptions(opt string) Option {
return func(c *Copier) error {
if strings.Contains(strings.ToUpper(opt), "HEADER") {
return HeaderInCopyOptionsError
}
c.copyOptions = opt
return nil
}
}

// WithSplitCharacter sets the COPY option DELIMITER
func WithSplitCharacter(splitCharacter string) Option {
return func(c *Copier) error {
if len(splitCharacter) > 1 {
return errors.New("split character must be a single-byte character")
}
c.splitCharacter = splitCharacter
return nil
}
}

// WithQuoteCharacter sets the COPY option QUOTE
func WithQuoteCharacter(quoteCharacter string) Option {
return func(c *Copier) error {
if len(quoteCharacter) > 1 {
return errors.New("quote character must be a single-byte character")
}

c.quoteCharacter = quoteCharacter
return nil
}
}

// WithEscapeCharacter sets the COPY option ESCAPE
func WithEscapeCharacter(escapeCharacter string) Option {
return func(c *Copier) error {
if len(escapeCharacter) > 1 {
return errors.New("provided escape character must be a single-byte character")
}

c.escapeCharacter = escapeCharacter
return nil
}
}

// WithColumns accepts a list of comma separated values for the csv columns
func WithColumns(columns string) Option {
return func(c *Copier) error {
c.columns = columns
return nil
}
}

// WithSkipHeader is set, skips the first row of the csv file
func WithSkipHeader(skipHeader bool) Option {
return func(c *Copier) error {
if c.skip != 0 {
return errors.New("skip is already set. Use SkipHeader or SkipHeaderCount")
}
c.skip = 1
return nil
}
}

// WithSkipHeaderCount sets the number of lines to skip at the beginning of the file
func WithSkipHeaderCount(headerLineCount int) Option {
return func(c *Copier) error {
if c.skip != 0 {
return errors.New("skip is already set. Use SkipHeader or SkipHeaderCount")
}
if headerLineCount <= 0 {
return errors.New("header line count must be greater than zero")
}
c.skip = headerLineCount
return nil
}
}

// WithWorkers sets the number of workers to use while processing the file
func WithWorkers(workers int) Option {
return func(c *Copier) error {
if workers <= 0 {
return errors.New("workers must be greater than zero")
}
c.workers = workers
return nil
}
}

// WithLimit limits the number of imported rows
func WithLimit(limit int64) Option {
return func(c *Copier) error {
if limit < 0 {
return errors.New("limit must be greater than zero")
}
c.limit = limit
return nil
}
}

// WithBatchSize sets the rows processed on each batch
func WithBatchSize(batchSize int) Option {
return func(c *Copier) error {
if batchSize < 0 {
return errors.New("batch size must be greater than zero")
}
c.batchSize = batchSize
return nil
}
}

// WithLogBatches prints a line for every processed batch
func WithLogBatches(logBatches bool) Option {
return func(c *Copier) error {
c.logBatches = logBatches
return nil
}
}

// WithVerbose enables logging
func WithVerbose(verbose bool) Option {
return func(c *Copier) error {
c.verbose = verbose
return nil
}
}

// WithSchemaName sets the schema name
func WithSchemaName(schema string) Option {
return func(c *Copier) error {
c.schemaName = schema
return nil
}
}

0 comments on commit 464a7be

Please sign in to comment.