Skip to content

config: backport 31a8163, b41742e, and 7130d74 #10344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: 3.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
{
int ret;
char *tmp;
char *name;
char *name = NULL;
char *s_type;
struct mk_list *list;
struct mk_list *head;
Expand All @@ -724,7 +724,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
struct flb_cf_section *s;
struct flb_cf_group *processors = NULL;
int i;
void *ins;
void *ins = NULL;

if (type == FLB_CF_CUSTOM) {
s_type = "custom";
Expand All @@ -743,7 +743,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
list = &cf->outputs;
}
else {
return -1;
goto error;
}

mk_list_foreach(head, list) {
Expand All @@ -752,7 +752,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
if (!name) {
flb_error("[config] section '%s' is missing the 'name' property",
s_type);
return -1;
goto error;
}

/* translate the variable */
Expand All @@ -779,9 +779,8 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
flb_error("[config] section '%s' tried to instance a plugin name "
"that doesn't exist", name);
flb_sds_destroy(name);
return -1;
goto error;
}
flb_sds_destroy(name);

/*
* iterate section properties and populate instance by using specific
Expand Down Expand Up @@ -843,25 +842,54 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
flb_error("[config] could not configure property '%s' on "
"%s plugin with section name '%s'",
kv->key, s_type, name);
goto error;
}
}

/* Processors */
processors = flb_cf_group_get(cf, s, "processors");
if (processors) {
if (type == FLB_CF_INPUT) {
flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors);
ret = flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors);
if (ret == -1) {
goto error;
}
}
else if (type == FLB_CF_OUTPUT) {
flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors);
ret = flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors);
if (ret == -1) {
goto error;
}
}
else {
flb_error("[config] section '%s' does not support processors", s_type);
}
}

flb_sds_destroy(name);
}

return 0;

error:
if (name != NULL) {
flb_sds_destroy(name);
}
if (ins != NULL) {
if (type == FLB_CF_CUSTOM) {
flb_custom_instance_destroy(ins);
}
else if (type == FLB_CF_INPUT) {
flb_input_instance_destroy(ins);
}
else if (type == FLB_CF_FILTER) {
flb_filter_instance_destroy(ins);
}
else if (type == FLB_CF_OUTPUT) {
flb_output_instance_destroy(ins);
}
}
return -1;
}
/* Load a struct flb_config_format context into a flb_config instance */
int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
Expand Down
1 change: 1 addition & 0 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ int flb_input_instance_init(struct flb_input_instance *ins,
/* initialize processors */
ret = flb_processor_init(ins->processor);
if (ret == -1) {
flb_error("[input %s] error initializing processor, aborting startup", ins->name);
return -1;
}

Expand Down
1 change: 1 addition & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,7 @@ int flb_output_init_all(struct flb_config *config)
/* initialize processors */
ret = flb_processor_init(ins->processor);
if (ret == -1) {
flb_error("[output %s] error initializing processor, aborting startup", ins->name);
return -1;
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc,
processor_instance = flb_processor_instance_create(config, pu->event_type, unit_name, NULL);

if (processor_instance == NULL) {
flb_error("[processor] error creating native processor instance %s", pu->name);
flb_error("[processor] error creating processor '%s': plugin doesn't exist or failed to initialize", unit_name);

pthread_mutex_destroy(&pu->lock);
flb_sds_destroy(pu->name);
Expand Down Expand Up @@ -381,6 +381,7 @@ int flb_processor_init(struct flb_processor *proc)
ret = flb_processor_unit_init(pu);

if (ret == -1) {
flb_error("[processor] initialization of processor unit '%s' failed", pu->name);
return -1;
}
count++;
Expand All @@ -391,6 +392,7 @@ int flb_processor_init(struct flb_processor *proc)
ret = flb_processor_unit_init(pu);

if (ret == -1) {
flb_error("[processor] initialization of processor unit '%s' failed", pu->name);
return -1;
}
count++;
Expand All @@ -401,6 +403,7 @@ int flb_processor_init(struct flb_processor *proc)
ret = flb_processor_unit_init(pu);

if (ret == -1) {
flb_error("[processor] initialization of processor unit '%s' failed", pu->name);
return -1;
}
count++;
Expand Down Expand Up @@ -822,7 +825,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s
tmp = cfl_kvlist_fetch(kvlist, "name");

if (!tmp) {
flb_error("processor configuration don't have a 'name' defined");
flb_error("[processor] configuration missing required 'name' field");
return -1;
}

Expand All @@ -831,7 +834,6 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s
pu = flb_processor_unit_create(proc, type, name);

if (!pu) {
flb_error("cannot create '%s' processor unit", name);
return -1;
}

Expand Down
31 changes: 31 additions & 0 deletions tests/internal/config_format_yaml.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_str.h>
Expand Down Expand Up @@ -833,6 +835,34 @@ static void test_upstream_servers()
flb_cf_destroy(cf);
}

static void test_invalid_property()
{
char* test_cases[] = {
FLB_TESTS_CONF_PATH "/invalid_input_property.yaml",
FLB_TESTS_CONF_PATH "/invalid_output_property.yaml",
NULL,
};

struct flb_cf *cf;
struct flb_config *config;
int ret;
int i;

for (i = 0; test_cases[i] != NULL; i++) {
cf = flb_cf_yaml_create(NULL, test_cases[i], NULL, 0);
TEST_ASSERT(cf != NULL);

config = flb_config_init();
TEST_ASSERT(config != NULL);

ret = flb_config_load_config_format(config, cf);
TEST_ASSERT_(ret == -1, "expected invalid property to return an error in file %s", test_cases[i]);

flb_config_exit(config);
flb_cf_destroy(cf);
}
}

TEST_LIST = {
{ "basic" , test_basic},
{ "customs section", test_customs_section},
Expand All @@ -846,5 +876,6 @@ TEST_LIST = {
{ "stream_processor", test_stream_processor},
{ "plugins", test_plugins},
{ "upstream_servers", test_upstream_servers},
{ "invalid_input_property", test_invalid_property},
{ 0 }
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pipeline:
inputs:
- name: dummy
log_level: thisdoesnotexist
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pipeline:
outputs:
- name: stdout
match: '*'
log_level: thisdoesnotexist
1 change: 1 addition & 0 deletions tests/runtime_shell/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set(UNIT_TESTS_SH
in_syslog_udp_plaintext_expect.sh
in_syslog_uds_dgram_plaintext_expect.sh
in_syslog_uds_stream_plaintext_expect.sh
processor_invalid.sh
)

# Prepare list of unit tests
Expand Down
67 changes: 67 additions & 0 deletions tests/runtime_shell/processor_invalid.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/sh

# Setup environment if not already set
if [ -z "$FLB_BIN" ]; then
FLB_ROOT=${FLB_ROOT:-$(cd $(dirname $0)/../.. && pwd)}
FLB_BIN=${FLB_BIN:-$FLB_ROOT/build/bin/fluent-bit}
fi

echo "Using Fluent Bit at: $FLB_BIN"

# Create a temporary YAML config file
cat > /tmp/processor_invalid.yaml << EOL
service:
log_level: debug
flush: 1
pipeline:
inputs:
- name: dummy
dummy: '{"message": "test message"}'
tag: test
processors:
logs:
- name: non_existent_processor
action: invalid

outputs:
- name: stdout
match: '*'
EOL

echo "Running Fluent Bit with invalid processor YAML config..."
echo "YAML Config:"
cat /tmp/processor_invalid.yaml

# Redirect stdout and stderr to a file for analysis
OUTPUT_FILE="/tmp/processor_invalid_output.txt"
$FLB_BIN -c /tmp/processor_invalid.yaml -o stdout > $OUTPUT_FILE 2>&1

# Check exit code - we expect it to fail
EXIT_CODE=$?
echo "Fluent Bit exited with code: $EXIT_CODE"

# Show the output
echo "Output file content:"
cat $OUTPUT_FILE

# Check if the output contains an error related to invalid processor
INVALID_PROCESSOR=$(grep -c "error creating processor 'non_existent_processor': plugin doesn't exist or failed to initialize" $OUTPUT_FILE || true)
FAILED_INIT=$(grep -c "error initializing processor" $OUTPUT_FILE || true)

# Clean up
echo "Cleaning up..."
rm -f /tmp/processor_invalid.yaml
rm -f $OUTPUT_FILE

# Check results - we expect Fluent Bit to fail (non-zero exit code)
# and have an error message about the invalid processor
if [ "$EXIT_CODE" -ne 0 ] && ([ "$INVALID_PROCESSOR" -gt 0 ] || [ "$FAILED_INIT" -gt 0 ]); then
echo "Test passed: Fluent Bit failed with error about invalid processor"
exit 0
else
echo "Test failed: Fluent Bit should fail when an invalid processor is configured"
echo "Exit code: $EXIT_CODE (expected non-zero)"
echo "Invalid processor message count: $INVALID_PROCESSOR (expected > 0)"
echo "Failed init message count: $FAILED_INIT (expected > 0)"
exit 1
fi
Loading