enhancement: fixing documentation and ability to produce to internal … #6

Open · wants to merge 7 commits into base: main

4 changes: 2 additions & 2 deletions Dockerfile
@@ -5,11 +5,11 @@ COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
RUN GOOS=linux go build -o kafka-dump
RUN GOOS=linux GOARCH=amd64 go build -o kafka-dump

FROM ubuntu:20.04
RUN apt-get update && apt-get install -y ca-certificates
RUN mkdir /app
WORKDIR /app
COPY --from=builder /app/kafka-dump .
CMD ./kafka-dump export
CMD ./kafka-dump export
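
The builder stage now pins GOARCH alongside GOOS, so the binary targets linux/amd64 regardless of the build host. A minimal sketch of the equivalent commands outside Docker (the image tag is illustrative):

```shell
# Cross-compile the binary exactly as the builder stage does
GOOS=linux GOARCH=amd64 go build -o kafka-dump

# Or let the multi-stage build produce the runtime image
docker build -t kafka-dump:local .
```
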
51 changes: 34 additions & 17 deletions README.md
@@ -21,6 +21,7 @@ Usage:

Flags:
--concurrent-consumers int Number of concurrent consumers (default 1)
--enable-auto-offset-store To store offset in kafka broker (default true)
--fetch-message-max-bytes int Maximum number of bytes per topic+partition to request when fetching messages from the broker. (default 1048576)
-f, --file string Output file path (required)
--gcs-bucket string Google Cloud Storage bucket name
@@ -37,10 +38,10 @@ Flags:
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
--queued-max-messages-kbytes int Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
--ssl-ca-location string location of client ca cert file in pem
--ssl-certificate-location string client's certificate location
--ssl-key-location string path to ssl private key
--ssl-key-password string password for ssl private key passphrase
--ssl-ca-location string Location of client ca cert file in pem
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to ssl private key
--ssl-key-password string Password for ssl private key passphrase
--storage string Storage type: local file (file) or Google cloud storage (gcs) (default "file")

Global Flags:
@@ -73,10 +74,10 @@ kafka-dump export \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN
--ssl-ca-location=<path to ssl cacert>
--ssl-certificate-location=<path to ssl cert>
--ssl-key-location=<path to ssl key>
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

@@ -88,16 +89,16 @@ Usage:
Flags:
-f, --file string Output file path (required)
-h, --help help for import
-i, --include-partition-and-offset to store partition and offset of kafka message in file
-i, --include-partition-and-offset To store partition and offset of kafka message in file
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka password
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
--ssl-ca-location string location of client ca cert file in pem
--ssl-certificate-location string client's certificate location
--ssl-key-location string path to ssl private key
--ssl-key-password string password for ssl private key passphrase
--ssl-ca-location string Location of client ca cert file in pem
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to ssl private key
--ssl-key-password string Password for ssl private key passphrase

Global Flags:
--log-level string Log level (default "info")
@@ -123,10 +124,26 @@ kafka-dump import \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN
--ssl-ca-location=<path to ssl cacert>
--ssl-certificate-location=<path to ssl cert>
--ssl-key-location=<path to ssl key>
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

- To produce data to the internal topics of a Kafka cluster (e.g. `__consumer_offsets`), the producer's client ID must be set to `__admin_client`.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--client-id=__admin_client \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

48 changes: 36 additions & 12 deletions cmd/exporter.go
@@ -18,7 +18,8 @@ import (
)

func CreateExportCommand() (*cobra.Command, error) {
var filePath string
var messageFilePath string
var offsetFilePath string
var kafkaServers string
var kafkaUsername string
var kafkaPassword string
@@ -58,6 +59,11 @@ func CreateExportCommand() (*cobra.Command, error) {
SSLCertLocation: sslCertLocation,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
adminClient, err := kafka_utils.NewAdminClient(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Unable to init admin client"))
}

consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Unable to init consumer"))
@@ -74,12 +80,12 @@ func CreateExportCommand() (*cobra.Command, error) {
go func(workerID int) {
defer wg.Done()
for {
outputFilePath := filePath
outputFilePath := messageFilePath
if exportLimitPerFile > 0 {
outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
outputFilePath = fmt.Sprintf("%s.%d", messageFilePath, time.Now().UnixMilli())
}
log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
fileWriter, err := createParquetFileWriter(
messageFileWriter, err := createParquetFileWriter(
Storage(storageType),
outputFilePath,
gcs_utils.Config{
@@ -91,11 +97,24 @@ func CreateExportCommand() (*cobra.Command, error) {
if err != nil {
panic(errors.Wrap(err, "[NewLocalFileWriter]"))
}
parquetWriter, err := impl.NewParquetWriter(*fileWriter)
offsetsFilePath := offsetFilePath
offsetFileWriter, err := createParquetFileWriter(
Storage(storageType),
offsetsFilePath,
gcs_utils.Config{
ProjectId: gcsProjectID,
BucketName: gcsBucketName,
CredentialsFile: googleCredentialsFile,
},
)
if err != nil {
panic(errors.Wrap(err, "[NewLocalFileWriter]"))
}
parquetWriter, err := impl.NewParquetWriter(*messageFileWriter, *offsetFileWriter)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file writer"))
}
exporter, err := impl.NewExporter(consumer, *topics, parquetWriter, options)
exporter, err := impl.NewExporter(adminClient, consumer, *topics, parquetWriter, options)
if err != nil {
panic(errors.Wrap(err, "Failed to init exporter"))
}
@@ -116,19 +135,20 @@ func CreateExportCommand() (*cobra.Command, error) {
},
}
command.Flags().StringVar(&storageType, "storage", "file", "Storage type: local file (file) or Google cloud storage (gcs)")
command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)")
command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset (required)")
command.Flags().StringVar(&googleCredentialsFile, "google-credentials", "", "Path to Google Credentials file")
command.Flags().StringVar(&gcsBucketName, "gcs-bucket", "", "Google Cloud Storage bucket name")
command.Flags().StringVar(&gcsProjectID, "gcs-project-id", "", "Google Cloud Storage Project ID")
command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "To store offset in kafka broker")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
@@ -143,6 +163,10 @@ func CreateExportCommand() (*cobra.Command, error) {
if err != nil {
return nil, err
}
err = command.MarkFlagRequired("offset-file")
if err != nil {
return nil, err
}
return &command, nil
}
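
With this change, export writes Kafka messages and offsets to two separate Parquet files, and `--offset-file` is marked required alongside `--file`. A minimal local-storage invocation sketch, assuming the flags behave as declared above (paths and group ID are illustrative):

```shell
kafka-dump export \
  --storage=file \
  --file=path/to/output/messages.parquet \
  --offset-file=path/to/output/offsets.parquet \
  --kafka-servers=localhost:9092 \
  --kafka-group-id=kafka-dump-example
```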

57 changes: 43 additions & 14 deletions cmd/importer.go
@@ -1,9 +1,10 @@
package cmd

import (
"context"
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
"github.com/huantt/kafka-dump/pkg/log"
@@ -12,7 +13,8 @@ import (
)

func CreateImportCmd() (*cobra.Command, error) {
var filePath string
var messageFilePath string
var offsetFilePath string
var kafkaServers string
var kafkaUsername string
var kafkaPassword string
@@ -23,12 +25,16 @@ func CreateImportCmd() (*cobra.Command, error) {
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool
var clientid string
var restoreBefore string
var restoreAfter string

command := cobra.Command{
Use: "import",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Input file: %s", filePath)
parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
logger := log.WithContext(context.Background())
logger.Infof("Input file: %s", messageFilePath)
parquetReader, err := impl.NewParquetReader(messageFilePath, offsetFilePath, includePartitionAndOffset)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
@@ -47,7 +53,8 @@ func CreateImportCmd() (*cobra.Command, error) {
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
EnableAutoOffsetStore: true,
ClientID: clientid,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
@@ -64,29 +71,51 @@ func CreateImportCmd() (*cobra.Command, error) {
if m.TopicPartition.Error != nil {
panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
} else {
log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
logger.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
}
}()
importer := impl.NewImporter(producer, deliveryChan, parquetReader)
err = importer.Run()
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
SSLCALocation: sslCaLocation,
SSLKeyPassword: sslKeyPassword,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
}
importer, err := impl.NewImporter(logger, producer, deliveryChan, parquetReader, restoreBefore, restoreAfter)
if err != nil {
panic(errors.Wrap(err, "unable to init importer"))
}
err = importer.Run(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Error while running importer"))
}
},
}
command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)")
command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset")
command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
command.Flags().StringVar(&clientid, "client-id", "", "Producer client id")
command.Flags().StringVar(&restoreBefore, "restore-before", "", "timestamp in RFC3339 format to restore data before this time")
command.Flags().StringVar(&restoreAfter, "restore-after", "", "timestamp in RFC3339 format to restore data after this time")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
err := command.MarkFlagRequired("file")
if err != nil {
return nil, err
}
return &command, nil
}
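
The importer gains `--offset-file`, `--client-id`, and RFC3339 `--restore-before`/`--restore-after` filters. A minimal sketch of a time-bounded restore, assuming the flags behave as their descriptions state (paths and timestamps are illustrative):

```shell
kafka-dump import \
  --file=path/to/input/messages.parquet \
  --offset-file=path/to/input/offsets.parquet \
  --kafka-servers=localhost:9092 \
  --restore-after=2023-01-01T00:00:00Z \
  --restore-before=2023-02-01T00:00:00Z
```
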
15 changes: 11 additions & 4 deletions cmd/parquet_row_counter.go
@@ -8,22 +8,29 @@ import (
)

func CreateCountParquetRowCommand() (*cobra.Command, error) {
var filePath string
var filePathMessage string
var filePathOffset string

command := cobra.Command{
Use: "count-parquet-rows",
Run: func(cmd *cobra.Command, args []string) {
parquetReader, err := impl.NewParquetReader(filePath, false)
parquetReader, err := impl.NewParquetReader(filePathMessage, filePathOffset, false)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
log.Infof("Number of rows: %d", parquetReader.GetNumberOfRows())
log.Infof("Number of rows in message file: %d", parquetReader.GetNumberOfRowsInMessageFile())
log.Infof("Number of rows in offset file: %d", parquetReader.GetNumberOfRowsInOffsetFile())
},
}
command.Flags().StringVarP(&filePath, "file", "f", "", "File path (required)")
command.Flags().StringVarP(&filePathMessage, "file", "f", "", "File path of stored kafka message (required)")
command.Flags().StringVarP(&filePathOffset, "offset-file", "o", "", "File path of stored kafka offset (required)")
err := command.MarkFlagRequired("file")
if err != nil {
return nil, err
}
err = command.MarkFlagRequired("offset-file")
if err != nil {
return nil, err
}
return &command, nil
}
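
count-parquet-rows now reports row counts for the message file and the offset file separately, with both flags required. A minimal invocation sketch (paths are illustrative):

```shell
kafka-dump count-parquet-rows \
  --file=path/to/messages.parquet \
  --offset-file=path/to/offsets.parquet
```
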
5 changes: 3 additions & 2 deletions cmd/streamer.go
@@ -2,12 +2,13 @@ package cmd

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/spf13/cobra"
"time"
)

func CreateStreamCmd() (*cobra.Command, error) {