From 955722269e0281b7a576694f04062f5dee2d9e41 Mon Sep 17 00:00:00 2001 From: jack Date: Thu, 1 Dec 2022 20:47:11 +0700 Subject: [PATCH] refactor(exporter): init fileWriter in cmd layer --- cmd/exporter.go | 7 ++++++- impl/parquet_writer.go | 11 +++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/exporter.go b/cmd/exporter.go index c25e79c..390a732 100644 --- a/cmd/exporter.go +++ b/cmd/exporter.go @@ -7,6 +7,7 @@ import ( "github.com/huantt/kafka-dump/pkg/log" "github.com/pkg/errors" "github.com/spf13/cobra" + "github.com/xitongsys/parquet-go-source/local" "sync" "time" ) @@ -57,7 +58,11 @@ func CreateExportCommand() (*cobra.Command, error) { outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli()) } log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath) - parquetWriter, err := impl.NewParquetWriter(outputFilePath) + fileWriter, err := local.NewLocalFileWriter(filePath) + if err != nil { + panic(errors.Wrap(err, "[NewLocalFileWriter]")) + } + parquetWriter, err := impl.NewParquetWriter(fileWriter) if err != nil { panic(errors.Wrap(err, "Unable to init parquet file writer")) } diff --git a/impl/parquet_writer.go b/impl/parquet_writer.go index dcdbb5b..2198e56 100644 --- a/impl/parquet_writer.go +++ b/impl/parquet_writer.go @@ -5,7 +5,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/huantt/kafka-dump/pkg/log" "github.com/pkg/errors" - "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/source" "github.com/xitongsys/parquet-go/writer" ) @@ -15,17 +14,13 @@ type ParquetWriter struct { fileWriter source.ParquetFile } -func NewParquetWriter(filePath string) (*ParquetWriter, error) { - fw, err := local.NewLocalFileWriter(filePath) - if err != nil { - return nil, errors.Wrap(err, "[NewLocalFileWriter]") - } - parquetWriter, err := writer.NewParquetWriter(fw, new(ParquetMessage), 4) +func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error) { + parquetWriter, err := writer.NewParquetWriter(fileWriter, new(ParquetMessage), 4) if err != nil { return nil, errors.Wrap(err, "[NewParquetWriter]") } return &ParquetWriter{ - fileWriter: fw, + fileWriter: fileWriter, parquetWriter: parquetWriter, }, nil }