Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_systemd: tests: Provide restoring way the previous behavior #9764

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 161 additions & 47 deletions plugins/in_systemd/systemd.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,71 @@ static int systemd_enumerate_data_store(struct flb_config *config,
return -1;
}

static int systemd_process_simple(struct flb_config *config,
struct flb_input_instance *ins,
void *plugin_context,
void *format_context,
const void *data, size_t data_size)
{
int i;
int ret;
int len;
size_t length = data_size;
char *buf = NULL;
const char *sep;
const char *key;
const char *val;
struct flb_systemd_config *ctx = plugin_context;

key = (const char *) data;
sep = strchr(key, '=');
if (sep == NULL) {
return -2;
}

len = (sep - key);

ret = flb_log_event_encoder_append_body_string_length(
ctx->log_encoder, len);

if (ctx->lowercase == FLB_TRUE) {
/*
* Ensure buf to have enough space for the key because the libsystemd
* might return larger data than the threshold.
*/
if (buf == NULL) {
buf = flb_sds_create_len(NULL, ctx->threshold);
}
if (flb_sds_alloc(buf) < len) {
buf = flb_sds_increase(buf, len - flb_sds_alloc(buf));
}
for (i = 0; i < len; i++) {
buf[i] = tolower(key[i]);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, buf, len);
}
}
else {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, (char *) key, len);
}
}

val = sep + 1;
len = length - (sep - key) - 1;

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string(
ctx->log_encoder, (char *) val, len);
}

return 0;
}

static int in_systemd_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
Expand Down Expand Up @@ -387,44 +452,74 @@ static int in_systemd_collect(struct flb_input_instance *ins,
ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm);
}

/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
break;
}

/* Pack every field in the entry */
entries = 0;
skip_entries = 0;
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
if (ctx->compact_key == FLB_TRUE) {
/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
break;
}

ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
key, length);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
}

ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
key, length);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
}

entries++;
}
rows++;

entries++;
/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);

if (kvlist) {
cfl_kvlist_destroy(kvlist);
}
}
rows++;
else {
/* Pack every field in the entry */
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
}

/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = systemd_process_simple(config, ctx->ins,
(void *)ctx, NULL,
key, length);
}

if (kvlist) {
cfl_kvlist_destroy(kvlist);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
}

entries++;
}
rows++;
}

if (skip_entries > 0) {
Expand Down Expand Up @@ -668,35 +763,49 @@ static int cb_systemd_format_test(struct flb_config *config,
ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm);
}

/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
return -1;
}

keys = (const char *) data;
kvs = cfl_utils_split(keys, '\n', -1 );
if (kvs == NULL) {
goto split_error;
}

cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
cur->value, cur->len);
if (ctx->compact_key == FLB_TRUE) {
/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
return -1;
}

cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
cur->value, cur->len);

if (ret == -2 || ret == -1) {
continue;
if (ret == -2 || ret == -1) {
continue;
}
}
}

/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);
/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);

if (kvlist) {
cfl_kvlist_destroy(kvlist);
}
}
else {
cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_process_simple(config, ctx->ins,
(void *)ctx, NULL,
cur->value, cur->len);

if (kvlist) {
cfl_kvlist_destroy(kvlist);
if (ret == -2 || ret == -1) {
continue;
}
}
}

if (kvs != NULL) {
Expand Down Expand Up @@ -760,6 +869,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_systemd_config, strip_underscores),
"Strip undersecores from fields"
},
{
FLB_CONFIG_MAP_BOOL, "compact_key", "true",
0, FLB_TRUE, offsetof(struct flb_systemd_config, compact_key),
"Do compaction for dupliucated keys into an array"
},
#ifdef FLB_HAVE_SQLDB
{
FLB_CONFIG_MAP_STR, "db.sync", (char *)NULL,
Expand Down
1 change: 1 addition & 0 deletions plugins/in_systemd/systemd_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct flb_systemd_config {
int max_fields; /* max number of fields per record */
int max_entries; /* max number of records per iteration */
size_t threshold; /* threshold for retriveing journal */
int compact_key; /* Unify deprecated keys into an array */

#ifdef FLB_HAVE_SQLDB
flb_sds_t db_path;
Expand Down
85 changes: 85 additions & 0 deletions tests/runtime/in_systemd.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,40 @@ static void cb_check_cfl_variant_properties(void *ctx, int ffd,
flb_sds_destroy(output);
}

static void cb_check_simply_processed_properties(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
flb_sds_t output;
char *result = NULL;

/* Convert from msgpack to JSON */
output = flb_msgpack_raw_to_json_sds(res_data, res_size);
TEST_CHECK(output != NULL);

result = strstr(output, "\"MESSAGE\":\"test native message with multiple values\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY\":\"another\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY2\":\"final_field\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY3\":\"wow\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

flb_sds_destroy(output);
}

void flb_test_duplicated_keys()
{
int ret;
Expand Down Expand Up @@ -107,8 +141,59 @@ void flb_test_duplicated_keys()
flb_destroy(ctx);
}

void flb_test_dont_compact_keys()
{
int ret;
int in_ffd;
int out_ffd;
flb_ctx_t *ctx;
char *message = "MESSAGE=test native message with multiple values\nKEY=value1\nKEY=value4\n"
"KEY2=value2\nKEY=another\nKEY2=value3\nKEY2=value5\nKEY3=howdy\nKEY3=prettygood\nKEY2=value10\n"
"KEY3=wow\nKEY2=final_field\n";

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx,
"flush", "2",
"grace", "1",
"Log_Level", "error",
NULL);

/* Systemd */
in_ffd = flb_input(ctx, (char *) "systemd", NULL);
flb_input_set(ctx, in_ffd,
"tag", "test",
"Read_From_Tail", "On",
"Compact_Key", "Off",
NULL);


out_ffd = flb_output(ctx, (char *) "null", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Enable test mode */
ret = flb_input_set_test(ctx, in_ffd, "formatter",
cb_check_simply_processed_properties,
NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample to run test formatter */
ret = flb_lib_push(ctx, in_ffd, message, strlen(message));
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{ "duplicated_keys", flb_test_duplicated_keys },
{ "dont_compact_keys", flb_test_dont_compact_keys },
{ NULL, NULL}
};
Loading