diff --git a/plugins/filter_log_to_metrics/log_to_metrics.c b/plugins/filter_log_to_metrics/log_to_metrics.c index 147832b626c..5db50a4f33e 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.c +++ b/plugins/filter_log_to_metrics/log_to_metrics.c @@ -39,7 +39,7 @@ #include -static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] = +static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] = { "namespace_name", "pod_name", "container_name", @@ -77,9 +77,15 @@ static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx) if (ctx->cmt) { cmt_destroy(ctx->cmt); } - + delete_rules(ctx); + if (ctx->label_accessors != NULL) { + for (i = 0; i < MAX_LABEL_COUNT; i++) { + flb_free(ctx->label_accessors[i]); + } + flb_free(ctx->label_accessors); + } if (ctx->label_keys != NULL) { for (i = 0; i < MAX_LABEL_COUNT; i++) { flb_free(ctx->label_keys[i]); @@ -224,13 +230,17 @@ static inline int grep_filter_data(msgpack_object map, } static int set_labels(struct log_to_metrics_ctx *ctx, - char **label_keys, - int *label_counter, + char **label_accessors, + char **label_keys, + int *label_counter, struct flb_filter_instance *f_ins) { struct mk_list *head; - struct flb_kv *kv; + struct mk_list *split; + flb_sds_t tmp; + struct flb_kv *kv; + struct flb_split_entry *sentry; int counter = 0; int i; if (MAX_LABEL_COUNT < NUMBER_OF_KUBERNETES_LABELS){ @@ -238,8 +248,8 @@ static int set_labels(struct log_to_metrics_ctx *ctx, return -1; } if (ctx->kubernetes_mode){ - for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){ - snprintf(label_keys[i], MAX_LABEL_LENGTH - 1, "%s", + for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){ + snprintf(label_keys[i], MAX_LABEL_LENGTH - 1, "%s", kubernetes_label_keys[i]); } counter = NUMBER_OF_KUBERNETES_LABELS; @@ -249,14 +259,39 @@ static int set_labels(struct log_to_metrics_ctx *ctx, mk_list_foreach(head, &f_ins->properties) { kv = mk_list_entry(head, struct flb_kv, _head); - if (strcasecmp(kv->key, "label_field") != 0) { - continue; - } - if (counter >= MAX_LABEL_COUNT) { return MAX_LABEL_COUNT; } - snprintf(label_keys[counter++], MAX_LABEL_LENGTH - 1, "%s", kv->val); + + if (strcasecmp(kv->key, "label_field") == 0) { + snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val); + snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val); + counter++; + } + else if (strcasecmp(kv->key, "label") == 0) { + split = flb_utils_split(kv->val, ' ', 1); + if (mk_list_size(split) != 2) { + flb_plg_error(ctx->ins, "invalid label, expected name and key"); + flb_utils_split_free(split); + return -1; + } + + sentry = mk_list_entry_first(split, struct flb_split_entry, _head); + tmp = flb_sds_create_len(sentry->value, sentry->len); + snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", tmp); + flb_sds_destroy(tmp); + + sentry = mk_list_entry_last(split, struct flb_split_entry, _head); + tmp = flb_sds_create_len(sentry->value, sentry->len); + snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", tmp); + flb_sds_destroy(tmp); + counter++; + + flb_utils_split_free(split); + } + else { + continue; + } } *label_counter = counter; return counter; @@ -346,7 +381,7 @@ static int set_buckets(struct log_to_metrics_ctx *ctx, static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values, char kubernetes_label_values [NUMBER_OF_KUBERNETES_LABELS][MAX_LABEL_LENGTH], - char **label_keys, int label_counter, msgpack_object map) + char **label_accessors, int label_counter, msgpack_object map) { int label_iterator_start = 0; int i; @@ -365,14 +400,14 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values, if (kubernetes_label_keys[i] == NULL){ return -1; } - snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s", + snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s", kubernetes_label_values[i]); } label_iterator_start = NUMBER_OF_KUBERNETES_LABELS; } for (i = label_iterator_start; i < label_counter; i++){ - ra = flb_ra_create(label_keys[i], FLB_TRUE); + ra = flb_ra_create(label_accessors[i], FLB_TRUE); if (!ra) { flb_warn("invalid record accessor key, aborting"); break; @@ -383,15 +418,15 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values, /* Set value to empty string, so the value will be dropped in Cmetrics*/ label_values[i][0] = '\0'; } else if (rval->type == FLB_RA_STRING) { - snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s", + snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s", rval->val.string); } else if (rval->type == FLB_RA_FLOAT) { - snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%f", + snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%f", rval->val.f64); } else if (rval->type == FLB_RA_INT) { - snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%ld", + snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%ld", (long)rval->val.i64); } else { @@ -460,6 +495,11 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, return -1; } + ctx->label_accessors = NULL; + ctx->label_accessors = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *)); + for (i = 0; i < MAX_LABEL_COUNT; i++) { + ctx->label_accessors[i] = flb_malloc(MAX_LABEL_LENGTH * sizeof(char)); + } /* Set label keys */ ctx->label_keys = NULL; ctx->label_keys = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *)); @@ -468,7 +508,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } ctx->label_counter = NULL; ctx->label_counter = flb_malloc(sizeof(int)); - label_count = set_labels(ctx, ctx->label_keys, ctx->label_counter, f_ins); + label_count = set_labels(ctx, ctx->label_accessors, ctx->label_keys, ctx->label_counter, f_ins); if (label_count < 0){ log_to_metrics_destroy(ctx); return -1; @@ -482,7 +522,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } /* Check property metric mode */ - ctx->mode = 0; + ctx->mode = 0; tmp = (char *)flb_filter_get_property("metric_mode", f_ins); if (tmp != NULL) { if (strcasecmp(tmp, FLB_LOG_TO_METRICS_COUNTER_STR) == 0) { @@ -534,7 +574,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, log_to_metrics_destroy(ctx); return -1; } - snprintf(value_field, sizeof(value_field) - 1, "%s", + snprintf(value_field, sizeof(value_field) - 1, "%s", ctx->value_field); } @@ -563,12 +603,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, switch (ctx->mode) { case FLB_LOG_TO_METRICS_COUNTER: ctx->c = cmt_counter_create(ctx->cmt, "log_metric", "counter", - metric_name, metric_description, + metric_name, metric_description, label_count, ctx->label_keys); break; case FLB_LOG_TO_METRICS_GAUGE: ctx->g = cmt_gauge_create(ctx->cmt, "log_metric", "gauge", - metric_name, metric_description, + metric_name, metric_description, label_count, ctx->label_keys); break; case FLB_LOG_TO_METRICS_HISTOGRAM: @@ -619,7 +659,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } static int cb_log_to_metrics_filter(const void *data, size_t bytes, - const char *tag, int tag_len, + const char *tag, int tag_len, void **out_buf, size_t *out_size, struct flb_filter_instance *f_ins, struct flb_input_instance *i_ins, void *context, @@ -691,7 +731,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, break; } else { - snprintf(kubernetes_label_values[i], + snprintf(kubernetes_label_values[i], MAX_LABEL_LENGTH - 1, "%s", rval->val.string); } if (rval){ @@ -708,12 +748,12 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, /* Fill optional labels */ label_values = flb_malloc(MAX_LABEL_COUNT * sizeof(char *)); for (i = 0; i < MAX_LABEL_COUNT; i++) { - label_values[i] = flb_malloc(MAX_LABEL_LENGTH * + label_values[i] = flb_malloc(MAX_LABEL_LENGTH * sizeof(char)); } - - label_count = fill_labels(ctx, label_values, - kubernetes_label_values, ctx->label_keys, + + label_count = fill_labels(ctx, label_values, + kubernetes_label_values, ctx->label_accessors, *ctx->label_counter, map); if (label_count != *ctx->label_counter){ label_count = 0; @@ -723,7 +763,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, /* Calculating and setting metric depending on the mode */ switch (ctx->mode) { case FLB_LOG_TO_METRICS_COUNTER: - ret = cmt_counter_inc(ctx->c, ts, label_count, + ret = cmt_counter_inc(ctx->c, ts, label_count, label_values); break; @@ -750,11 +790,11 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, gauge_value = (double)rval->val.i64; } else { - flb_plg_error(f_ins, + flb_plg_error(f_ins, "cannot convert given value to metric"); break; } - + ret = cmt_gauge_set(ctx->g, ts, gauge_value, label_count, label_values); if (rval) { @@ -811,13 +851,13 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, log_to_metrics_destroy(ctx); return -1; } - + ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt); if (ret != 0) { flb_plg_error(ctx->ins, "could not append metrics"); } - + /* Cleanup */ msgpack_unpacked_destroy(&result); if (label_values != NULL){ @@ -850,38 +890,38 @@ static int cb_log_to_metrics_exit(void *data, struct flb_config *config) static struct flb_config_map config_map[] = { { - FLB_CONFIG_MAP_STR, "regex", NULL, + FLB_CONFIG_MAP_STR, "regex", NULL, FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, "Optional filter for records in which the content of KEY " "matches the regular expression." }, { - FLB_CONFIG_MAP_STR, "exclude", NULL, + FLB_CONFIG_MAP_STR, "exclude", NULL, FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, "Optional filter for records in which the content of KEY " "does not matches the regular expression." }, { - FLB_CONFIG_MAP_STR, "metric_mode", "counter", + FLB_CONFIG_MAP_STR, "metric_mode", "counter", FLB_FALSE, FLB_TRUE, offsetof(struct log_to_metrics_ctx, mode), "Mode selector. Values counter, gauge," " or histogram. Summary is not supported" }, { - FLB_CONFIG_MAP_STR, "value_field", NULL, + FLB_CONFIG_MAP_STR, "value_field", NULL, FLB_FALSE, FLB_TRUE, offsetof(struct log_to_metrics_ctx, value_field), "Numeric field to use for gauge or histogram" }, { - FLB_CONFIG_MAP_STR, "metric_name", NULL, + FLB_CONFIG_MAP_STR, "metric_name", NULL, FLB_FALSE, FLB_TRUE, offsetof(struct log_to_metrics_ctx, metric_name), "Name of metric" }, { - FLB_CONFIG_MAP_STR, "metric_description", NULL, + FLB_CONFIG_MAP_STR, "metric_description", NULL, FLB_FALSE, FLB_TRUE, offsetof(struct log_to_metrics_ctx, metric_description), "Help text for metric" @@ -892,7 +932,12 @@ static struct flb_config_map config_map[] = { "Enable kubernetes log metric fields" }, { - FLB_CONFIG_MAP_STR, "label_field", NULL, + FLB_CONFIG_MAP_STR, "label", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Specify message field that should be included in the metric" + }, + { + FLB_CONFIG_MAP_STR, "label_field", NULL, FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, "Specify message field that should be included in the metric" }, diff --git a/plugins/filter_log_to_metrics/log_to_metrics.h b/plugins/filter_log_to_metrics/log_to_metrics.h index b2ba4949064..6edb5ab305e 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.h +++ b/plugins/filter_log_to_metrics/log_to_metrics.h @@ -63,6 +63,7 @@ struct log_to_metrics_ctx struct cmt_gauge *g; struct cmt_histogram *h; struct cmt_histogram_buckets *histogram_buckets; + char **label_accessors; char **label_keys; int *label_counter; bool kubernetes_mode; diff --git a/tests/runtime/filter_log_to_metrics.c b/tests/runtime/filter_log_to_metrics.c index ccd564847bd..b8c8dba1dc2 100644 --- a/tests/runtime/filter_log_to_metrics.c +++ b/tests/runtime/filter_log_to_metrics.c @@ -62,6 +62,7 @@ void flb_test_log_to_metrics_gauge(void); void flb_test_log_to_metrics_histogram(void); void flb_test_log_to_metrics_reg(void); void flb_test_log_to_metrics_empty_label_keys_regex(void); +void flb_test_log_to_metrics_label(void); /* Test data */ @@ -122,6 +123,7 @@ TEST_LIST = { {"histogram", flb_test_log_to_metrics_histogram }, {"counter_regex", flb_test_log_to_metrics_reg }, {"regex_empty_label_keys", flb_test_log_to_metrics_empty_label_keys_regex }, + {"label", flb_test_log_to_metrics_label }, {NULL, NULL} }; @@ -138,7 +140,7 @@ int callback_test(void* data, size_t size, void* cb_data) flb_debug("[test_filter_log_to_metrics] received message: %s", (char*)data); pthread_mutex_lock(&result_mutex); strncat(output, data, size); - data_size = size; + data_size = size; pthread_mutex_unlock(&result_mutex); } flb_free(data); @@ -199,7 +201,7 @@ void flb_test_log_to_metrics_counter_k8s(void) "\"def456\",\"red\",\"right\"]"; ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -260,7 +262,7 @@ void flb_test_log_to_metrics_counter(void) const char *expected = "\"value\":5.0,\"labels\":[\"red\",\"right\"]"; ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -326,7 +328,7 @@ void flb_test_log_to_metrics_counter_k8s_two_tuples(void) ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -394,7 +396,7 @@ void flb_test_log_to_metrics_gauge(void) const char *expected = "\"value\":20.0,\"labels\":[\"red\",\"right\"]"; ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -522,7 +524,7 @@ void flb_test_log_to_metrics_reg(void) ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -587,7 +589,7 @@ void flb_test_log_to_metrics_empty_label_keys_regex(void) ctx = flb_create(); - flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", "error", NULL); cb_data.cb = callback_test; @@ -631,3 +633,66 @@ void flb_test_log_to_metrics_empty_label_keys_regex(void) filter_test_destroy(ctx); } + +void flb_test_log_to_metrics_label(void) +{ + int ret; + int i; + flb_ctx_t *ctx; + int in_ffd; + int filter_ffd; + int out_ffd; + char *result = NULL; + struct flb_lib_out_cb cb_data; + char *input = JSON_MSG1; + char finalString[32768] = ""; + const char *expected_label_name = ",\"labels\":[\"pod_name\"],"; + const char *expected_label_value = "\"value\":2.0,\"labels\":[\"testpod\"]"; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.200000000", "Grace", "1", "Log_Level", + "error", NULL); + + cb_data.cb = callback_test; + cb_data.data = NULL; + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + filter_ffd = flb_filter(ctx, (char *) "log_to_metrics", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "Tag", "test_metric", + "metric_mode", "counter", + "metric_name", "test", + "metric_description", "Counts messages", + "kubernetes_mode", "off", + "label", "pod_name $kubernetes['pod_name']", + NULL); + + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "*", + "format", "json", + NULL); + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + for (i = 0; i < 2; i++){ + flb_lib_push(ctx, in_ffd, input, strlen(input)); + } + wait_with_timeout(500, finalString); + result = strstr(finalString, expected_label_name); + if (!TEST_CHECK(result != NULL)) { + TEST_MSG("expected substring:\n%s\ngot:\n%s\n", expected_label_name, finalString); + } + result = strstr(finalString, expected_label_value); + if (!TEST_CHECK(result != NULL)) { + TEST_MSG("expected substring:\n%s\ngot:\n%s\n", expected_label_value, finalString); + } + filter_test_destroy(ctx); + +}