Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_log_to_metrics: Add setting "label" #7739

Merged
merged 4 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 86 additions & 41 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <sys/types.h>


static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] =
static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] =
{ "namespace_name",
"pod_name",
"container_name",
Expand Down Expand Up @@ -77,9 +77,15 @@ static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx)
if (ctx->cmt) {
cmt_destroy(ctx->cmt);
}

delete_rules(ctx);

if (ctx->label_accessors != NULL) {
for (i = 0; i < MAX_LABEL_COUNT; i++) {
flb_free(ctx->label_accessors[i]);
}
flb_free(ctx->label_accessors);
}
if (ctx->label_keys != NULL) {
for (i = 0; i < MAX_LABEL_COUNT; i++) {
flb_free(ctx->label_keys[i]);
Expand Down Expand Up @@ -224,22 +230,26 @@ static inline int grep_filter_data(msgpack_object map,
}

static int set_labels(struct log_to_metrics_ctx *ctx,
char **label_keys,
int *label_counter,
char **label_accessors,
char **label_keys,
int *label_counter,
struct flb_filter_instance *f_ins)
{

struct mk_list *head;
struct flb_kv *kv;
struct mk_list *split;
flb_sds_t tmp;
struct flb_kv *kv;
struct flb_split_entry *sentry;
int counter = 0;
int i;
if (MAX_LABEL_COUNT < NUMBER_OF_KUBERNETES_LABELS){
flb_errno();
return -1;
}
if (ctx->kubernetes_mode){
for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
snprintf(label_keys[i], MAX_LABEL_LENGTH - 1, "%s",
for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
snprintf(label_keys[i], MAX_LABEL_LENGTH - 1, "%s",
kubernetes_label_keys[i]);
}
counter = NUMBER_OF_KUBERNETES_LABELS;
Expand All @@ -249,14 +259,39 @@ static int set_labels(struct log_to_metrics_ctx *ctx,
mk_list_foreach(head, &f_ins->properties) {
kv = mk_list_entry(head, struct flb_kv, _head);

if (strcasecmp(kv->key, "label_field") != 0) {
continue;
}

if (counter >= MAX_LABEL_COUNT) {
return MAX_LABEL_COUNT;
}
snprintf(label_keys[counter++], MAX_LABEL_LENGTH - 1, "%s", kv->val);

if (strcasecmp(kv->key, "label_field") == 0) {
snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val);
snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val);
counter++;
}
else if (strcasecmp(kv->key, "label") == 0) {
split = flb_utils_split(kv->val, ' ', 1);
if (mk_list_size(split) != 2) {
flb_plg_error(ctx->ins, "invalid label, expected name and key");
flb_utils_split_free(split);
return -1;
}

sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len);
snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", tmp);
flb_sds_destroy(tmp);

sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len);
snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", tmp);
flb_sds_destroy(tmp);
counter++;

flb_utils_split_free(split);
}
else {
continue;
}
}
*label_counter = counter;
return counter;
Expand Down Expand Up @@ -346,7 +381,7 @@ static int set_buckets(struct log_to_metrics_ctx *ctx,
static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
char kubernetes_label_values
[NUMBER_OF_KUBERNETES_LABELS][MAX_LABEL_LENGTH],
char **label_keys, int label_counter, msgpack_object map)
char **label_accessors, int label_counter, msgpack_object map)
{
int label_iterator_start = 0;
int i;
Expand All @@ -365,14 +400,14 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
if (kubernetes_label_keys[i] == NULL){
return -1;
}
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
kubernetes_label_values[i]);
}
label_iterator_start = NUMBER_OF_KUBERNETES_LABELS;
}

for (i = label_iterator_start; i < label_counter; i++){
ra = flb_ra_create(label_keys[i], FLB_TRUE);
ra = flb_ra_create(label_accessors[i], FLB_TRUE);
if (!ra) {
flb_warn("invalid record accessor key, aborting");
break;
Expand All @@ -383,15 +418,15 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
/* Set value to empty string, so the value will be dropped in Cmetrics*/
label_values[i][0] = '\0';
} else if (rval->type == FLB_RA_STRING) {
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
rval->val.string);
}
else if (rval->type == FLB_RA_FLOAT) {
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%f",
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%f",
rval->val.f64);
}
else if (rval->type == FLB_RA_INT) {
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%ld",
snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%ld",
(long)rval->val.i64);
}
else {
Expand Down Expand Up @@ -460,6 +495,11 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
return -1;
}

ctx->label_accessors = NULL;
ctx->label_accessors = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
for (i = 0; i < MAX_LABEL_COUNT; i++) {
ctx->label_accessors[i] = flb_malloc(MAX_LABEL_LENGTH * sizeof(char));
}
/* Set label keys */
ctx->label_keys = NULL;
ctx->label_keys = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
Expand All @@ -468,7 +508,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}
ctx->label_counter = NULL;
ctx->label_counter = flb_malloc(sizeof(int));
label_count = set_labels(ctx, ctx->label_keys, ctx->label_counter, f_ins);
label_count = set_labels(ctx, ctx->label_accessors, ctx->label_keys, ctx->label_counter, f_ins);
if (label_count < 0){
log_to_metrics_destroy(ctx);
return -1;
Expand All @@ -482,7 +522,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}

/* Check property metric mode */
ctx->mode = 0;
ctx->mode = 0;
tmp = (char *)flb_filter_get_property("metric_mode", f_ins);
if (tmp != NULL) {
if (strcasecmp(tmp, FLB_LOG_TO_METRICS_COUNTER_STR) == 0) {
Expand Down Expand Up @@ -534,7 +574,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
log_to_metrics_destroy(ctx);
return -1;
}
snprintf(value_field, sizeof(value_field) - 1, "%s",
snprintf(value_field, sizeof(value_field) - 1, "%s",
ctx->value_field);
}

Expand Down Expand Up @@ -563,12 +603,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
switch (ctx->mode) {
case FLB_LOG_TO_METRICS_COUNTER:
ctx->c = cmt_counter_create(ctx->cmt, "log_metric", "counter",
metric_name, metric_description,
metric_name, metric_description,
label_count, ctx->label_keys);
break;
case FLB_LOG_TO_METRICS_GAUGE:
ctx->g = cmt_gauge_create(ctx->cmt, "log_metric", "gauge",
metric_name, metric_description,
metric_name, metric_description,
label_count, ctx->label_keys);
break;
case FLB_LOG_TO_METRICS_HISTOGRAM:
Expand Down Expand Up @@ -619,7 +659,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}

static int cb_log_to_metrics_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
const char *tag, int tag_len,
void **out_buf, size_t *out_size,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins, void *context,
Expand Down Expand Up @@ -691,7 +731,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
break;
}
else {
snprintf(kubernetes_label_values[i],
snprintf(kubernetes_label_values[i],
MAX_LABEL_LENGTH - 1, "%s", rval->val.string);
}
if (rval){
Expand All @@ -708,12 +748,12 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
/* Fill optional labels */
label_values = flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
for (i = 0; i < MAX_LABEL_COUNT; i++) {
label_values[i] = flb_malloc(MAX_LABEL_LENGTH *
label_values[i] = flb_malloc(MAX_LABEL_LENGTH *
sizeof(char));
}
label_count = fill_labels(ctx, label_values,
kubernetes_label_values, ctx->label_keys,

label_count = fill_labels(ctx, label_values,
kubernetes_label_values, ctx->label_accessors,
*ctx->label_counter, map);
if (label_count != *ctx->label_counter){
label_count = 0;
Expand All @@ -723,7 +763,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
/* Calculating and setting metric depending on the mode */
switch (ctx->mode) {
case FLB_LOG_TO_METRICS_COUNTER:
ret = cmt_counter_inc(ctx->c, ts, label_count,
ret = cmt_counter_inc(ctx->c, ts, label_count,
label_values);
break;

Expand All @@ -750,11 +790,11 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
gauge_value = (double)rval->val.i64;
}
else {
flb_plg_error(f_ins,
flb_plg_error(f_ins,
"cannot convert given value to metric");
break;
}

ret = cmt_gauge_set(ctx->g, ts, gauge_value,
label_count, label_values);
if (rval) {
Expand Down Expand Up @@ -811,13 +851,13 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
log_to_metrics_destroy(ctx);
return -1;
}

ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);

if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
}

/* Cleanup */
msgpack_unpacked_destroy(&result);
if (label_values != NULL){
Expand Down Expand Up @@ -850,38 +890,38 @@ static int cb_log_to_metrics_exit(void *data, struct flb_config *config)

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "regex", NULL,
FLB_CONFIG_MAP_STR, "regex", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Optional filter for records in which the content of KEY "
"matches the regular expression."
},
{
FLB_CONFIG_MAP_STR, "exclude", NULL,
FLB_CONFIG_MAP_STR, "exclude", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Optional filter for records in which the content of KEY "
"does not matches the regular expression."
},
{
FLB_CONFIG_MAP_STR, "metric_mode", "counter",
FLB_CONFIG_MAP_STR, "metric_mode", "counter",
FLB_FALSE, FLB_TRUE,
offsetof(struct log_to_metrics_ctx, mode),
"Mode selector. Values counter, gauge,"
" or histogram. Summary is not supported"
},
{
FLB_CONFIG_MAP_STR, "value_field", NULL,
FLB_CONFIG_MAP_STR, "value_field", NULL,
FLB_FALSE, FLB_TRUE,
offsetof(struct log_to_metrics_ctx, value_field),
"Numeric field to use for gauge or histogram"
},
{
FLB_CONFIG_MAP_STR, "metric_name", NULL,
FLB_CONFIG_MAP_STR, "metric_name", NULL,
FLB_FALSE, FLB_TRUE,
offsetof(struct log_to_metrics_ctx, metric_name),
"Name of metric"
},
{
FLB_CONFIG_MAP_STR, "metric_description", NULL,
FLB_CONFIG_MAP_STR, "metric_description", NULL,
FLB_FALSE, FLB_TRUE,
offsetof(struct log_to_metrics_ctx, metric_description),
"Help text for metric"
Expand All @@ -892,7 +932,12 @@ static struct flb_config_map config_map[] = {
"Enable kubernetes log metric fields"
},
{
FLB_CONFIG_MAP_STR, "label_field", NULL,
FLB_CONFIG_MAP_STR, "label", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Specify message field that should be included in the metric"
},
{
FLB_CONFIG_MAP_STR, "label_field", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Specify message field that should be included in the metric"
},
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_log_to_metrics/log_to_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ struct log_to_metrics_ctx
struct cmt_gauge *g;
struct cmt_histogram *h;
struct cmt_histogram_buckets *histogram_buckets;
char **label_accessors;
char **label_keys;
int *label_counter;
bool kubernetes_mode;
Expand Down
Loading
Loading