Skip to content

Commit

Permalink
refactor(exporter): init fileWriter in cmd layer
Browse files Browse the repository at this point in the history
  • Loading branch information
huantt committed Dec 1, 2022
1 parent 3a1018e commit 9557222
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
7 changes: 6 additions & 1 deletion cmd/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/xitongsys/parquet-go-source/local"
"sync"
"time"
)
Expand Down Expand Up @@ -57,7 +58,11 @@ func CreateExportCommand() (*cobra.Command, error) {
outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
}
log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
parquetWriter, err := impl.NewParquetWriter(outputFilePath)
fileWriter, err := local.NewLocalFileWriter(filePath)
if err != nil {
panic(errors.Wrap(err, "[NewLocalFileWriter]"))
}
parquetWriter, err := impl.NewParquetWriter(fileWriter)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file writer"))
}
Expand Down
11 changes: 3 additions & 8 deletions impl/parquet_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/writer"
)
Expand All @@ -15,17 +14,13 @@ type ParquetWriter struct {
fileWriter source.ParquetFile
}

func NewParquetWriter(filePath string) (*ParquetWriter, error) {
fw, err := local.NewLocalFileWriter(filePath)
if err != nil {
return nil, errors.Wrap(err, "[NewLocalFileWriter]")
}
parquetWriter, err := writer.NewParquetWriter(fw, new(ParquetMessage), 4)
func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error) {
parquetWriter, err := writer.NewParquetWriter(fileWriter, new(ParquetMessage), 4)
if err != nil {
return nil, errors.Wrap(err, "[NewParquetWriter]")
}
return &ParquetWriter{
fileWriter: fw,
fileWriter: fileWriter,
parquetWriter: parquetWriter,
}, nil
}
Expand Down

0 comments on commit 9557222

Please sign in to comment.