Skip to content

Commit

Permalink
statsd: drop-in standalone implementation
Browse files Browse the repository at this point in the history
This gets rid of the semi-borken statsd-c-client dependency and uses a
standalone implementation instead. Also supports floating point gauges.

Currently a packet is still sent for each data point to keep the patch
within statsd.c, but the code can be further optimized to send multiple
datapoint per packet. The code does not modify the passed arguments.
  • Loading branch information
f00b4r0 committed Aug 15, 2024
1 parent a398633 commit f063a86
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 7 deletions.
3 changes: 1 addition & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,9 @@ if(SOAPYSDR)
endif()

if(ETSY_STATSD)
find_library(STATSD_FOUND statsdclient)
SET(STATSD_FOUND TRUE)
if(STATSD_FOUND)
list(APPEND dumpvdl2_extra_sources statsd.c)
list(APPEND dumpvdl2_extra_libs statsdclient)
set(WITH_STATSD TRUE)
endif()
endif()
Expand Down
2 changes: 1 addition & 1 deletion src/dumpvdl2.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ void statsd_counter_per_channel_increment(uint32_t freq, char *counter);
void statsd_timing_delta_per_channel_send(uint32_t freq, char *timer, struct timeval ts);
void statsd_counter_per_msgdir_increment(la_msg_dir msg_dir, char *counter);
void statsd_counter_increment(char *counter);
void statsd_gauge_set(char *gauge, size_t value);
void statsd_gauge_set(char *gauge, long value);
#define statsd_increment_per_channel(freq, counter) statsd_counter_per_channel_increment(freq, counter)
#define statsd_timing_delta_per_channel(freq, timer, start) statsd_timing_delta_per_channel_send(freq, timer, start)
#define statsd_increment_per_msgdir(counter, msgdir) statsd_counter_per_msgdir_increment(counter, msgdir)
Expand Down
258 changes: 254 additions & 4 deletions src/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* This file is a part of dumpvdl2
*
* Copyright (c) 2017-2023 Tomasz Lemiech <[email protected]>
* Copyright (c) 2024 Thibaut VARENE <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -22,14 +23,17 @@
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <statsd/statsd-client.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <libacars/libacars.h> // la_msg_dir
#include <libacars/vstring.h> // la_vstring
#include "dumpvdl2.h"
#include "config.h"

#define STATSD_UDP_BUFSIZE 1432 ///< udp buffer size. Untold rule seems to be that the datagram must not be fragmented.

#define STATSD_NAMESPACE "dumpvdl2"
static statsd_link *statsd = NULL;

static char const *counters_per_channel[] = {
"avlc.errors.bad_fcs",
Expand Down Expand Up @@ -88,6 +92,252 @@ static char const *msg_dir_labels[] = {
[LA_MSG_DIR_GND2AIR] = "gnd2air"
};

static struct _statsd_runtime {
char *namespace; ///< statsd namespace prefix (dot-terminated)
struct sockaddr_storage ai_addr;
socklen_t ai_addrlen;
int sockfd;
} statsd_runtime = {};

typedef struct _statsd_runtime statsd_link;

static statsd_link *statsd = NULL;

static statsd_link *statsd_init_with_namespace(const char *host, const char *port, const char *ns)
{
int sockfd;
struct addrinfo hints;
struct addrinfo *result, *rp;
int ret;

// obtain address(es) matching host/port
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;

ret = getaddrinfo(host, port, &hints, &result);
if (ret) {
fprintf(stderr, "statsd: getaddrinfo: %s\n", gai_strerror(ret));
return NULL;
}

// try each address until one succeeds
for (rp = result; rp; rp = rp->ai_next) {
sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (-1 != sockfd)
break; // success
}

if (!rp) {
fprintf(stderr, "statsd: Could not reach server\n");
goto cleanup;
}

memcpy(&statsd_runtime.ai_addr, rp->ai_addr, rp->ai_addrlen); // ai_addrlen is guaranteed to be <= sizeof(sockaddr_storage)
statsd_runtime.ai_addrlen = rp->ai_addrlen;

ret = strlen(ns);
statsd_runtime.namespace = malloc(ret + 2);
if(!statsd_runtime.namespace) {
perror("statsd");
goto cleanup;
}

strcpy(statsd_runtime.namespace, ns);
statsd_runtime.namespace[ret++] = '.';
statsd_runtime.namespace[ret] = '\0';

statsd_runtime.sockfd = sockfd;

freeaddrinfo(result);
return &statsd_runtime;

cleanup:
freeaddrinfo(result);

return NULL;
}

#ifdef DEBUG
static int statsd_validate(const char * stat)
{
const char * p;
for (p = stat; *p; p++) {
switch (*p) {
case ':':
case '|':
case '@':
return (-1);
default:
; // nothing
}
}

return 0;
}
#endif

struct statsd_metric {
enum { STATSD_UCOUNTER, STATSD_IGAUGE, STATSD_FGAUGE, STATSD_TIMING } type;
const char *name;
union { unsigned long u; long l; float f; } value;
};

/**
* Update StatsD metrics.
* @param metrics an array of metrics to push to StatsD
* @param nmetrics the array size
* @return exec status
* @warning not thread-safe.
*/
static int statsd_update(const struct statsd_metric * const metrics, const unsigned int nmetrics)
{
static char sbuffer[STATSD_UDP_BUFSIZE];
const char *mtype;
char * buffer;
bool zerofirst;
int ret;
ssize_t sent;
size_t avail;
unsigned int i;

buffer = sbuffer;
avail = STATSD_UDP_BUFSIZE;

for (i = 0; i < nmetrics; i++) {
#ifdef DEBUG
if ((statsd_validate(metrics[i].name) != 0)) {
fprintf(stderr, "statsd: ignoring invalid name \"%s\"", metrics[i].name);
continue;
}
#endif

zerofirst = false;

switch (metrics[i].type) {
case STATSD_IGAUGE:
mtype = "g";
if (metrics[i].value.l < 0)
zerofirst = true;
break;
case STATSD_FGAUGE:
mtype = "g";
if (metrics[i].value.f < 0.0F)
zerofirst = true;
break;
case STATSD_UCOUNTER:
mtype = "c";
break;
case STATSD_TIMING:
mtype = "ms";
break;
default:
ret = -1;
goto cleanup;
}

restartzero:
// StatsD has a schizophrenic idea of what a gauge is (negative values are subtracted from previous data and not registered as is): work around its dementia
if (zerofirst) {
ret = snprintf(buffer, avail, "%s%s:0|%s\n", statsd_runtime.namespace ? statsd_runtime.namespace : "", metrics[i].name, mtype);
if (ret < 0) {
ret = -1;
goto cleanup;
}
else if ((size_t)ret >= avail) {
// send what we have, reset buffer, restart - no need to add '\0': sendto will truncate anyway
sendto(statsd_runtime.sockfd, sbuffer, STATSD_UDP_BUFSIZE - avail, 0, (struct sockaddr *)&statsd_runtime.ai_addr, statsd_runtime.ai_addrlen);
buffer = sbuffer;
avail = STATSD_UDP_BUFSIZE;
goto restartzero;
}
buffer += ret;
avail -= (size_t)ret;
}

restartbuffer:
switch (metrics[i].type) {
case STATSD_IGAUGE:
ret = snprintf(buffer, avail, "%s%s:%ld|%s\n", statsd_runtime.namespace ? statsd_runtime.namespace : "", metrics[i].name, metrics[i].value.l, mtype);
break;
case STATSD_UCOUNTER:
case STATSD_TIMING:
ret = snprintf(buffer, avail, "%s%s:%lu|%s\n", statsd_runtime.namespace ? statsd_runtime.namespace : "", metrics[i].name, metrics[i].value.u, mtype);
break;
case STATSD_FGAUGE:
ret = snprintf(buffer, avail, "%s%s:%f|%s\n", statsd_runtime.namespace ? statsd_runtime.namespace : "", metrics[i].name, metrics[i].value.f, mtype);
break;
default:
ret = 0;
break; // cannot happen thanks to previous switch()
}

if (ret < 0) {
ret = -1;
goto cleanup;
}
else if ((size_t)ret >= avail) {
// send what we have, reset buffer, restart - no need to add '\0': sendto will truncate anyway
sendto(statsd_runtime.sockfd, sbuffer, STATSD_UDP_BUFSIZE - avail, 0, (struct sockaddr *)&statsd_runtime.ai_addr, statsd_runtime.ai_addrlen);
buffer = sbuffer;
avail = STATSD_UDP_BUFSIZE;
goto restartbuffer;
}
buffer += ret;
avail -= (size_t)ret;
}

ret = 0;

cleanup:
// we only check for sendto() errors here
sent = sendto(statsd_runtime.sockfd, sbuffer, STATSD_UDP_BUFSIZE - avail, 0, (struct sockaddr *)&statsd_runtime.ai_addr, statsd_runtime.ai_addrlen);
if (-1 == sent)
perror("statsd");

return ret;
}

int statsd_count(__attribute__ ((unused)) statsd_link *link, char *stat, unsigned long value, __attribute__ ((unused)) float sample_rate)
{
struct statsd_metric m = {
.type = STATSD_UCOUNTER,
.name = stat,
.value.u = value,
};

return statsd_update(&m, 1);
}

int statsd_inc(statsd_link *link, char *stat, float sample_rate)
{
return statsd_count(link, stat, 1, sample_rate);
}

int statsd_gauge(__attribute__ ((unused)) statsd_link *link, char *stat, long value)
{
struct statsd_metric m = {
.type = STATSD_IGAUGE,
.name = stat,
.value.l = value,
};

return statsd_update(&m, 1);
}

int statsd_timing(__attribute__ ((unused)) statsd_link *link, char *stat, unsigned long ms)
{
struct statsd_metric m = {
.type = STATSD_TIMING,
.name = stat,
.value.u = ms,
};

return statsd_update(&m, 1);

}

int statsd_initialize(char *statsd_addr) {
char *addr;
char *port;
Expand All @@ -107,7 +357,7 @@ int statsd_initialize(char *statsd_addr) {
fprintf(stderr, "Using extended statsd namespace %s.%s\n", STATSD_NAMESPACE, Config.station_id);
la_vstring_append_sprintf(statsd_namespace, ".%s", Config.station_id);
}
statsd = statsd_init_with_namespace(addr, atoi(port), statsd_namespace->str);
statsd = statsd_init_with_namespace(addr, port, statsd_namespace->str);
la_vstring_destroy(statsd_namespace, true);
if(statsd == NULL) {
return -2;
Expand Down Expand Up @@ -176,7 +426,7 @@ void statsd_counter_increment(char *counter) {
statsd_inc(statsd, counter, 1.0);
}

void statsd_gauge_set(char *gauge, size_t value) {
void statsd_gauge_set(char *gauge, long value) {
if(statsd == NULL) {
return;
}
Expand Down

0 comments on commit f063a86

Please sign in to comment.