Skip to content

Commit

Permalink
processor: added profile event support
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored and edsiper committed Jan 8, 2025
1 parent dd05fdc commit 1d4708f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
10 changes: 9 additions & 1 deletion include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <ctraces/ctraces.h>
#include <cmetrics/cmetrics.h>
#include <cprofiles/cprofiles.h>

/* Processor plugin result values */
#define FLB_PROCESSOR_SUCCESS 0
Expand All @@ -37,6 +38,7 @@
#define FLB_PROCESSOR_LOGS 1
#define FLB_PROCESSOR_METRICS 2
#define FLB_PROCESSOR_TRACES 4
#define FLB_PROCESSOR_PROFILES 5

/* Type of processor unit: 'pipeline filter' or 'native unit' */
#define FLB_PROCESSOR_UNIT_NATIVE 0
Expand Down Expand Up @@ -93,7 +95,7 @@ struct flb_processor_unit {
*/
struct mk_list unused_list;

/* link to struct flb_processor->(logs, metrics, traces) list */
/* link to struct flb_processor->(logs, metrics, traces, profiles) list */
struct mk_list _head;

/* link to parent processor */
Expand All @@ -110,6 +112,7 @@ struct flb_processor {
struct mk_list logs;
struct mk_list metrics;
struct mk_list traces;
struct mk_list profiles;

size_t stage_count;
/*
Expand Down Expand Up @@ -155,6 +158,11 @@ struct flb_processor_plugin {
const char *,
int);

int (*cb_process_profiles) (struct flb_processor_instance *,
struct cprof *,
const char *,
int);

int (*cb_exit) (struct flb_processor_instance *, void *);

/* Notification: this callback will be invoked anytime a notification is received*/
Expand Down
56 changes: 54 additions & 2 deletions src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ static int release_lock(pthread_mutex_t *lock,

/*
* A processor creates a chain of processing units for different telemetry data
* types such as logs, metrics and traces.
* types such as logs, metrics, traces and profiles.
*
* From a design perspective, a Processor can be run independently from inputs, outputs
* or unit tests directly.
Expand Down Expand Up @@ -129,6 +129,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config,
mk_list_init(&proc->logs);
mk_list_init(&proc->metrics);
mk_list_init(&proc->traces);
mk_list_init(&proc->profiles);

return proc;
}
Expand Down Expand Up @@ -267,6 +268,9 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
else if (event_type == FLB_PROCESSOR_TRACES) {
mk_list_add(&pu->_head, &proc->traces);
}
else if (event_type == FLB_PROCESSOR_PROFILES) {
mk_list_add(&pu->_head, &proc->profiles);
}

pu->stage = proc->stage_count;
proc->stage_count++;
Expand Down Expand Up @@ -406,6 +410,16 @@ int flb_processor_init(struct flb_processor *proc)
count++;
}

mk_list_foreach(head, &proc->profiles) {
pu = mk_list_entry(head, struct flb_processor_unit, _head);
ret = flb_processor_unit_init(pu);

if (ret == -1) {
return -1;
}
count++;
}

if (count > 0) {
proc->is_active = FLB_TRUE;
}
Expand All @@ -426,7 +440,8 @@ int flb_processor_is_active(struct flb_processor *proc)
/*
* This function will run all the processor units for the given tag and data, note
* that depending of the 'type', 'data' can reference a msgpack for logs, a CMetrics
* context for metrics or a 'CTraces' context for traces.
* context for metrics, a 'CTraces' context for traces or a 'CProfiles' context for
* profiles.
*/
int flb_processor_run(struct flb_processor *proc,
size_t starting_stage,
Expand Down Expand Up @@ -465,6 +480,9 @@ int flb_processor_run(struct flb_processor *proc,
else if (type == FLB_PROCESSOR_TRACES) {
list = &proc->traces;
}
else if (type == FLB_PROCESSOR_PROFILES) {
list = &proc->profiles;
}

#ifdef FLB_HAVE_METRICS
/* timestamp */
Expand Down Expand Up @@ -743,6 +761,22 @@ int flb_processor_run(struct flb_processor *proc,
}
}
}
else if (type == FLB_PROCESSOR_PROFILES) {
if (p_ins->p->cb_process_profiles != NULL) {
ret = p_ins->p->cb_process_profiles(p_ins,
(struct cprof *) cur_buf,
tag,
tag_len);

if (ret != FLB_PROCESSOR_SUCCESS) {
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);

return -1;
}
}
}
}

release_lock(&pu->lock,
Expand Down Expand Up @@ -785,6 +819,13 @@ void flb_processor_destroy(struct flb_processor *proc)
mk_list_del(&pu->_head);
flb_processor_unit_destroy(pu);
}

mk_list_foreach_safe(head, tmp, &proc->profiles) {
pu = mk_list_entry(head, struct flb_processor_unit, _head);
mk_list_del(&pu->_head);
flb_processor_unit_destroy(pu);
}

flb_free(proc);
}

Expand Down Expand Up @@ -912,6 +953,17 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str
}
}

/* profiles */
val = cfl_kvlist_fetch(g->properties, "profiles");
if (val) {
ret = load_from_config_format_group(proc, FLB_PROCESSOR_PROFILES, val);

if (ret == -1) {
flb_error("failed to load 'profiles' processors");
return -1;
}
}

return 0;
}

Expand Down

0 comments on commit 1d4708f

Please sign in to comment.