From 802392a6f7f4e0ee1feaeed13c8aa7aaf868a2c6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 24 Dec 2024 18:06:57 +0900 Subject: [PATCH] in_systemd: tests: Provide restoring way the previuous behavior Signed-off-by: Hiroshi Hatake --- plugins/in_systemd/systemd.c | 208 +++++++++++++++++++++------- plugins/in_systemd/systemd_config.h | 1 + tests/runtime/in_systemd.c | 85 ++++++++++++ 3 files changed, 247 insertions(+), 47 deletions(-) diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 08ba75d210e..9addc6556c7 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -261,6 +261,71 @@ static int systemd_enumerate_data_store(struct flb_config *config, return -1; } +static int systemd_process_simple(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *format_context, + const void *data, size_t data_size) +{ + int i; + int ret; + int len; + size_t length = data_size; + char *buf = NULL; + const char *sep; + const char *key; + const char *val; + struct flb_systemd_config *ctx = plugin_context; + + key = (const char *) data; + sep = strchr(key, '='); + if (sep == NULL) { + return -2; + } + + len = (sep - key); + + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, len); + + if (ctx->lowercase == FLB_TRUE) { + /* + * Ensure buf to have enough space for the key because the libsystemd + * might return larger data than the threshold. + */ + if (buf == NULL) { + buf = flb_sds_create_len(NULL, ctx->threshold); + } + if (flb_sds_alloc(buf) < len) { + buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); + } + for (i = 0; i < len; i++) { + buf[i] = tolower(key[i]); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, buf, len); + } + } + else { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, (char *) key, len); + } + } + + val = sep + 1; + len = length - (sep - key) - 1; + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, (char *) val, len); + } + + return 0; +} + static int in_systemd_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { @@ -387,44 +452,74 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } - /* create an empty kvlist as the labels */ - kvlist = cfl_kvlist_create(); - if (!kvlist) { - flb_plg_error(ctx->ins, "error allocating kvlist"); - break; - } - /* Pack every field in the entry */ entries = 0; skip_entries = 0; - while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 && - entries < ctx->max_fields) { - key = (const char *) data; - if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') { - key++; - length--; + if (ctx->compact_key == FLB_TRUE) { + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + break; } - ret = systemd_enumerate_data_store(config, ctx->ins, - (void *)ctx, (void *)kvlist, - key, length); - if (ret == -2) { - skip_entries++; - continue; - } - else if (ret == -1) { - continue; + while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 && + entries < ctx->max_fields) { + key = (const char *) data; + if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') { + key++; + length--; + } + + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + key, length); + if (ret == -2) { + skip_entries++; + continue; + } + else if (ret == -1) { + continue; + } + + entries++; } + rows++; - entries++; + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } } - rows++; + else { + /* Pack every field in the entry */ + while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 && + entries < ctx->max_fields) { + key = (const char *) data; + if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') { + key++; + length--; + } - /* Interpret cfl_kvlist as logs type of events later. */ - ret = append_enumerate_data(ctx, kvlist); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = systemd_process_simple(config, ctx->ins, + (void *)ctx, NULL, + key, length); + } - if (kvlist) { - cfl_kvlist_destroy(kvlist); + if (ret == -2) { + skip_entries++; + continue; + } + else if (ret == -1) { + continue; + } + + entries++; + } + rows++; } if (skip_entries > 0) { @@ -668,35 +763,49 @@ static int cb_systemd_format_test(struct flb_config *config, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } - /* create an empty kvlist as the labels */ - kvlist = cfl_kvlist_create(); - if (!kvlist) { - flb_plg_error(ctx->ins, "error allocating kvlist"); - return -1; - } - keys = (const char *) data; kvs = cfl_utils_split(keys, '\n', -1 ); if (kvs == NULL) { goto split_error; } - cfl_list_foreach(head, kvs) { - cur = cfl_list_entry(head, struct cfl_split_entry, _head); - ret = systemd_enumerate_data_store(config, ctx->ins, - (void *)ctx, (void *)kvlist, - cur->value, cur->len); + if (ctx->compact_key == FLB_TRUE) { + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + return -1; + } + + cfl_list_foreach(head, kvs) { + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + cur->value, cur->len); - if (ret == -2 || ret == -1) { - continue; + if (ret == -2 || ret == -1) { + continue; + } } - } - /* Interpret cfl_kvlist as logs type of events later. */ - ret = append_enumerate_data(ctx, kvlist); + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + } + else { + cfl_list_foreach(head, kvs) { + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + ret = systemd_process_simple(config, ctx->ins, + (void *)ctx, NULL, + cur->value, cur->len); - if (kvlist) { - cfl_kvlist_destroy(kvlist); + if (ret == -2 || ret == -1) { + continue; + } + } } if (kvs != NULL) { @@ -760,6 +869,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_systemd_config, strip_underscores), "Strip undersecores from fields" }, + { + FLB_CONFIG_MAP_BOOL, "compact_key", "true", + 0, FLB_TRUE, offsetof(struct flb_systemd_config, compact_key), + "Do compaction for dupliucated keys into an array" + }, #ifdef FLB_HAVE_SQLDB { FLB_CONFIG_MAP_STR, "db.sync", (char *)NULL, diff --git a/plugins/in_systemd/systemd_config.h b/plugins/in_systemd/systemd_config.h index af789b7ea3b..ef3198a4f42 100644 --- a/plugins/in_systemd/systemd_config.h +++ b/plugins/in_systemd/systemd_config.h @@ -64,6 +64,7 @@ struct flb_systemd_config { int max_fields; /* max number of fields per record */ int max_entries; /* max number of records per iteration */ size_t threshold; /* threshold for retriveing journal */ + int compact_key; /* Unify deprecated keys into an array */ #ifdef FLB_HAVE_SQLDB flb_sds_t db_path; diff --git a/tests/runtime/in_systemd.c b/tests/runtime/in_systemd.c index 098d5ceb0c4..c5614942b75 100644 --- a/tests/runtime/in_systemd.c +++ b/tests/runtime/in_systemd.c @@ -58,6 +58,40 @@ static void cb_check_cfl_variant_properties(void *ctx, int ffd, flb_sds_destroy(output); } +static void cb_check_simply_processed_properties(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + flb_sds_t output; + char *result = NULL; + + /* Convert from msgpack to JSON */ + output = flb_msgpack_raw_to_json_sds(res_data, res_size); + TEST_CHECK(output != NULL); + + result = strstr(output, "\"MESSAGE\":\"test native message with multiple values\""); + if (TEST_CHECK(result != NULL)) { + TEST_MSG("output:%s\n", output); + } + + result = strstr(output, "\"KEY\":\"another\""); + if (TEST_CHECK(result != NULL)) { + TEST_MSG("output:%s\n", output); + } + + result = strstr(output, "\"KEY2\":\"final_field\""); + if (TEST_CHECK(result != NULL)) { + TEST_MSG("output:%s\n", output); + } + + result = strstr(output, "\"KEY3\":\"wow\""); + if (TEST_CHECK(result != NULL)) { + TEST_MSG("output:%s\n", output); + } + + flb_sds_destroy(output); +} + void flb_test_duplicated_keys() { int ret; @@ -107,8 +141,59 @@ void flb_test_duplicated_keys() flb_destroy(ctx); } +void flb_test_dont_compact_keys() +{ + int ret; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + char *message = "MESSAGE=test native message with multiple values\nKEY=value1\nKEY=value4\n" + "KEY2=value2\nKEY=another\nKEY2=value3\nKEY2=value5\nKEY3=howdy\nKEY3=prettygood\nKEY2=value10\n" + "KEY3=wow\nKEY2=final_field\n"; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, + "flush", "2", + "grace", "1", + "Log_Level", "error", + NULL); + + /* Systemd */ + in_ffd = flb_input(ctx, (char *) "systemd", NULL); + flb_input_set(ctx, in_ffd, + "tag", "test", + "Read_From_Tail", "On", + "Compact_Key", "Off", + NULL); + + + out_ffd = flb_output(ctx, (char *) "null", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_input_set_test(ctx, in_ffd, "formatter", + cb_check_simply_processed_properties, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample to run test formatter */ + ret = flb_lib_push(ctx, in_ffd, message, strlen(message)); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { { "duplicated_keys", flb_test_duplicated_keys }, + { "dont_compact_keys", flb_test_dont_compact_keys }, { NULL, NULL} };