Skip to content

Commit

Permalink
feat(exporter): Support exporting data to Google Cloud Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
huantt committed Dec 1, 2022
1 parent 9557222 commit 2a66f41
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 18 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ vendor/
out/
.env
.init.env
.DS_Store
.DS_Store
google-credentials.json
39 changes: 24 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ Usage:
export [flags]

Flags:
-f, --file string Output file path (required)
-h, --help help for export
--concurrent-consumers int Number of concurrent consumers (default 1)
--kafka-group-id string Kafka consumer group ID
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka password
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-topics stringArray Kafka topics
--kafka-username string Kafka username
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message uint Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
--concurrent-consumers int Number of concurrent consumers (default 1)
-f, --file string Output file path (required)
--gcs-bucket string Google Cloud Storage bucket name
--gcs-project-id string Google Cloud Storage Project ID
--google-credentials string Path to Google Credentials file
-h, --help help for export
--kafka-group-id string Kafka consumer group ID
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka password
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-topics stringArray Kafka topics
--kafka-username string Kafka username
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
--storage string Storage type: local file (file) or Google cloud storage (gcs) (default "file")

Global Flags:
--log-level string Log level (default "info")
--log-level string Log level (default "info")
```
#### Sample
```shell
kafka-dump export \
--storage=file
--file=path/to/output/data.parquet \
--kafka-topics=users-activities \
--kafka-group-id=id=kafka-dump.local \
Expand Down Expand Up @@ -141,7 +146,7 @@ kafka-dump count-parquet-rows \
--file=path/to/output/data.parquet
```

# Use Docker
## Use Docker
```shell
docker run -d --rm \
-v /local-data:/data \
Expand All @@ -155,4 +160,8 @@ kafka-dump export \
--kafka-password=admin \
--kafka-security-protocol=SASL_SSL \
--kafka-sasl-mechanism=PLAIN
```
```

## TODO
- Import topics from multiple files or directory
- Import topics from Google Cloud Storage files or directory
56 changes: 54 additions & 2 deletions cmd/exporter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package cmd

import (
"context"
"fmt"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/gcs_utils"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/xitongsys/parquet-go-source/gcs"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"
"sync"
"time"
)
Expand All @@ -24,6 +28,10 @@ func CreateExportCommand() (*cobra.Command, error) {
var exportLimitPerFile uint64
var maxWaitingSecondsForNewMessage int
var concurrentConsumers = 1
var googleCredentialsFile string
var storageType string
var gcsBucketName string
var gcsProjectID string

command := cobra.Command{
Use: "export",
Expand Down Expand Up @@ -58,11 +66,19 @@ func CreateExportCommand() (*cobra.Command, error) {
outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
}
log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
fileWriter, err := local.NewLocalFileWriter(filePath)
fileWriter, err := createParquetFileWriter(
Storage(storageType),
outputFilePath,
gcs_utils.Config{
ProjectId: gcsProjectID,
BucketName: gcsBucketName,
CredentialsFile: googleCredentialsFile,
},
)
if err != nil {
panic(errors.Wrap(err, "[NewLocalFileWriter]"))
}
parquetWriter, err := impl.NewParquetWriter(fileWriter)
parquetWriter, err := impl.NewParquetWriter(*fileWriter)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file writer"))
}
Expand All @@ -86,7 +102,11 @@ func CreateExportCommand() (*cobra.Command, error) {
wg.Wait()
},
}
command.Flags().StringVar(&storageType, "storage", "file", "Storage type: local file (file) or Google cloud storage (gcs)")
command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
command.Flags().StringVar(&googleCredentialsFile, "google-credentials", "", "Path to Google Credentials file")
command.Flags().StringVar(&gcsBucketName, "gcs-bucket", "", "Google Cloud Storage bucket name")
command.Flags().StringVar(&gcsProjectID, "gcs-project-id", "", "Google Cloud Storage Project ID")
command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
Expand All @@ -98,9 +118,41 @@ func CreateExportCommand() (*cobra.Command, error) {
command.Flags().IntVar(&concurrentConsumers, "concurrent-consumers", 1, "Number of concurrent consumers")
topics = command.Flags().StringArray("kafka-topics", nil, "Kafka topics")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.MarkFlagsRequiredTogether("google-credentials", "gcs-bucket", "gcs-project-id")
err := command.MarkFlagRequired("file")
if err != nil {
return nil, err
}
return &command, nil
}

type Storage string

const (
StorageLocalFile Storage = "file"
StorageGoogleCloudStorage Storage = "gcs"
)

func createParquetFileWriter(storage Storage, filePath string, gcsConfig gcs_utils.Config) (*source.ParquetFile, error) {
switch storage {
case StorageLocalFile:
fw, err := local.NewLocalFileWriter(filePath)
if err != nil {
return nil, errors.Wrap(err, "[NewLocalFileWriter]")
}
return &fw, nil
case StorageGoogleCloudStorage:
ctx := context.Background()
client, err := gcs_utils.Singleton(gcsConfig.CredentialsFile)
if err != nil {
return nil, errors.Wrap(err, "Failed to create Singleton GCS client")
}
fw, err := gcs.NewGcsFileWriterWithClient(ctx, client, gcsConfig.ProjectId, gcsConfig.BucketName, filePath)
if err != nil {
return nil, errors.Wrap(err, "[NewGcsFileWriterWithClient]")
}
return &fw, nil
default:
return nil, errors.New(fmt.Sprintf("Storage type must be either file or gcs. Got %s", storage))
}
}
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/huantt/kafka-dump
go 1.18

require (
cloud.google.com/go/storage v1.14.0
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
Expand All @@ -12,13 +13,30 @@ require (
)

require (
cloud.google.com/go v0.75.0 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.40.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
Loading

0 comments on commit 2a66f41

Please sign in to comment.