Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_exit: add time_count (seconds) and record_count parameters for out_exit. #8621

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 84 additions & 9 deletions plugins/out_exit/exit.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_log_event_decoder.h>

#define FLB_EXIT_FLUSH_COUNT "1"
#define FLB_EXIT_FLUSH_COUNT "-1"
#define FLB_EXIT_RECORD_COUNT "-1"
#define FLB_EXIT_TIME_COUNT "-1"

struct flb_exit {
int is_running;
int count;
struct flb_time start_time;

/* config */
int flush_count;
int record_count;
int time_count;
struct flb_output_instance *ins;
};

static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *config,
Expand All @@ -43,15 +50,27 @@ static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *conf
flb_errno();
return -1;
}
ctx->count = 0;
ctx->ins = ins;
ctx->is_running = FLB_TRUE;
flb_time_get(&ctx->start_time);

ctx->flush_count = -1;
ctx->record_count = -1;
ctx->time_count = -1;

ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

if (ctx->flush_count == -1 &&
ctx->record_count == -1 &&
ctx->time_count == -1) {
// emulate legacy behaviour by setting to a single flush.
ctx->flush_count = 1;
}

flb_output_set_context(ins, ctx);

return 0;
Expand All @@ -66,11 +85,57 @@ static void cb_exit_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) out_context;
struct flb_exit *ctx = out_context;

ctx->count++;
if (ctx->is_running == FLB_TRUE && ctx->count >= ctx->flush_count) {
flb_engine_exit(config);
ctx->is_running = FLB_FALSE;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
struct flb_time now;
struct flb_time run;
int result;

if (ctx->is_running == FLB_TRUE) {
if (ctx->flush_count > 0) {
ctx->flush_count--;
}

if (ctx->record_count > 0 && event_chunk->type == FLB_EVENT_TYPE_LOGS) {
result = flb_log_event_decoder_init(&log_decoder,
(char *) event_chunk->data,
event_chunk->size);
if (result != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", result);

FLB_OUTPUT_RETURN(FLB_RETRY);
}

while (flb_log_event_decoder_next(&log_decoder,
&log_event) == FLB_EVENT_DECODER_SUCCESS) {
if (ctx->record_count > 0) {
ctx->record_count--;
}
}

result = flb_log_event_decoder_get_last_result(&log_decoder);
flb_log_event_decoder_destroy(&log_decoder);

if (result != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins, "Log event decoder error : %d", result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

FLB_OUTPUT_RETURN(FLB_OK);
}

if (ctx->time_count > 0) {
flb_time_get(&now);
flb_time_diff(&now, &ctx->start_time, &run);
}

if (ctx->flush_count == 0 ||
ctx->record_count == 0 ||
(ctx->time_count > 0 && flb_time_to_millisec(&run) > (ctx->time_count*1000))) {
flb_engine_exit(config);
ctx->is_running = FLB_FALSE;
}
}

FLB_OUTPUT_RETURN(FLB_OK);
Expand All @@ -90,7 +155,17 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_INT, "flush_count", FLB_EXIT_FLUSH_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, flush_count),
NULL
"number of flushes before exiting"
},
{
FLB_CONFIG_MAP_INT, "record_count", FLB_EXIT_RECORD_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, record_count),
"number of records received before exiting"
},
{
FLB_CONFIG_MAP_INT, "time_count", FLB_EXIT_TIME_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, time_count),
"number of seconds before exiting (will trigger upon receiving a flush)"
},

/* EOF */
Expand Down
Loading