forked from elastic/logstash-forwarder
-
Notifications
You must be signed in to change notification settings - Fork 5
/
publisher1.go
335 lines (284 loc) · 10.6 KB
/
publisher1.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package main
import (
"bytes"
"compress/zlib"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/pem"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"regexp"
"strconv"
"time"
)
// Support for newer SSL signature algorithms
import _ "crypto/sha256"
import _ "crypto/sha512"
// hostname is the local machine name; it is filled in by init and sent as the
// "host" field of every data frame.
var hostname string

// hostport_re splits a "host:port" server entry into its host (submatch 1) and
// numeric port (submatch 2). The host may optionally be wrapped in square
// brackets, e.g. "[::1]:5043". MustCompile is used so that a broken pattern
// fails loudly at start-up instead of leaving a nil *Regexp behind.
var hostport_re = regexp.MustCompile(`^\[?([^]]+)\]?:([0-9]+)$`)
// init records the local hostname in the package-level hostname variable and
// seeds math/rand with the current time.
func init() {
	log.Printf("publisher init\n")

	var err error
	hostname, err = os.Hostname()
	if err != nil {
		// Keep going with whatever os.Hostname returned, but make the
		// failure visible instead of silently discarding it.
		log.Printf("Failed to determine hostname: %s\n", err)
	}

	rand.Seed(time.Now().UnixNano())
}
// Publishv1 ships batches of events to a remote server over TLS and reports
// acknowledged events to the registrar.
//
// Parameters:
//   input          - channel delivering batches of events to send.
//   registrar_chan - channel that receives the events the server has acknowledged.
//   config         - network settings; this function reads config.timeout and
//                    config.reconnect and passes config on to connect and ping.
//
// For each batch it writes a "1W" window frame carrying the event count, then a
// "1C" frame carrying the zlib-compressed data frames, and then reads "1A" ack
// frames until the whole batch has been acknowledged. Any socket error or
// unexpected frame causes a sleep of config.reconnect, a reconnect, and a
// resend of the events that are not yet acknowledged. When no batch arrives
// for 900 seconds a ping is sent instead. The outer loop has no exit path, so
// the function does not return.
func Publishv1(input chan []*FileEvent,
registrar_chan chan []RegistrarEvent,
config *NetworkConfig) {
// buffer holds the compressed payload between retries; an empty buffer means
// the payload must be (re)built from events.
var buffer bytes.Buffer
var compressed_payload []byte
var socket *tls.Conn
// last_ack_sequence is the highest sequence number acknowledged so far;
// sequence is the number given to the last event of the current payload.
var last_ack_sequence uint32
var sequence uint32
var err error
socket = connect(config)
// NOTE(review): the receiver of a deferred method call is evaluated here, so
// this closes the connection opened just above, not any later replacement.
defer socket.Close()
// TODO(driskell): Make the idle timeout configurable like the network timeout is?
timer := time.NewTimer(900 * time.Second)
for {
select {
case events := <-input:
// Keep trying until the whole batch has been acknowledged.
for {
// Do we need to populate the buffer again? Or do we already have it done?
if buffer.Len() == 0 {
// Numbering continues from the last acknowledged sequence, so a rebuilt
// payload reuses the numbers of the events that are still outstanding.
sequence = last_ack_sequence
// The error from NewWriterLevel is discarded; the level is the constant 3.
compressor, _ := zlib.NewWriterLevel(&buffer, 3)
for _, event := range events {
sequence += 1
writeDataFrame(event, sequence, compressor)
}
compressor.Flush()
compressor.Close()
compressed_payload = buffer.Bytes()
}
// Abort if our whole request takes longer than the configured network timeout.
socket.SetDeadline(time.Now().Add(config.timeout))
// Set the window size to the length of this payload in events.
_, err = socket.Write([]byte("1W"))
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
err = binary.Write(socket, binary.BigEndian, uint32(len(events)))
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
// Write compressed frame: "1C", the payload length, then the payload itself.
_, err = socket.Write([]byte("1C"))
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
err = binary.Write(socket, binary.BigEndian, uint32(len(compressed_payload)))
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
_, err = socket.Write(compressed_payload)
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
// Read acks until the final sequence number of this payload is acknowledged.
for {
var frame [2]byte
// Each time we've received a frame, reset the deadline
socket.SetDeadline(time.Now().Add(config.timeout))
err = binary.Read(socket, binary.BigEndian, &frame)
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
if frame == [2]byte{'1', 'A'} {
var ack_sequence uint32
// Read the sequence number acked
err = binary.Read(socket, binary.BigEndian, &ack_sequence)
if err != nil {
log.Printf("Socket error, will reconnect: %s\n", err)
goto RetryPayload
}
if sequence == ack_sequence {
// Give the registrar the remainder of the events so it can save to state the new offsets
registrar_chan <- []RegistrarEvent{&EventsEvent{Events: events}}
last_ack_sequence = ack_sequence
// All acknowledged! Stop reading acks
break
}
// NOTE(driskell): If the server is busy and not yet processed anything, we MAY
// end up receiving an ack for the last sequence in the previous payload, or 0
if ack_sequence == last_ack_sequence {
// Just keep waiting
continue
} else if ack_sequence-last_ack_sequence > uint32(len(events)) {
// This is wrong - we've already had an ack for these.
// The subtraction is unsigned, so an ack that is behind last_ack_sequence
// wraps around to a large value and is rejected here as well.
log.Printf("Socket error, will reconnect: Repeated ACK\n")
goto RetryPayload
}
// Partial ack: only the first ack_sequence-last_ack_sequence events were accepted.
// Send the events to registrar so it can save to state the new offsets
registrar_chan <- []RegistrarEvent{&EventsEvent{Events: events[:ack_sequence-last_ack_sequence]}}
// Keep only the events that are still waiting for an ack.
events = events[ack_sequence-last_ack_sequence:]
last_ack_sequence = ack_sequence
// Reset the events buffer so it gets regenerated if we need to retry the payload
buffer.Truncate(0)
continue
}
// Unknown frame!
log.Printf("Socket error, will reconnect: Unknown frame received: %s\n", frame)
goto RetryPayload
}
// Success, stop trying to send the payload.
break
RetryPayload:
// TODO(sissel): Track how frequently we timeout and reconnect. If we're
// timing out too frequently, there's really no point in timing out since
// basically everything is slow or down. We'll want to ratchet up the
// timeout value slowly until things improve, then ratchet it down once
// things seem healthy.
// Wait, replace the connection, and go round the payload loop again.
time.Sleep(config.reconnect)
socket.Close()
socket = connect(config)
}
// Reset the events buffer
buffer.Truncate(0)
// Prepare to enter idle by setting a long deadline... if we have more events we'll drop it down again
socket.SetDeadline(time.Now().Add(1800 * time.Second))
// Reset the timer
// NOTE(review): the timer channel is not drained before Reset; if the timer
// fired while this batch was being sent, a ping may follow immediately - confirm
// against the timer semantics of the Go version in use.
timer.Reset(900 * time.Second)
case <-timer.C:
// We've no events to send - throw a ping (well... window frame) so our connection doesn't idle and die
err = ping(config, socket)
if err != nil {
log.Printf("Socket error during ping, will reconnect: %s\n", err)
time.Sleep(config.reconnect)
socket.Close()
socket = connect(config)
}
// Reset the deadline
socket.SetDeadline(time.Now().Add(1800 * time.Second))
// Reset the timer
timer.Reset(900 * time.Second)
} /* select */
} /* for */
} // Publishv1
// ping writes a bare window frame ("1W" followed by the configured spool size)
// to socket so that an otherwise idle connection keeps carrying traffic.
//
// No reply is read: the protocol has no real ping frame, so this is only a
// keep-alive write. The write is bounded by config.timeout. The first write
// error encountered is returned; nil means both writes succeeded.
func ping(config *NetworkConfig, socket *tls.Conn) error {
	// Bound the keep-alive by the normal network timeout.
	deadline := time.Now().Add(config.timeout)
	socket.SetDeadline(deadline)

	// Frame header.
	if _, err := socket.Write([]byte("1W")); err != nil {
		return err
	}

	// Window size, big-endian uint32.
	return binary.Write(socket, binary.BigEndian, uint32(*spool_size))
}
// connect opens a TLS connection to one of config.Servers and returns it once
// the handshake has completed.
//
// Configuration problems terminate the process via log.Fatalf: an unreadable
// or invalid client certificate/key pair, an unreadable or non-certificate CA
// file, an empty server list, or a server entry that is not "host:port".
// Network problems (DNS failure, dial failure, handshake failure) are logged
// and retried forever, sleeping config.reconnect between attempts and picking
// a random server and a random resolved address each time.
//
// The server's certificate is verified against the host name taken from the
// chosen server entry.
func connect(config *NetworkConfig) (socket *tls.Conn) {
	var certificates []tls.Certificate
	var rootCAs *x509.CertPool

	// Optional client certificate.
	if len(config.SSLCertificate) > 0 && len(config.SSLKey) > 0 {
		log.Printf("Loading client ssl certificate: %s and %s\n",
			config.SSLCertificate, config.SSLKey)
		cert, err := tls.LoadX509KeyPair(config.SSLCertificate, config.SSLKey)
		if err != nil {
			log.Fatalf("Failed loading client ssl certificate: %s\n", err)
		}
		certificates = []tls.Certificate{cert}
	}

	// Optional trusted CA; only the first PEM block of the file is used.
	if len(config.SSLCA) > 0 {
		log.Printf("Setting trusted CA from file: %s\n", config.SSLCA)
		rootCAs = x509.NewCertPool()

		pemdata, err := ioutil.ReadFile(config.SSLCA)
		if err != nil {
			log.Fatalf("Failure reading CA certificate: %s\n", err)
		}

		block, _ := pem.Decode(pemdata)
		if block == nil {
			log.Fatalf("Failed to decode PEM data, is %s a valid cert?\n", config.SSLCA)
		}
		if block.Type != "CERTIFICATE" {
			log.Fatalf("This is not a certificate file: %s\n", config.SSLCA)
		}

		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			log.Fatalf("Failed to parse a certificate: %s\n", config.SSLCA)
		}
		rootCAs.AddCert(cert)
	}

	// Without this guard the random pick below would panic on an empty list.
	if len(config.Servers) == 0 {
		log.Fatalf("No servers configured\n")
	}

	for {
		// Pick a random server from the list.
		hostport := config.Servers[rand.Intn(len(config.Servers))]
		submatch := hostport_re.FindStringSubmatch(hostport)
		if submatch == nil {
			log.Fatalf("Invalid host:port given: %s", hostport)
		}
		host, port := submatch[1], submatch[2]

		addresses, err := net.LookupHost(host)
		if err != nil {
			log.Printf("DNS lookup failure \"%s\": %s\n", host, err)
			time.Sleep(config.reconnect)
			continue
		}

		// Pick a random address among those the name resolves to.
		address := addresses[rand.Intn(len(addresses))]
		addressport := net.JoinHostPort(address, port)

		log.Printf("Connecting to %s (%s) \n", addressport, host)

		tcpsocket, err := net.DialTimeout("tcp", addressport, config.timeout)
		if err != nil {
			log.Printf("Failure connecting to %s: %s\n", address, err)
			time.Sleep(config.reconnect)
			continue
		}

		// A fresh config per attempt: we dial a resolved IP address, so the
		// host name must be supplied explicitly for certificate verification
		// (crypto/tls refuses a client handshake that has neither ServerName
		// nor InsecureSkipVerify set).
		socket = tls.Client(tcpsocket, &tls.Config{
			Certificates: certificates,
			RootCAs:      rootCAs,
			ServerName:   host,
		})
		socket.SetDeadline(time.Now().Add(config.timeout))

		if err = socket.Handshake(); err != nil {
			log.Printf("Handshake failure with %s: Failed to TLS handshake: %s\n", address, err)
			// Release the failed connection before waiting to retry.
			socket.Close()
			time.Sleep(config.reconnect)
			continue
		}

		log.Printf("Connected with %s\n", address)

		// connected, let's rock and roll.
		return socket
	}
}
// writeDataFrame serialises one event onto output as a "2D" data frame: the
// two-byte header, the big-endian sequence number, the number of key/value
// pairs, and then the pairs themselves.
//
// The "file", "host", "offset" and "line" pairs are always written first, in
// that order; "host" comes from the package-level hostname rather than from
// the event. Every remaining entry of the event map follows and must hold a
// *string, as must "file" and "line"; "offset" must hold an int64. A value of
// any other type makes the type assertion panic. Write errors are not checked.
func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
	// Why a version 2 data frame? server.rb only starts returning partial ACKs
	// when it sees version 2. Older forwarders, which broke on partial ACKs,
	// keep sending 1D and so keep getting whole-payload ACKs from newer servers.
	output.Write([]byte("2D"))

	// Sequence number of this event within the stream.
	binary.Write(output, binary.BigEndian, uint32(sequence))

	fields := *event.Event

	// Pair count: every map entry plus the injected "host" pair.
	binary.Write(output, binary.BigEndian, uint32(len(fields)+1))

	// Fixed leading pairs.
	writeKV("file", *fields["file"].(*string), output)
	writeKV("host", hostname, output)
	writeKV("offset", strconv.FormatInt(fields["offset"].(int64), 10), output)
	writeKV("line", *fields["line"].(*string), output)

	// Everything else the event carries.
	for name, raw := range fields {
		switch name {
		case "file", "offset", "line":
			// Already emitted above.
		default:
			writeKV(name, *raw.(*string), output)
		}
	}
}
// writeKV writes one key/value pair to output. Each of the two strings is
// encoded as a big-endian uint32 byte length followed by the raw bytes, key
// first and value second. Write errors are not checked.
func writeKV(key string, value string, output io.Writer) {
	for _, field := range [...]string{key, value} {
		// Length prefix, then the payload.
		binary.Write(output, binary.BigEndian, uint32(len(field)))
		output.Write([]byte(field))
	}
}