-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
add go-kafka-consumer
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
|
||
"github.com/confluentinc/confluent-kafka-go/kafka" | ||
"github.com/colinmarc/hdfs" | ||
) | ||
|
||
func main() { | ||
// Kafka configuration | ||
config := kafka.ConfigMap{ | ||
"bootstrap.servers": "kafka-service:9092", | ||
"group.id": "hdfs-writer-group", | ||
"auto.offset.reset": "earliest", | ||
} | ||
|
||
// Create Kafka consumer | ||
consumer, err := kafka.NewConsumer(&config) | ||
if err != nil { | ||
log.Fatalf("Failed to create consumer: %s", err) | ||
} | ||
defer consumer.Close() | ||
|
||
// Subscribe to the Kafka topic | ||
topic := "coin-data" | ||
err = consumer.Subscribe(topic, nil) | ||
if err != nil { | ||
log.Fatalf("Failed to subscribe to topic: %s", err) | ||
} | ||
|
||
// HDFS configuration | ||
hdfsClient, err := hdfs.New("hdfs-service:9000") | ||
if err != nil { | ||
log.Fatalf("Failed to create HDFS client: %s", err) | ||
} | ||
|
||
// Open HDFS file for writing | ||
hdfsFilePath := "/raw_data" | ||
hdfsFile, err := hdfsClient.Append(hdfsFilePath) | ||
if err != nil { | ||
hdfsFile, err = hdfsClient.Create(hdfsFilePath) | ||
if err != nil { | ||
log.Fatalf("Failed to create HDFS file: %s", err) | ||
} | ||
} | ||
defer hdfsFile.Close() | ||
|
||
// Context to manage message consumption | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
// Consume messages from Kafka | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
fmt.Println("Stopping consumer") | ||
return | ||
default: | ||
msg, err := consumer.ReadMessage(-1) | ||
if err != nil { | ||
fmt.Printf("Consumer error: %v (%v)\n", err, msg) | ||
continue | ||
} | ||
|
||
// Write the raw data to HDFS | ||
_, err = hdfsFile.Write([]byte(msg.Value)) | ||
if err != nil { | ||
log.Fatalf("Failed to write to HDFS file: %s", err) | ||
} | ||
|
||
// Print message to console (optional) | ||
fmt.Printf("Consumed message: %s\n", string(msg.Value)) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
module main | ||
|
||
go 1.22.4 | ||
|
||
require ( | ||
github.com/colinmarc/hdfs v1.1.3 | ||
github.com/confluentinc/confluent-kafka-go v1.9.2 | ||
) | ||
|
||
require ( | ||
github.com/golang/protobuf v1.5.2 // indirect | ||
github.com/google/go-cmp v0.5.8 // indirect | ||
google.golang.org/protobuf v1.28.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
FROM golang:latest | ||
COPY . . | ||
RUN go build | ||
CMD ./main |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.