From 1d4708f525fd7b74edcdc8ecef3217bcba026343 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Wed, 18 Dec 2024 21:52:10 +0100 Subject: [PATCH] processor: added profile event support Signed-off-by: Leonardo Alminana --- include/fluent-bit/flb_processor.h | 10 +++++- src/flb_processor.c | 56 ++++++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index bc7a9338371..5842fb37db7 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -28,6 +28,7 @@ #include #include +#include /* Processor plugin result values */ #define FLB_PROCESSOR_SUCCESS 0 @@ -37,6 +38,7 @@ #define FLB_PROCESSOR_LOGS 1 #define FLB_PROCESSOR_METRICS 2 #define FLB_PROCESSOR_TRACES 4 +#define FLB_PROCESSOR_PROFILES 5 /* Type of processor unit: 'pipeline filter' or 'native unit' */ #define FLB_PROCESSOR_UNIT_NATIVE 0 @@ -93,7 +95,7 @@ struct flb_processor_unit { */ struct mk_list unused_list; - /* link to struct flb_processor->(logs, metrics, traces) list */ + /* link to struct flb_processor->(logs, metrics, traces, profiles) list */ struct mk_list _head; /* link to parent processor */ @@ -110,6 +112,7 @@ struct flb_processor { struct mk_list logs; struct mk_list metrics; struct mk_list traces; + struct mk_list profiles; size_t stage_count; /* @@ -155,6 +158,11 @@ struct flb_processor_plugin { const char *, int); + int (*cb_process_profiles) (struct flb_processor_instance *, + struct cprof *, + const char *, + int); + int (*cb_exit) (struct flb_processor_instance *, void *); /* Notification: this callback will be invoked anytime a notification is received*/ diff --git a/src/flb_processor.c b/src/flb_processor.c index b04b04ddfc1..9322e297b1e 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -101,7 +101,7 @@ static int release_lock(pthread_mutex_t *lock, /* * A processor creates a chain of processing units for different telemetry data - * types such as logs, metrics and traces. + * types such as logs, metrics, traces and profiles. * * From a design perspective, a Processor can be run independently from inputs, outputs * or unit tests directly. @@ -129,6 +129,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config, mk_list_init(&proc->logs); mk_list_init(&proc->metrics); mk_list_init(&proc->traces); + mk_list_init(&proc->profiles); return proc; } @@ -267,6 +268,9 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, else if (event_type == FLB_PROCESSOR_TRACES) { mk_list_add(&pu->_head, &proc->traces); } + else if (event_type == FLB_PROCESSOR_PROFILES) { + mk_list_add(&pu->_head, &proc->profiles); + } pu->stage = proc->stage_count; proc->stage_count++; @@ -406,6 +410,16 @@ int flb_processor_init(struct flb_processor *proc) count++; } + mk_list_foreach(head, &proc->profiles) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + ret = flb_processor_unit_init(pu); + + if (ret == -1) { + return -1; + } + count++; + } + if (count > 0) { proc->is_active = FLB_TRUE; } @@ -426,7 +440,8 @@ int flb_processor_is_active(struct flb_processor *proc) /* * This function will run all the processor units for the given tag and data, note * that depending of the 'type', 'data' can reference a msgpack for logs, a CMetrics - * context for metrics or a 'CTraces' context for traces. + * context for metrics, a 'CTraces' context for traces or a 'CProfiles' context for + * profiles. */ int flb_processor_run(struct flb_processor *proc, size_t starting_stage, @@ -465,6 +480,9 @@ int flb_processor_run(struct flb_processor *proc, else if (type == FLB_PROCESSOR_TRACES) { list = &proc->traces; } + else if (type == FLB_PROCESSOR_PROFILES) { + list = &proc->profiles; + } #ifdef FLB_HAVE_METRICS /* timestamp */ @@ -743,6 +761,22 @@ int flb_processor_run(struct flb_processor *proc, } } } + else if (type == FLB_PROCESSOR_PROFILES) { + if (p_ins->p->cb_process_profiles != NULL) { + ret = p_ins->p->cb_process_profiles(p_ins, + (struct cprof *) cur_buf, + tag, + tag_len); + + if (ret != FLB_PROCESSOR_SUCCESS) { + release_lock(&pu->lock, + FLB_PROCESSOR_LOCK_RETRY_LIMIT, + FLB_PROCESSOR_LOCK_RETRY_DELAY); + + return -1; + } + } + } } release_lock(&pu->lock, @@ -785,6 +819,13 @@ void flb_processor_destroy(struct flb_processor *proc) mk_list_del(&pu->_head); flb_processor_unit_destroy(pu); } + + mk_list_foreach_safe(head, tmp, &proc->profiles) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + mk_list_del(&pu->_head); + flb_processor_unit_destroy(pu); + } + flb_free(proc); } @@ -912,6 +953,17 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str } } + /* profiles */ + val = cfl_kvlist_fetch(g->properties, "profiles"); + if (val) { + ret = load_from_config_format_group(proc, FLB_PROCESSOR_PROFILES, val); + + if (ret == -1) { + flb_error("failed to load 'profiles' processors"); + return -1; + } + } + return 0; }