-
Notifications
You must be signed in to change notification settings - Fork 896
/
Copy pathKafkaUtils.go
103 lines (87 loc) · 2.47 KB
/
KafkaUtils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"bufio"
"context"
"fmt"
"os"
"strings"
"time"
"github.com/google/uuid"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// Names shared by the helpers in this file.
const (
	// TopicName is the Kafka topic that CreateTopic asks the cluster
	// to create.
	TopicName string = "SensorReading"

	// PropsFile is the path of the properties file that
	// LoadProperties opens.
	PropsFile string = "ccloud.properties"
)
// CreateTopic asks the cluster to create the topic named by TopicName
// with 4 partitions and a replication factor of 3. A result reporting
// that the topic already exists is accepted as success.
//
// The connection settings "bootstrap.servers", "sasl.username" and
// "sasl.password" are read from props; the client is configured for
// SASL_SSL with the PLAIN mechanism.
//
// Any failure (creating the admin client, issuing the request, or a
// per-topic error other than "already exists") is printed to standard
// output and terminates the process with exit status 1.
func CreateTopic(props map[string]string) {
	adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers":       props["bootstrap.servers"],
		"broker.version.fallback": "0.10.0.0",
		"api.version.fallback.ms": 0,
		"sasl.mechanisms":         "PLAIN",
		"security.protocol":       "SASL_SSL",
		"sasl.username":           props["sasl.username"],
		"sasl.password":           props["sasl.password"],
	})
	if err != nil {
		fmt.Printf("Failed to create Admin client: %s\n", err)
		os.Exit(1)
	}
	// Release the client on the normal return path. The os.Exit calls
	// below end the process immediately without running deferred calls.
	defer adminClient.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Timeout handed to the admin operation.
	const maxDuration = 60 * time.Second

	results, err := adminClient.CreateTopics(ctx,
		[]kafka.TopicSpecification{{
			Topic:             TopicName,
			NumPartitions:     4,
			ReplicationFactor: 3,
		}},
		kafka.SetAdminOperationTimeout(maxDuration))
	if err != nil {
		fmt.Printf("Problem during the topic creation: %v\n", err)
		os.Exit(1)
	}

	for _, result := range results {
		code := result.Error.Code()
		// An already-existing topic is the desired end state, not a failure.
		if code == kafka.ErrNoError || code == kafka.ErrTopicAlreadyExists {
			continue
		}
		fmt.Printf("Topic creation failed for %s: %v\n",
			result.Topic, result.Error.String())
		os.Exit(1)
	}
}
// LoadProperties reads the file named by PropsFile and returns its
// key/value pairs.
//
// Every line is trimmed of surrounding whitespace. Blank lines and lines
// starting with "#" or "//" are skipped. Each remaining line is split at
// its first "=": the text before it becomes the key and everything after
// it becomes the value (both trimmed), so values may themselves contain
// "=". Lines without any "=" are ignored. When a key appears more than
// once, the last occurrence wins.
//
// LoadProperties panics if the file cannot be opened or if reading it
// fails part-way through.
func LoadProperties() map[string]string {
	props := make(map[string]string)

	file, err := os.Open(PropsFile)
	if err != nil {
		panic(fmt.Sprintf("Failed to load the '%s' file", PropsFile))
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" ||
			strings.HasPrefix(line, "//") ||
			strings.HasPrefix(line, "#") {
			continue
		}

		// Split only once so that a value containing "=" (for example a
		// base64-padded secret) is kept whole.
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			// Not a key=value pair; skip it instead of indexing past
			// the end of parts.
			continue
		}
		props[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	// Scan returning false can mean a read error as well as end of file;
	// refuse to hand back a silently truncated configuration.
	if err := scanner.Err(); err != nil {
		panic(fmt.Sprintf("Failed to read the '%s' file: %v", PropsFile, err))
	}

	return props
}
// NewUUID returns the string form of a freshly generated UUID.
//
// It first asks uuid.NewUUID for a UUID. If that call reports an error,
// it falls back to uuid.NewRandom so callers do not silently receive the
// zero-valued UUID. Only if both generators fail is the value returned by
// the first call formatted and returned, so the function never panics.
func NewUUID() string {
	id, err := uuid.NewUUID()
	if err != nil {
		if fallback, randErr := uuid.NewRandom(); randErr == nil {
			id = fallback
		}
	}
	return id.String()
}