Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
"Allows to skip empty lines."
},

{
FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false",
0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines),
"Truncate overlong lines after input encoding to UTF-8"
},
#ifdef __linux__
{
FLB_CONFIG_MAP_BOOL, "file_cache_advise", "true",
Expand Down
28 changes: 23 additions & 5 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
if (sec == 0 && nsec == 0) {
flb_plg_error(ctx->ins, "invalid 'refresh_interval' config "
"value (%s)", tmp);
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand All @@ -192,7 +192,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
/* Config: seconds interval to monitor file after rotation */
if (ctx->rotate_wait <= 0) {
flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand All @@ -215,7 +215,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
else {
flb_plg_error(ctx->ins, "invalid encoding 'unicode.encoding' value");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}
}
Expand All @@ -230,11 +230,20 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
else {
flb_plg_error(ctx->ins, "invalid encoding 'generic.encoding' value %s", tmp);
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}
}

#ifdef FLB_HAVE_UNICODE_ENCODER
if (ctx->preferred_input_encoding != FLB_UNICODE_ENCODING_UNSPECIFIED &&
ctx->generic_input_encoding_type != FLB_GENERIC_UNSPECIFIED) {
flb_plg_error(ctx->ins,
"'unicode.encoding' and 'generic.encoding' cannot be specified at the same time");
flb_tail_config_destroy(ctx);
return NULL;
}
#endif
#ifdef FLB_HAVE_PARSER
/* Config: multi-line support */
if (ctx->multiline == FLB_TRUE) {
Expand All @@ -258,7 +267,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
/* Validate buffer limit */
if (ctx->buf_chunk_size > ctx->buf_max_size) {
flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk");
flb_free(ctx);
flb_tail_config_destroy(ctx);
return NULL;
}

Expand Down Expand Up @@ -485,6 +494,13 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
"multiline_truncated_total",
"Total number of truncated occurences for multilines",
1, (char *[]) {"name"});
ctx->cmt_long_line_truncated = \
cmt_counter_create(ins->cmt,
"fluentbit", "input",
"long_line_truncated_total",
"Total number of truncated occurences for long lines",
1, (char *[]) {"name"});

/* OLD metrics */
flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
"files_opened", ctx->ins->metrics);
Expand All @@ -494,6 +510,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
"files_rotated", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED,
"multiline_truncated", ctx->ins->metrics);
flb_metrics_add(FLB_TAIL_METRIC_L_TRUNCATED,
"long_line_truncated", ctx->ins->metrics);
#endif

return ctx;
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */
#define FLB_TAIL_METRIC_L_TRUNCATED 104 /* number of truncated occurrences of long lines */
#endif

struct flb_tail_config {
Expand All @@ -54,6 +55,7 @@ struct flb_tail_config {
/* Buffer Config */
size_t buf_chunk_size; /* allocation chunks */
size_t buf_max_size; /* max size of a buffer */
int truncate_long_lines; /* truncate long lines after re-encode */

/* Static files processor */
size_t static_batch_size;
Expand Down Expand Up @@ -169,6 +171,7 @@ struct flb_tail_config {
struct cmt_counter *cmt_files_closed;
struct cmt_counter *cmt_files_rotated;
struct cmt_counter *cmt_multiline_truncated;
struct cmt_counter *cmt_long_line_truncated;

/* Hash: hash tables for quick access to registered files */
struct flb_hash_table *static_hash;
Expand Down
115 changes: 111 additions & 4 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,24 @@ static FLB_INLINE const char *flb_skip_leading_zeros_simd(const char *data, cons
return data;
}

/*
 * Compute a position at which a buffer can be cut without splitting a
 * multi-byte UTF-8 sequence.
 *
 * s:   buffer to inspect
 * len: number of bytes available in s
 * max: upper bound for the returned position
 *
 * Returns len when the buffer already fits within max bytes. Otherwise the
 * search starts at max and walks backwards for as long as the byte sitting
 * at the candidate position is a continuation byte (10xxxxxx). The result
 * is therefore never greater than max and can be 0.
 */
static size_t utf8_safe_truncate_pos(const char *s, size_t len, size_t max)
{
    const unsigned char *bytes = (const unsigned char *) s;
    size_t pos;

    /* Nothing to trim: the whole buffer is within the limit */
    if (len <= max) {
        return len;
    }

    /*
     * bytes[pos] is the first byte that would be dropped; if it is not a
     * continuation byte, pos sits on a character boundary.
     */
    for (pos = max; pos > 0; pos--) {
        if ((bytes[pos] & 0xC0) != 0x80) {
            break;
        }
    }

    return pos;
}

static int process_content(struct flb_tail_file *file, size_t *bytes)
{
size_t len;
Expand All @@ -481,6 +499,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
#ifdef FLB_HAVE_UNICODE_ENCODER
size_t decoded_len;
#endif
size_t cut = 0;
size_t eff_max = 0;
size_t dec_len = 0;
size_t window = 0;
int truncation_happened = FLB_FALSE;
size_t bytes_override = 0;
void *nl = NULL;
#ifdef FLB_HAVE_METRICS
uint64_t ts;
char *name;
Expand Down Expand Up @@ -530,7 +555,8 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
end - data);
if (ret > 0) {
data = decoded;
end = data + strlen(decoded);
/* Generic encoding conversion returns decoded length precisely with ret. */
end = data + (size_t) ret;
}
else {
flb_plg_error(ctx->ins, "encoding failed '%.*s' with status %d", end - data, data, ret);
Expand All @@ -542,6 +568,58 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
data = (char *)flb_skip_leading_zeros_simd(data, end, &processed_bytes);
}

if (ctx->truncate_long_lines == FLB_TRUE &&
file->buf_size >= ctx->buf_max_size) {
/* Use buf_max_size as the truncation threshold */
if (ctx->buf_max_size > 0) {
eff_max = ctx->buf_max_size - 1;
}
else {
eff_max = 0;
}
dec_len = (size_t)(end - data);
window = ctx->buf_max_size + 1;
if (window > dec_len) {
window = dec_len;
}

nl = memchr(data, '\n', window);
if (nl == NULL && eff_max > 0 && dec_len >= eff_max) {
if (file->skip_next == FLB_TRUE) {
bytes_override = (original_len > 0) ? original_len : file->buf_len;
goto truncation_end;
}
cut = utf8_safe_truncate_pos(data, dec_len, eff_max);

if (cut > 0) {
if (ctx->multiline == FLB_TRUE) {
flb_tail_mult_flush(file, ctx);
}

flb_tail_file_pack_line(NULL, data, cut, file, processed_bytes);
file->skip_next = FLB_TRUE;

#ifdef FLB_HAVE_METRICS
cmt_counter_inc(ctx->cmt_long_line_truncated,
cfl_time_now(), 1,
(char*[]){ (char*) flb_input_name(ctx->ins) });
/* Old api */
flb_metrics_sum(FLB_TAIL_METRIC_L_TRUNCATED, 1, ctx->ins->metrics);
#endif
if (original_len > 0) {
bytes_override = original_len;
}
else {
bytes_override = file->buf_len;
}
truncation_happened = FLB_TRUE;

lines++;
goto truncation_end;
}
}
}

while (data < end && (p = memchr(data, '\n', end - data))) {
len = (p - data);
crlf = 0;
Expand Down Expand Up @@ -700,6 +778,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
file->last_processed_bytes = processed_bytes;
}

truncation_end:
if (decoded) {
flb_free(decoded);
decoded = NULL;
Expand All @@ -709,9 +788,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)

if (lines > 0) {
/* Append buffer content to a chunk */
if (original_len > 0) {
if (truncation_happened) {
*bytes = bytes_override;
}
else if (original_len > 0) {
*bytes = original_len;
} else {
}
else {
*bytes = processed_bytes;
}

Expand Down Expand Up @@ -1506,12 +1589,13 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
size_t file_buffer_capacity;
size_t stream_data_length;
ssize_t raw_data_length;
size_t processed_bytes;
size_t processed_bytes = 0;
uint8_t *read_buffer;
size_t read_size;
size_t size;
char *tmp;
int ret;
int lines;
struct flb_tail_config *ctx;

/* Check if the engine issued a pause */
Expand All @@ -1529,6 +1613,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
* If there is no more room for more data, try to increase the
* buffer under the limit of buffer_max_size.
*/
if (ctx->truncate_long_lines == FLB_TRUE) {
lines = process_content(file, &processed_bytes);
if (lines < 0) {
flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
file->inode, file->name);
return FLB_TAIL_ERROR;
}

if (lines > 0) {
file->stream_offset += processed_bytes;
file->last_processed_bytes = 0;
consume_bytes(file->buf_data, processed_bytes, file->buf_len);
file->buf_len -= processed_bytes;
file->buf_data[file->buf_len] = '\0';

#ifdef FLB_HAVE_SQLDB
if (file->config->db) {
flb_tail_db_file_offset(file, file->config);
}
#endif
return adjust_counters(ctx, file);
}
}
if (file->buf_size >= ctx->buf_max_size) {
if (ctx->skip_long_lines == FLB_FALSE) {
flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "
Expand Down
Loading
Loading