diff --git a/plugins/out_prometheus_exporter/prom.c b/plugins/out_prometheus_exporter/prom.c index e120f6c1761..77f60a026ee 100644 --- a/plugins/out_prometheus_exporter/prom.c +++ b/plugins/out_prometheus_exporter/prom.c @@ -175,44 +175,68 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk, int add_ts; size_t off = 0; flb_sds_t metrics; - cfl_sds_t text; + cfl_sds_t text = NULL; + cfl_sds_t tmp = NULL; struct cmt *cmt; struct prom_exporter *ctx = out_context; + int ok = CMT_DECODE_MSGPACK_SUCCESS; + + text = flb_sds_create_size(128); + if (text == NULL) { + flb_plg_debug(ctx->ins, "failed to allocate buffer for text representation of metrics"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } /* * A new set of metrics has arrived, perform decoding, apply labels, * convert to Prometheus text format and store the output in the * hash table for metrics. + * Note that metrics might be concatenated. So, we need to consume + * until the end of event_chunk. */ - ret = cmt_decode_msgpack_create(&cmt, - (char *) event_chunk->data, - event_chunk->size, &off); - if (ret != 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } + while ((ret = cmt_decode_msgpack_create(&cmt, + (char *) event_chunk->data, + event_chunk->size, &off)) == ok) { - /* append labels set by config */ - append_labels(ctx, cmt); + if (ret != 0) { + flb_sds_destroy(text); + FLB_OUTPUT_RETURN(FLB_ERROR); + } - /* add timestamp in the output format ? */ - if (ctx->add_timestamp) { - add_ts = CMT_TRUE; - } - else { - add_ts = CMT_FALSE; - } + /* append labels set by config */ + append_labels(ctx, cmt); + + /* add timestamp in the output format ? */ + if (ctx->add_timestamp) { + add_ts = CMT_TRUE; + } + else { + add_ts = CMT_FALSE; + } - /* convert to text representation */ - text = cmt_encode_prometheus_create(cmt, add_ts); - if (!text) { + /* convert to text representation */ + tmp = cmt_encode_prometheus_create(cmt, add_ts); + if (!tmp) { + cmt_destroy(cmt); + flb_sds_destroy(text); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + ret = flb_sds_cat_safe(&text, tmp, flb_sds_len(tmp)); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not concatenate text representant coming from: %s", + flb_input_name(ins)); + cmt_encode_prometheus_destroy(tmp); + flb_sds_destroy(text); + cmt_destroy(cmt); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + cmt_encode_prometheus_destroy(tmp); cmt_destroy(cmt); - FLB_OUTPUT_RETURN(FLB_ERROR); } - cmt_destroy(cmt); if (cfl_sds_len(text) == 0) { flb_plg_debug(ctx->ins, "context without metrics (empty)"); - cmt_encode_text_destroy(text); + flb_sds_destroy(text); FLB_OUTPUT_RETURN(FLB_OK); } @@ -221,11 +245,11 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk, if (ret == -1) { flb_plg_error(ctx->ins, "could not store metrics coming from: %s", flb_input_name(ins)); - cmt_encode_prometheus_destroy(text); + flb_sds_destroy(text); cmt_destroy(cmt); FLB_OUTPUT_RETURN(FLB_ERROR); } - cmt_encode_prometheus_destroy(text); + flb_sds_destroy(text); /* retrieve a full copy of all metrics */ metrics = hash_format_metrics(ctx);