diff --git a/plugins/out_exit/exit.c b/plugins/out_exit/exit.c index 825903ac09e..59df850879f 100644 --- a/plugins/out_exit/exit.c +++ b/plugins/out_exit/exit.c @@ -19,15 +19,22 @@ #include #include +#include +#include -#define FLB_EXIT_FLUSH_COUNT "1" +#define FLB_EXIT_FLUSH_COUNT "-1" +#define FLB_EXIT_RECORD_COUNT "-1" +#define FLB_EXIT_TIME_COUNT "-1" struct flb_exit { int is_running; - int count; + struct flb_time start_time; /* config */ int flush_count; + int record_count; + int time_count; + struct flb_output_instance *ins; }; static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *config, @@ -43,8 +50,13 @@ static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *conf flb_errno(); return -1; } - ctx->count = 0; + ctx->ins = ins; ctx->is_running = FLB_TRUE; + flb_time_get(&ctx->start_time); + + ctx->flush_count = -1; + ctx->record_count = -1; + ctx->time_count = -1; ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { @@ -52,6 +64,13 @@ static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *conf return -1; } + if (ctx->flush_count == -1 && + ctx->record_count == -1 && + ctx->time_count == -1) { + // emulate legacy behaviour by setting to a single flush. + ctx->flush_count = 1; + } + flb_output_set_context(ins, ctx); return 0; @@ -66,11 +85,57 @@ static void cb_exit_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) out_context; struct flb_exit *ctx = out_context; - - ctx->count++; - if (ctx->is_running == FLB_TRUE && ctx->count >= ctx->flush_count) { - flb_engine_exit(config); - ctx->is_running = FLB_FALSE; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct flb_time now; + struct flb_time run; + int result; + + if (ctx->is_running == FLB_TRUE) { + if (ctx->flush_count > 0) { + ctx->flush_count--; + } + + if (ctx->record_count > 0 && event_chunk->type == FLB_EVENT_TYPE_LOGS) { + result = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + if (result != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", result); + + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + while (flb_log_event_decoder_next(&log_decoder, + &log_event) == FLB_EVENT_DECODER_SUCCESS) { + if (ctx->record_count > 0) { + ctx->record_count--; + } + } + + result = flb_log_event_decoder_get_last_result(&log_decoder); + flb_log_event_decoder_destroy(&log_decoder); + + if (result != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, "Log event decoder error : %d", result); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + FLB_OUTPUT_RETURN(FLB_OK); + } + + if (ctx->time_count > 0) { + flb_time_get(&now); + flb_time_diff(&now, &ctx->start_time, &run); + } + + if (ctx->flush_count == 0 || + ctx->record_count == 0 || + (ctx->time_count > 0 && flb_time_to_millisec(&run) > (ctx->time_count*1000))) { + flb_engine_exit(config); + ctx->is_running = FLB_FALSE; + } } FLB_OUTPUT_RETURN(FLB_OK); @@ -90,7 +155,17 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_INT, "flush_count", FLB_EXIT_FLUSH_COUNT, 0, FLB_TRUE, offsetof(struct flb_exit, flush_count), - NULL + "number of flushes before exiting" + }, + { + FLB_CONFIG_MAP_INT, "record_count", FLB_EXIT_RECORD_COUNT, + 0, FLB_TRUE, offsetof(struct flb_exit, record_count), + "number of records received before exiting" + }, + { + FLB_CONFIG_MAP_INT, "time_count", FLB_EXIT_TIME_COUNT, + 0, FLB_TRUE, offsetof(struct flb_exit, time_count), + "number of seconds before exiting (will trigger upon receiving a flush)" }, /* EOF */