Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_log_to_metrics: Add timer callback for emitting metrics #9251

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 97 additions & 7 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
return label_counter;
}

/* Timer callback to inject metrics into the pipeline */
static void cb_send_metric_chunk(struct flb_config *config, void *data)
{
int ret;
struct log_to_metrics_ctx *ctx = data;

/* Check that metric context is not empty */
if (ctx->cmt == NULL || ctx->input_ins == NULL) {
return;
}

if (ctx->new_data) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
}
}

/* Check if we are shutting down. If so, stop our timer */
if (config->is_shutting_down) {
if(ctx->timer && ctx->timer->active) {
flb_plg_debug(ctx->ins, "Stopping callback timer");
flb_sched_timer_cb_disable(ctx->timer);
}
}
ctx->new_data = FLB_FALSE;
}

static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
Expand All @@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
char metric_subsystem[MAX_METRIC_LENGTH];
char value_field[MAX_METRIC_LENGTH];
struct flb_input_instance *input_ins;
struct flb_sched *sched;

int i;
/* Create context */
Expand Down Expand Up @@ -673,7 +703,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}

ret = flb_input_name_exists(ctx->emitter_name, config);
if (ret == FLB_TRUE) {
if (ret) {
flb_plg_error(f_ins, "emitter_name '%s' already exists",
ctx->emitter_name);
flb_sds_destroy(ctx->emitter_name);
Expand Down Expand Up @@ -729,6 +759,43 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}
ctx->input_ins = input_ins;


if (ctx->flush_interval_sec <= 0) {
ctx->flush_interval_sec = strtol(DEFAULT_INTERVAL_SEC, NULL, 10);
}
if (ctx->flush_interval_nsec <= 0) {
ctx->flush_interval_nsec = strtol(DEFAULT_INTERVAL_NSEC, NULL, 10);
}
if (ctx->flush_interval_sec == 0 && ctx->flush_interval_nsec == 0) {
flb_plg_debug(ctx->ins, "Interval is set to 0, will not use timer and "
"send metrics immediately");
ctx->timer_mode = FLB_FALSE;
return 0;
}

/* Initialize timer for scheduled metric updates */
sched = flb_sched_ctx_get();
if(sched == 0) {
flb_plg_error(f_ins, "could not get scheduler context");
log_to_metrics_destroy(ctx);
return -1;
}
/* Convert flush_interval_sec and flush_interval_nsec to milliseconds */
ctx->timer_interval = (ctx->flush_interval_sec * 1000) +
(ctx->flush_interval_nsec / 1000000);
flb_plg_debug(ctx->ins,
"Creating metric timer with frequency %d ms",
ctx->timer_interval);

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_interval, cb_send_metric_chunk,
ctx, &ctx->timer);
if (ret < 0) {
flb_plg_error(f_ins, "could not create timer callback");
log_to_metrics_destroy(ctx);
return -1;
}
ctx->timer_mode = FLB_TRUE;
return 0;
}

Expand Down Expand Up @@ -920,9 +987,17 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
return -1;
}

ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
if (!ctx->timer_mode) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);

if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics. "
"Please consider to use flush_interval_sec and flush_interval_nsec");
}
}
else {
ctx->new_data = FLB_TRUE;
}

/* Cleanup */
Expand All @@ -941,6 +1016,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
}
}


if (ctx->discard_logs) {
*out_buf = NULL;
*out_size = 0;
Expand All @@ -958,7 +1034,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
{
struct log_to_metrics_ctx *ctx = data;

if(ctx->timer != NULL) {
flb_plg_debug(ctx->ins, "Destroying callback timer");
flb_sched_timer_destroy(ctx->timer);
}
return log_to_metrics_destroy(ctx);
}

Expand Down Expand Up @@ -1037,13 +1116,24 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_name),
"Name of the emitter (advanced users)"
},

{
FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MEM_BUF_LIMIT_DEFAULT,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_mem_buf_limit),
"set a buffer limit to restrict memory usage of metrics emitter"
},

{
FLB_CONFIG_MAP_INT, "flush_interval_sec", DEFAULT_INTERVAL_SEC,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_sec),
"Set the timer interval for metrics emission. If flush_interval_sec and "
"flush_interval_nsec are set to 0, the timer is disabled (default)."
},
{
FLB_CONFIG_MAP_INT, "flush_interval_nsec", DEFAULT_INTERVAL_NSEC,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_nsec),
"Set the timer interval (subseconds) for metrics emission. "
"If flush_interval_sec and flush_interval_nsec are set to 0, the timer is disabled "
"(default). Final precision is milliseconds."
},
{
FLB_CONFIG_MAP_BOOL, "discard_logs", "false",
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs),
Expand Down
10 changes: 8 additions & 2 deletions plugins/filter_log_to_metrics/log_to_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@

#define FLB_MEM_BUF_LIMIT_DEFAULT "10M"
#define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric"

#define DEFAULT_INTERVAL_SEC "0"
#define DEFAULT_INTERVAL_NSEC "0"

struct log_to_metrics_ctx {
struct mk_list rules;
struct flb_filter_instance *ins;
struct cmt *cmt;

struct flb_input_instance *input_ins;

char **label_keys;
Expand All @@ -83,6 +83,12 @@ struct log_to_metrics_ctx {
flb_sds_t tag;
flb_sds_t emitter_name;
size_t emitter_mem_buf_limit;
long flush_interval_sec;
long flush_interval_nsec;
int timer_interval;
int timer_mode;
struct flb_sched_timer *timer;
int new_data;
};

struct grep_rule
Expand Down
Loading