forked from mahlonsmith/netdata-timescale-relay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
netdata_tsrelay.nim
389 lines (311 loc) · 12.4 KB
/
netdata_tsrelay.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# vim: set et nosta sw=4 ts=4 :
#
# Copyright (c) 2018-2020, Mahlon E. Smith <[email protected]>
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of Mahlon E. Smith nor the names of his
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import
db_postgres,
json,
math,
nativesockets,
net,
parseopt,
posix,
strutils,
strformat,
tables,
terminal,
times
const
VERSION = "v0.3.0"
USAGE = """
./netdata_tsrelay [-adDhopqtTv]
-a --listen-addr:
The outbound IP address to listen for netdata streams.
-d --debug:
Debug: Show incoming and parsed data.
-D --dropconn:
Drop the persistent socket to netdata between samples to conserve
local resources. This may be helpful with a large number of clients.
Defaults to false.
-h --help:
Help. You're lookin' at it.
-o --dbopts:
The PostgreSQL connection string parameters.
The default connection string is:
"host=localhost dbname=netdata application_name=netdata-tsrelay"
-p --listen-port:
Change the listening port from the default (14866).
-P --persistent:
Don't disconnect from the database between samples. This may be
more efficient with a small number of clients, when not using a
pooler, or with a very high sample size/rate. Defaults to false.
-q --quiet:
Quiet mode. No output at all. Ignored if -d is supplied.
-T --dbtable:
Change the destination table name from the default (netdata).
-t --timeout:
Alter the maximum time (in ms) an open socket waits for data
before processing the sample. Default: 500ms.
-v --version:
Display version number.
"""
INSERT_SQL = """
INSERT INTO $1
( time, host, metrics )
VALUES
( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
"""
type
Config = object of RootObj
dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
dbtable: string # The name of the table to write to.
dropconn: bool # Close the TCP connection between samples.
persistent: bool # Don't close the database handle between samples.
listen_port: int # The port to listen for incoming connections.
listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any.
verbose: bool # Be informative
debug: bool # Spew out raw data
insertsql: string # The SQL insert string after interpolating the table name.
timeout: int # How long to block, waiting on connection data.
type
NetdataClient = ref object
sock: Socket # The raw socket fd
address: string # The remote IP address
db: DbConn # An optionally persistent database handle
# Global configuration
var conf: Config
proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
## Quick wrapper for color formatting a string, since the 'terminal'
## module only deals with stdout directly.
if not isatty(stdout): return msg
var color: BiggestInt = ord( fg )
if bright: inc( color, 60 )
result = "\e[" & $color & 'm' & msg & "\e[0m"
proc fetch_data( client: NetdataClient ): string =
## Netdata JSON backend doesn't send a length nor a separator
## between samples, so we read line by line and wait for stream
## timeout to determine what constitutes a sample.
var buf = ""
while true:
try:
client.sock.readline( buf, timeout=conf.timeout )
if buf == "":
if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
quit( 1 )
result = result & buf & "\n"
except OSError:
quit( 1 )
except TimeoutError:
if result == "": continue
return
proc parse_data( data: string ): seq[ JsonNode ] =
## Given a raw +data+ string, parse JSON and return a sequence
## of JSON samples. Netdata can buffer multiple samples in one batch.
result = @[]
if data == "": return
# Hash of sample timeperiods to pivoted json data
var pivoted_data = init_table[ BiggestInt, JsonNode ]()
for sample in split_lines( data ):
if sample == "": continue
if conf.debug: echo sample.hl( fgBlack, bright=true )
var parsed: JsonNode
try:
parsed = sample.parse_json
except JsonParsingError:
if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
continue
if parsed.kind != JObject: return
# Create or use existing Json object for modded data.
#
var pivot: JsonNode
try:
let key = parsed[ "timestamp" ].get_int
if pivoted_data.has_key( key ):
pivot = pivoted_data[ key ]
else:
pivot = newJObject()
pivoted_data[ key ] = pivot
var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
pivot[ "hostname" ] = parsed[ "hostname" ]
pivot[ "timestamp" ] = parsed[ "timestamp" ]
pivot[ name ] = parsed[ "value" ]
if parsed.has_key( "labels" ):
pivot[ "labels" ] = parsed[ "labels" ]
except:
continue
for timestamp, sample in pivoted_data:
result.add( sample )
proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
## Given a sequence of json samples, write them to database.
if samples.len == 0: return
if client.db.isNil:
client.db = open( "", "", "", conf.dbopts )
try:
client.db.exec sql( "BEGIN" )
for sample in samples:
var
timestamp = sample[ "timestamp" ].get_int
host = sample[ "hostname" ].get_str.to_lowerascii
sample.delete( "timestamp" )
sample.delete( "hostname" )
client.db.exec sql( conf.insertsql ), timestamp, host, sample
client.db.exec sql( "COMMIT" )
except:
let
e = getCurrentException()
msg = getCurrentExceptionMsg()
echo "Got exception ", repr(e), " while writing to DB: ", msg
discard
if not conf.persistent:
client.db.close
client.db = nil
proc process( client: NetdataClient ): void =
## Do the work for a connected client within child process.
let t0 = cpu_time()
var raw_data = client.fetch_data
# Done with the socket, netdata will automatically
# reconnect. Save local resources/file descriptors
# by closing after the send is considered complete.
#
if conf.dropconn:
try:
client.sock.close
except OSError:
return
# Pivot the parsed data to a single JSON blob per sample time.
var samples = parse_data( raw_data )
client.write_to_database( samples )
if conf.verbose:
let cputime = cpu_time() - t0
echo(
hl( $(epochTime().to_int), fgMagenta, bright=true ),
" ",
hl( $(samples.len), fgWhite, bright=true ),
" sample(s) parsed from ",
client.address.hl( fgYellow, bright=true ),
" in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
)
proc serverloop( conf: Config ): void =
## Open a database connection, bind to the listening socket,
## and start serving incoming netdata streams.
let db = open( "", "", "", conf.dbopts )
db.close
if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )
# Ensure children are properly reaped.
#
var sa: Sigaction
sa.sa_handler = SIG_IGN
discard sigaction( SIGCHLD, sa )
# Setup listening socket.
#
var server = newSocket()
server.set_sock_opt( OptReuseAddr, true )
server.bind_addr( Port(conf.listen_port), conf.listen_addr )
server.listen()
if conf.verbose:
echo(
"Listening for incoming connections on ".hl( fgGreen, bright=true ),
hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ),
":",
hl( $conf.listen_port, fgBlue, bright=true ),
)
echo ""
# Wait for incoming connections, fork for each client.
#
while true:
let client = NetdataClient.new
client.sock = Socket.new
# Block, waiting for new connections.
server.acceptAddr( client.sock, client.address )
if fork() == 0:
server.close
if conf.dropconn:
# "one shot" mode.
client.process
quit( 0 )
else:
# Keep the connection to netdata open.
while true: client.process
client.sock.close
when defined( testing ): dumpNumberOfInstances()
proc parse_cmdline: Config =
## Populate the config object with the user's preferences.
# Config object defaults.
#
result = Config(
dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
dbtable: "netdata",
dropconn: false,
listen_port: 14866,
listen_addr: "0.0.0.0",
verbose: true,
debug: false,
timeout: 500,
persistent: false,
insertsql: INSERT_SQL % [ "netdata" ]
)
# always set debug mode if development build.
result.debug = defined( testing )
for kind, key, val in getopt():
case kind
of cmdArgument:
discard
of cmdLongOption, cmdShortOption:
case key
of "debug", "d":
result.debug = true
of "dropconn", "D":
if result.persistent:
echo "Dropping TCP sockets are incompatible with persistent database connections."
quit( 1 )
result.dropconn = true
of "help", "h":
echo USAGE
quit( 0 )
of "quiet", "q":
result.verbose = false
of "version", "v":
echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
quit( 0 )
of "timeout", "t": result.timeout = val.parse_int
of "dbtable", "T":
result.insertsql = INSERT_SQL % [ val ]
of "dbopts", "o": result.dbopts = val
of "listen-addr", "a": result.listen_addr = val
of "listen-port", "p": result.listen_port = val.parse_int
of "persistent", "P":
if result.dropconn:
echo "Persistent database connections are incompatible with dropping TCP sockets."
quit( 1 )
result.persistent = true
else: discard
of cmdEnd: assert( false ) # shouldn't reach here ever
when isMainModule:
system.addQuitProc( resetAttributes )
conf = parse_cmdline()
if conf.debug: echo hl( $conf, fgYellow )
serverloop( conf )