diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 41ac94a95bc..82bf1dc69f1 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -719,6 +719,12 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines), "Allows to skip empty lines." }, + + { + FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false", + 0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines), + "Truncate overlong lines after input encoding to UTF-8" + }, #ifdef __linux__ { FLB_CONFIG_MAP_BOOL, "file_cache_advise", "true", diff --git a/plugins/in_tail/tail_config.c b/plugins/in_tail/tail_config.c index 929834b782e..2778b7af548 100644 --- a/plugins/in_tail/tail_config.c +++ b/plugins/in_tail/tail_config.c @@ -170,7 +170,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, if (sec == 0 && nsec == 0) { flb_plg_error(ctx->ins, "invalid 'refresh_interval' config " "value (%s)", tmp); - flb_free(ctx); + flb_tail_config_destroy(ctx); return NULL; } @@ -192,7 +192,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, /* Config: seconds interval to monitor file after rotation */ if (ctx->rotate_wait <= 0) { flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value"); - flb_free(ctx); + flb_tail_config_destroy(ctx); return NULL; } @@ -215,7 +215,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, } else { flb_plg_error(ctx->ins, "invalid encoding 'unicode.encoding' value"); - flb_free(ctx); + flb_tail_config_destroy(ctx); return NULL; } } @@ -230,11 +230,20 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, } else { flb_plg_error(ctx->ins, "invalid encoding 'generic.encoding' value %s", tmp); - flb_free(ctx); + flb_tail_config_destroy(ctx); return NULL; } } +#ifdef FLB_HAVE_UNICODE_ENCODER + if (ctx->preferred_input_encoding != FLB_UNICODE_ENCODING_UNSPECIFIED && + ctx->generic_input_encoding_type != FLB_GENERIC_UNSPECIFIED) { + flb_plg_error(ctx->ins, + "'unicode.encoding' and 'generic.encoding' cannot be specified at the same time"); + flb_tail_config_destroy(ctx); + return NULL; + } +#endif #ifdef FLB_HAVE_PARSER /* Config: multi-line support */ if (ctx->multiline == FLB_TRUE) { @@ -258,7 +267,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, /* Validate buffer limit */ if (ctx->buf_chunk_size > ctx->buf_max_size) { flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk"); - flb_free(ctx); + flb_tail_config_destroy(ctx); return NULL; } @@ -485,6 +494,13 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "multiline_truncated_total", "Total number of truncated occurences for multilines", 1, (char *[]) {"name"}); + ctx->cmt_long_line_truncated = \ + cmt_counter_create(ins->cmt, + "fluentbit", "input", + "long_line_truncated_total", + "Total number of truncated occurences for long lines", + 1, (char *[]) {"name"}); + /* OLD metrics */ flb_metrics_add(FLB_TAIL_METRIC_F_OPENED, "files_opened", ctx->ins->metrics); @@ -494,6 +510,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, "files_rotated", ctx->ins->metrics); flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED, "multiline_truncated", ctx->ins->metrics); + flb_metrics_add(FLB_TAIL_METRIC_L_TRUNCATED, + "long_line_truncated", ctx->ins->metrics); #endif return ctx; diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index dfd5f919354..00615448fac 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -42,6 +42,7 @@ #define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */ #define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */ #define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */ +#define FLB_TAIL_METRIC_L_TRUNCATED 104 /* number of truncated occurrences of long lines */ #endif struct flb_tail_config { @@ -54,6 +55,7 @@ struct flb_tail_config { /* Buffer Config */ size_t buf_chunk_size; /* allocation chunks */ size_t buf_max_size; /* max size of a buffer */ + int truncate_long_lines; /* truncate long lines after re-encode */ /* Static files processor */ size_t static_batch_size; @@ -169,6 +171,7 @@ struct flb_tail_config { struct cmt_counter *cmt_files_closed; struct cmt_counter *cmt_files_rotated; struct cmt_counter *cmt_multiline_truncated; + struct cmt_counter *cmt_long_line_truncated; /* Hash: hash tables for quick acess to registered files */ struct flb_hash_table *static_hash; diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index ae2c4204801..1624bd94c7a 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -457,6 +457,24 @@ static FLB_INLINE const char *flb_skip_leading_zeros_simd(const char *data, cons return data; } +/* Return a UTF-8 safe cut position <= max */ +static size_t utf8_safe_truncate_pos(const char *s, size_t len, size_t max) +{ + size_t cut = 0; + + cut = (len <= max) ? len : max; + if (cut == len) { + return cut; + } + + /* backtrack over continuation bytes 10xxxxxx */ + while (cut > 0 && ((unsigned char)s[cut] & 0xC0) == 0x80) { + cut--; + } + + return cut; +} + static int process_content(struct flb_tail_file *file, size_t *bytes) { size_t len; @@ -481,6 +499,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) #ifdef FLB_HAVE_UNICODE_ENCODER size_t decoded_len; #endif + size_t cut = 0; + size_t eff_max = 0; + size_t dec_len = 0; + size_t window = 0; + int truncation_happened = FLB_FALSE; + size_t bytes_override = 0; + void *nl = NULL; #ifdef FLB_HAVE_METRICS uint64_t ts; char *name; @@ -530,7 +555,8 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) end - data); if (ret > 0) { data = decoded; - end = data + strlen(decoded); + /* Generic encoding conversion returns decoded length precisely with ret. */ + end = data + (size_t) ret; } else { flb_plg_error(ctx->ins, "encoding failed '%.*s' with status %d", end - data, data, ret); @@ -542,6 +568,58 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) data = (char *)flb_skip_leading_zeros_simd(data, end, &processed_bytes); } + if (ctx->truncate_long_lines == FLB_TRUE && + file->buf_size >= ctx->buf_max_size) { + /* Use buf_max_size as the truncation threshold */ + if (ctx->buf_max_size > 0) { + eff_max = ctx->buf_max_size - 1; + } + else { + eff_max = 0; + } + dec_len = (size_t)(end - data); + window = ctx->buf_max_size + 1; + if (window > dec_len) { + window = dec_len; + } + + nl = memchr(data, '\n', window); + if (nl == NULL && eff_max > 0 && dec_len >= eff_max) { + if (file->skip_next == FLB_TRUE) { + bytes_override = (original_len > 0) ? original_len : file->buf_len; + goto truncation_end; + } + cut = utf8_safe_truncate_pos(data, dec_len, eff_max); + + if (cut > 0) { + if (ctx->multiline == FLB_TRUE) { + flb_tail_mult_flush(file, ctx); + } + + flb_tail_file_pack_line(NULL, data, cut, file, processed_bytes); + file->skip_next = FLB_TRUE; + + #ifdef FLB_HAVE_METRICS + cmt_counter_inc(ctx->cmt_long_line_truncated, + cfl_time_now(), 1, + (char*[]){ (char*) flb_input_name(ctx->ins) }); + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_L_TRUNCATED, 1, ctx->ins->metrics); + #endif + if (original_len > 0) { + bytes_override = original_len; + } + else { + bytes_override = file->buf_len; + } + truncation_happened = FLB_TRUE; + + lines++; + goto truncation_end; + } + } + } + while (data < end && (p = memchr(data, '\n', end - data))) { len = (p - data); crlf = 0; @@ -700,6 +778,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) file->last_processed_bytes = processed_bytes; } +truncation_end: if (decoded) { flb_free(decoded); decoded = NULL; @@ -709,9 +788,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) if (lines > 0) { /* Append buffer content to a chunk */ - if (original_len > 0) { + if (truncation_happened) { + *bytes = bytes_override; + } + else if (original_len > 0) { *bytes = original_len; - } else { + } + else { *bytes = processed_bytes; } @@ -1506,12 +1589,13 @@ int flb_tail_file_chunk(struct flb_tail_file *file) size_t file_buffer_capacity; size_t stream_data_length; ssize_t raw_data_length; - size_t processed_bytes; + size_t processed_bytes = 0; uint8_t *read_buffer; size_t read_size; size_t size; char *tmp; int ret; + int lines; struct flb_tail_config *ctx; /* Check if we the engine issued a pause */ @@ -1529,6 +1613,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file) * If there is no more room for more data, try to increase the * buffer under the limit of buffer_max_size. */ + if (ctx->truncate_long_lines == FLB_TRUE) { + lines = process_content(file, &processed_bytes); + if (lines < 0) { + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", + file->inode, file->name); + return FLB_TAIL_ERROR; + } + + if (lines > 0) { + file->stream_offset += processed_bytes; + file->last_processed_bytes = 0; + consume_bytes(file->buf_data, processed_bytes, file->buf_len); + file->buf_len -= processed_bytes; + file->buf_data[file->buf_len] = '\0'; + +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + flb_tail_db_file_offset(file, file->config); + } +#endif + return adjust_counters(ctx, file); + } + } if (file->buf_size >= ctx->buf_max_size) { if (ctx->skip_long_lines == FLB_FALSE) { flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index c83e2b01213..2b1ebb5228a 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -991,6 +991,236 @@ void flb_test_in_tail_skip_long_lines() unlink(path); } +static int write_long_ascii_line(int fd, size_t total_bytes) +{ + const char *chunk = "0123456789abcdef0123456789abcdef"; /* 32 bytes */ + size_t chunk_len = strlen(chunk); + size_t written = 0; + ssize_t ret; + size_t rest = 0; + + while (written + chunk_len <= total_bytes) { + ret = write(fd, chunk, chunk_len); + if (ret < 0) { + flb_errno(); + return -1; + } + written += (size_t) ret; + } + if (written < total_bytes) { + rest = total_bytes - written; + ret = write(fd, chunk, rest); + if (ret < 0) { + flb_errno(); + return -1; + } + written += (size_t) ret; + } + if (write(fd, "\n", 1) != 1) { + flb_errno(); + return -1; + } + return 0; +} + +static int write_long_utf8_line(int fd, size_t total_bytes) +{ + const char *u8_aa = "あ"; + size_t u8_len = strlen(u8_aa); /* 3 */ + size_t written = 0; + ssize_t ret; + const char *ascii = "XYZ"; + size_t rest = 0; + + while (written + u8_len <= total_bytes) { + ret = write(fd, u8_aa, u8_len); + if (ret < 0) { + flb_errno(); + return -1; + } + written += (size_t) ret; + } + + if (written < total_bytes) { + rest = total_bytes - written; + if (rest > strlen(ascii)) { + rest = strlen(ascii); + } + ret = write(fd, ascii, rest); + if (ret < 0) { + flb_errno(); + return -1; + } + written += (size_t) ret; + } + if (write(fd, "\n", 1) != 1) { + flb_errno(); + return -1; + } + return 0; +} + +void flb_test_in_tail_truncate_long_lines() +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd, out_ffd; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + int fd; + + const char *target = "truncate_long_lines_basic"; + int nExpected = 3; /* before + truncated long line + after */ + int nExpectedNotMatched = 0; /* unused */ + int nExpectedLines = 0; /* unused */ + + struct flb_lib_out_cb cb; + int unused = 0; + int num = 0; + + cb.cb = cb_count_msgpack; + cb.data = &unused; + + clear_output_num(); + + ctx = flb_create(); + TEST_CHECK_(ctx != NULL, "flb_create failed"); + + TEST_CHECK_(flb_service_set(ctx, "Log_Level", "error", NULL) == 0, + "setting service options"); + + in_ffd = flb_input(ctx, "tail", NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + fd = creat(path, S_IRWXU | S_IRGRP); + TEST_CHECK(fd >= 0); + + write(fd, "before_long_line\n", strlen("before_long_line\n")); + + TEST_CHECK(write_long_ascii_line(fd, 10 * 1024) == 0); + + write(fd, "after_long_line\n", strlen("after_long_line\n")); + close(fd); + + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path", path, + "read_from_head", "true", + "truncate_long_lines", "on", + "skip_long_lines", "off", + "Buffer_Chunk_Size", "1k", + "Buffer_Max_Size", "4k", + NULL) == 0); + + out_ffd = flb_output(ctx, "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + wait_num_with_timeout(5000, &num); + + num = get_output_num(); + TEST_CHECK(num == nExpected); + TEST_MSG("output count (truncate basic): got=%d expected=%d", num, nExpected); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } + + unlink(path); +} + +void flb_test_in_tail_truncate_long_lines_utf8() +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd, out_ffd; + char path[PATH_MAX]; + int fd; + + const char *target = "truncate_long_lines_utf8"; + int nExpected = 1; + + struct flb_lib_out_cb cb; + int unused = 0; + int num = 0; + + cb.cb = cb_count_msgpack; + cb.data = &unused; + + clear_output_num(); + + ctx = flb_create(); + TEST_CHECK_(ctx != NULL, "flb_create failed"); + + TEST_CHECK_(flb_service_set(ctx, "Log_Level", "error", NULL) == 0, + "setting service options"); + + in_ffd = flb_input(ctx, "tail", NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + fd = creat(path, S_IRWXU | S_IRGRP); + TEST_CHECK(fd >= 0); + + TEST_CHECK(write_long_utf8_line(fd, 10 * 1024) == 0); + close(fd); + + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path", path, + "read_from_head", "true", + "truncate_long_lines", "on", + "skip_long_lines", "off", + "Buffer_Chunk_Size", "1k", + "Buffer_Max_Size", "4k", + NULL) == 0); + + out_ffd = flb_output(ctx, "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + wait_num_with_timeout(5000, &num); + + num = get_output_num(); + TEST_CHECK(num == nExpected); + TEST_MSG("output count (truncate utf8): got=%d expected=%d", num, nExpected); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } + + unlink(path); +} + /* * test case for https://github.com/fluent/fluent-bit/issues/3943 * @@ -2426,6 +2656,8 @@ TEST_LIST = { {"issue_3943", flb_test_in_tail_issue_3943}, /* Properties */ {"skip_long_lines", flb_test_in_tail_skip_long_lines}, + {"truncate_long_lines", flb_test_in_tail_truncate_long_lines}, + {"truncate_long_lines_utf8", flb_test_in_tail_truncate_long_lines_utf8}, {"path_comma", flb_test_path_comma}, {"path_key", flb_test_path_key}, {"exclude_path", flb_test_exclude_path},