forked from disney/quanta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
quanta-node.go
161 lines (132 loc) · 4.26 KB
/
quanta-node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Data Node launcher.
package main
import (
"fmt"
"net"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
u "github.com/araddon/gou"
"github.com/disney/quanta/server"
"github.com/disney/quanta/shared"
"github.com/hashicorp/consul/api"
"gopkg.in/alecthomas/kingpin.v2"
)
// Version is the version number (i.e. 0.8.0).
// NOTE(review): nothing in this file assigns it; presumably injected at build
// time — confirm against the build scripts.
var Version string

// Build is the build date.
// NOTE(review): nothing in this file assigns it; presumably injected at build
// time — confirm against the build scripts.
var Build string
// main is the entry point of the Quanta data node.
//
// It parses the command line (arguments may also come from environment
// variables via kingpin's DefaultEnvars), initializes logging, creates the
// Consul client and the node, registers the KV store, string search and bitmap
// index services, starts the node, initializes its services, joins "quanta"
// and then blocks until the node signals on m.Stop.
//
// The process exits with status 1 if the Consul client or the node cannot be
// created, because nothing after that point can work without them. On SIGINT
// or SIGTERM the metrics ticker is stopped, the node leaves, and the process
// exits with status 0 after a 5 second pause.
func main() {

	fmt.Println("hello from Node")
	fmt.Println("IP address", GetOutboundIP())

	app := kingpin.New(os.Args[0], "Quanta server node.").DefaultEnvars()
	app.Version("Version: " + Version + "\nBuild: " + Build)

	hashKey := app.Arg("hash-key", "Consistent hash key for node.").String()
	dataDir := app.Arg("data-dir", "Root directory for data files.").Default("/home/ec2-user/data").String()
	bindAddr := app.Arg("bind", "Bind address for this endpoint.").Default("0.0.0.0").String()
	port := app.Arg("port", "Port for this endpoint.").Default("4000").Int32()
	memLimit := app.Flag("mem-limit-mb", "Data partitions will expire after MB limit is exceeded (disabled if not specified).").Default("0").Int32()
	tls := app.Flag("tls", "Connection uses TLS if true.").Bool()
	certFile := app.Flag("cert-file", "TLS cert file path.").String()
	keyFile := app.Flag("key-file", "TLS key file path.").String()
	consul := app.Flag("consul-endpoint", "Consul agent address/port").Default("127.0.0.1:8500").String()
	environment := app.Flag("env", "Environment [DEV, QA, STG, VAL, PROD]").Default("DEV").String()
	logLevel := app.Flag("log-level", "Log Level [ERROR, WARN, INFO, DEBUG]").Default("WARN").String()
	pprof := app.Flag("pprof", "Start the pprof server").Default("false").String()

	kingpin.MustParse(app.Parse(os.Args[1:]))

	shared.InitLogging(*logLevel, *environment, "Data-Node", Version, "Quanta")

	if *bindAddr == "0.0.0.0" { // if there's no bind address given then find our ip address
		myaddr := GetOutboundIP().String()
		*bindAddr = myaddr
		fmt.Println("bindAddr", *bindAddr)
	}

	fmt.Println("pprof is ", *pprof)
	shared.StartPprofAndPromListener(*pprof)

	u.Warnf("Node identifier '%s'", *hashKey)

	u.Infof("Connecting to Consul at: [%s] ...\n", *consul)
	consulClient, err := api.NewClient(&api.Config{Address: *consul})
	if err != nil {
		// Is the consul agent running?
		u.Errorf("node: Cannot initialize endpoint config: error: %s", err)
		// The node is built on top of this client; do not continue without it.
		os.Exit(1)
	}

	// The TLS flags are parsed but not applied anywhere in this function; the
	// blank assignments only keep the variables "used" for the compiler.
	_ = *tls
	_ = *certFile
	_ = *keyFile

	fmt.Println("before server.NewNode")

	m, err := server.NewNode(fmt.Sprintf("%v:%v", Version, Build), int(*port), *bindAddr, *dataDir, *hashKey, consulClient)
	if err != nil {
		u.Errorf("[node: Cannot initialize node config: error: %s", err)
		// m must not be used when NewNode reports an error.
		os.Exit(1)
	}
	fmt.Println("after server.NewNode")

	kvStore := server.NewKVStore(m)
	m.AddNodeService(kvStore)

	search := server.NewStringSearch(m)
	m.AddNodeService(search)

	bitmapIndex := server.NewBitmapIndex(m, int(*memLimit))
	m.AddNodeService(bitmapIndex)

	fmt.Println("after AddNodeService ...")

	// Start listening endpoint
	m.Start()

	// Start metrics publisher thread
	ticker := metricsTicker(m)

	// Shut down on SIGINT/SIGTERM: stop metrics, leave, pause, then exit.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		for range c {
			u.Errorf("Interrupt signal received. Starting Shutdown...")
			ticker.Stop()
			m.Leave()
			time.Sleep(5 * time.Second)
			os.Exit(0)
		}
	}()

	fmt.Println("after m.InitServices")

	start := time.Now()
	err = m.InitServices()
	elapsed := time.Since(start)
	if err != nil {
		u.Error(err)
	}
	u.Debugf("Data node initialized in %v.", elapsed)

	fmt.Println("before m.Join")

	if err := m.Join("quanta"); err != nil {
		u.Errorf("node: Cannot join 'quanta': error: %s", err)
	}

	// Block until the node signals that it has stopped.
	<-m.Stop
	u.Debugf("Node '%s' got m.Stop.", *hashKey)
	m.Leave()

	// Non-blocking check for an error pending on m.Err. A dedicated variable is
	// used so that an earlier, already-reported error is not logged again.
	select {
	case stopErr := <-m.Err:
		if stopErr != nil {
			u.Errorf("node: Stopped with error: %s", stopErr)
		}
	default:
	}
}
// metricsTicker creates a 10 second ticker and launches a goroutine that, on
// every tick, calls node.PublishMetrics with the time elapsed since this
// function was invoked and the value returned by the previous call (initially
// the current time). The ticker is returned so the caller can stop it.
func metricsTicker(node *server.Node) *time.Ticker {
	const interval = time.Second * 10

	ticker := time.NewTicker(interval)
	began := time.Now()

	go func(previous time.Time) {
		for {
			// The ticker channel is never closed, so this simply waits for
			// the next tick.
			<-ticker.C
			previous = node.PublishMetrics(time.Since(began), previous)
		}
	}(time.Now())

	return ticker
}
// GetOutboundIP returns the preferred outbound IP address of this machine.
//
// It dials UDP to 8.8.8.8:80 and reports the local address chosen for that
// socket. If the dial fails the error is logged at FATAL level and the process
// exits with status 1; previously execution continued and the deferred Close
// on the nil connection panicked.
func GetOutboundIP() net.IP {
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		u.Log(u.FATAL, err)
		// conn is nil here, so there is nothing to close and nothing to return.
		os.Exit(1)
	}
	defer conn.Close()

	// A connection dialed on the "udp" network has a *net.UDPAddr local address.
	localAddr := conn.LocalAddr().(*net.UDPAddr)

	return localAddr.IP
}