Skip to content

Commit

Permalink
filter_log_to_metrics: Add timer callback for emitting metrics
Browse files Browse the repository at this point in the history
This commit will change the log_to_metrics filter to use a timer based
metric inject and not directly inject metrics on every incoming log
record anymore. This will lower the overall load and memory consumption
 especially in high-volume and high-cardinality situations.

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work committed Aug 20, 2024
1 parent 8a1d830 commit 4d9ee65
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
69 changes: 63 additions & 6 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
return label_counter;
}

/* Timer callback to inject metrics into the pipeline */
static void cb_send_metric_chunk(struct flb_config *config, void *data)
{
int ret;
struct log_to_metrics_ctx *ctx = data;

/* Check that metric context is not empty */
if (ctx->cmt == NULL || ctx->input_ins == NULL) {
return;
}

if (ctx->new_data == FLB_TRUE) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
}
}

/* Check if we are shutting down. If so, stop our timer */
if (config->is_shutting_down) {
if(ctx->timer && ctx->timer->active) {
flb_plg_debug(ctx->ins, "Stopping callback timer");
flb_sched_timer_cb_disable(ctx->timer);
}
}
ctx->new_data = FLB_FALSE;
}

static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
Expand All @@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
char metric_subsystem[MAX_METRIC_LENGTH];
char value_field[MAX_METRIC_LENGTH];
struct flb_input_instance *input_ins;
struct flb_sched *sched;

int i;
/* Create context */
Expand Down Expand Up @@ -729,6 +759,26 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}
ctx->input_ins = input_ins;


/* Initialize timer for scheduled metric updates */
sched = flb_sched_ctx_get();
if(sched < 0) {
flb_plg_error(f_ins, "could not create scheduler");
log_to_metrics_destroy(ctx);
return -1;
}
flb_plg_debug(ctx->ins,
"Creating metric timer with frequency %dms",
ctx->timer_interval);
ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_interval, cb_send_metric_chunk,
ctx, &ctx->timer);
if (ret < 0) {
flb_plg_error(f_ins, "could not create timer callback");
log_to_metrics_destroy(ctx);
return -1;
}

return 0;
}

Expand Down Expand Up @@ -833,6 +883,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
case FLB_LOG_TO_METRICS_COUNTER:
ret = cmt_counter_inc(ctx->c, ts, label_count,
label_values);
ctx->new_data = FLB_TRUE;
break;

case FLB_LOG_TO_METRICS_GAUGE:
Expand Down Expand Up @@ -873,6 +924,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
flb_ra_destroy(ra);
ra = NULL;
}
ctx->new_data = FLB_TRUE;
break;

case FLB_LOG_TO_METRICS_HISTOGRAM:
Expand Down Expand Up @@ -913,18 +965,14 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
flb_ra_destroy(ra);
ra = NULL;
}
ctx->new_data = FLB_TRUE;
break;
default:
flb_plg_error(f_ins, "unsupported mode");
log_to_metrics_destroy(ctx);
return -1;
}

ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
}

/* Cleanup */
msgpack_unpacked_destroy(&result);
if (label_values != NULL){
Expand Down Expand Up @@ -958,7 +1006,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
{
struct log_to_metrics_ctx *ctx = data;

if(ctx->timer) {
flb_plg_debug(ctx->ins, "Destroying callback timer");
flb_sched_timer_destroy(ctx->timer);
}
return log_to_metrics_destroy(ctx);
}

Expand Down Expand Up @@ -1044,6 +1095,12 @@ static struct flb_config_map config_map[] = {
"set a buffer limit to restrict memory usage of metrics emitter"
},

{
FLB_CONFIG_MAP_INT, "timer_interval_ms", DEFAULT_TIMER_INTERVAL,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, timer_interval),
"set timer interval in milliseconds for sending metrics"
},

{
FLB_CONFIG_MAP_BOOL, "discard_logs", "false",
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs),
Expand Down
5 changes: 4 additions & 1 deletion plugins/filter_log_to_metrics/log_to_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@

#define FLB_MEM_BUF_LIMIT_DEFAULT "10M"
#define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric"
#define DEFAULT_TIMER_INTERVAL "250"


struct log_to_metrics_ctx {
struct mk_list rules;
struct flb_filter_instance *ins;
struct cmt *cmt;

struct flb_input_instance *input_ins;

char **label_keys;
Expand All @@ -83,6 +83,9 @@ struct log_to_metrics_ctx {
flb_sds_t tag;
flb_sds_t emitter_name;
size_t emitter_mem_buf_limit;
int timer_interval;
struct flb_sched_timer *timer;
int new_data;
};

struct grep_rule
Expand Down

0 comments on commit 4d9ee65

Please sign in to comment.