From f14ac5f014c75502fa20a8c78de856cb3b0629f9 Mon Sep 17 00:00:00 2001 From: Ra'Jiska Date: Tue, 2 Jul 2024 22:50:31 +0900 Subject: [PATCH 1/3] filter_ecs: Retrieve container ID from record field Signed-off-by: Ra'Jiska --- plugins/filter_ecs/ecs.c | 144 +++++++++++++++++++++++++++++++-------- plugins/filter_ecs/ecs.h | 1 + 2 files changed, 117 insertions(+), 28 deletions(-) diff --git a/plugins/filter_ecs/ecs.c b/plugins/filter_ecs/ecs.c index 1784a58a3d1..ed6b6e42b2b 100644 --- a/plugins/filter_ecs/ecs.c +++ b/plugins/filter_ecs/ecs.c @@ -199,6 +199,11 @@ static int cb_ecs_init(struct flb_filter_instance *f_ins, } ctx->ecs_tag_prefix_len = strlen(ctx->ecs_tag_prefix); + if (*ctx->container_id_field_name) { + flb_plg_debug(f_ins, "Using \"%s\" field to retrieve container ID", ctx->container_id_field_name); + } else { + flb_plg_debug(f_ins, "Using tag to retrieve container ID with prefix \"%s\"", ctx->ecs_tag_prefix); + } /* attempt to get metadata in init, can retry in cb_filter */ ret = get_ecs_cluster_metadata(ctx); @@ -1290,36 +1295,75 @@ Metadata Response: return 0; } -static int get_metadata_by_id(struct flb_filter_ecs *ctx, - const char *tag, int tag_len, - struct flb_ecs_metadata_buffer **metadata_buffer) +static int deduce_short_container_id_from_tag(struct flb_filter_ecs *ctx, + const char *tag, int tag_len, + flb_sds_t *container_short_id) { - flb_sds_t container_short_id = NULL; const char *tmp; int ret; - size_t size; if (ctx->ecs_tag_prefix_len + 12 > tag_len) { flb_plg_warn(ctx->ins, "Tag '%s' length check failed: tag is expected " - "to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'", - tag); + "to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'", + tag); return -1; } ret = strncmp(ctx->ecs_tag_prefix, tag, ctx->ecs_tag_prefix_len); if (ret != 0) { flb_plg_warn(ctx->ins, "Tag '%s' is not prefixed with ecs_tag_prefix '%s'", - tag, ctx->ecs_tag_prefix); + tag, ctx->ecs_tag_prefix); return -1; } tmp = tag + ctx->ecs_tag_prefix_len; - container_short_id = flb_sds_create_len(tmp, 12); - if (!container_short_id) { + *container_short_id = flb_sds_create_len(tmp, 12); + + if (!*container_short_id) { flb_errno(); return -1; } + return 0; +} + +static int deduce_short_container_id_from_record(struct flb_filter_ecs *ctx, + const msgpack_object *record, + flb_sds_t *container_short_id) +{ + struct flb_record_accessor *ra_key; + struct flb_ra_value *rval; + + ra_key = flb_ra_create(ctx->container_id_field_name, FLB_TRUE); + if (!ra_key) { + flb_errno(); + return -1; + } + rval = flb_ra_get_value_object(ra_key, *record); + if (!rval) { + flb_plg_debug(ctx->ins, "Container field \"%s\" not found in record", + ctx->container_id_field_name); + flb_ra_destroy(ra_key); + return -1; + } + flb_ra_destroy(ra_key); + *container_short_id = flb_sds_create_len(rval->val.string, 12); + if (!*container_short_id) { + flb_errno(); + flb_ra_key_value_destroy(rval); + return -1; + } + flb_ra_key_value_destroy(rval); + return 0; +} + +static int get_metadata_by_id(struct flb_filter_ecs *ctx, + const char *tag, const flb_sds_t container_short_id, + struct flb_ecs_metadata_buffer **metadata_buffer) +{ + int ret; + size_t size; + /* get metadata for this container */ ret = flb_hash_table_get(ctx->container_hash_table, container_short_id, flb_sds_len(container_short_id), @@ -1329,9 +1373,14 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx, /* try fetch metadata */ ret = get_task_metadata(ctx, container_short_id); if (ret < 0) { - flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s", - tag); - flb_sds_destroy(container_short_id); + if (*ctx->container_id_field_name) { + flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for " + "container ID %s", + container_short_id); + } else { + flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s", + tag); + } return -1; } /* get from hash table */ @@ -1340,7 +1389,6 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx, (void **) metadata_buffer, &size); } - flb_sds_destroy(container_short_id); return ret; } @@ -1456,6 +1504,7 @@ static int cb_ecs_filter(const void *data, size_t bytes, struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + flb_sds_t container_short_id = NULL; (void) f_ins; (void) i_ins; @@ -1471,22 +1520,33 @@ static int cb_ecs_filter(const void *data, size_t bytes, } } - /* check if the current tag is marked as failed */ - check = is_tag_marked_failed(ctx, tag, tag_len); - if (check == FLB_TRUE) { - flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. " - "Will not attempt to retry the metadata request. Will attach cluster metadata only.", - tag, ctx->agent_endpoint_retries); + if (!*ctx->container_id_field_name) { + /* check if the current tag is marked as failed */ + check = is_tag_marked_failed(ctx, tag, tag_len); + if (check == FLB_TRUE) { + flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. " + "Will not attempt to retry the metadata request. " + "Will attach cluster metadata only.", + tag, ctx->agent_endpoint_retries); + } } - if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) { - ret = get_metadata_by_id(ctx, tag, tag_len, &metadata_buffer); - if (ret == -1) { - flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for %s, " - "falling back to process cluster metadata only. If " - "this is intentional, set `Cluster_Metadata_Only On`", - tag); - mark_tag_failed(ctx, tag, tag_len); + if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE + && !*ctx->container_id_field_name) { + ret = deduce_short_container_id_from_tag(ctx, tag, tag_len, &container_short_id); + if (ret >= 0) { + ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer); + if (ret == -1) { + flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for tag %s, " + "falling back to process cluster metadata only. If " + "this is intentional, set `Cluster_Metadata_Only On`", + tag); + mark_tag_failed(ctx, tag, tag_len); + metadata_buffer = &ctx->cluster_meta_buf; + } + flb_sds_destroy(container_short_id); + } else { + flb_plg_warn(ctx->ins, "Could not retrieve container ID using tag"); metadata_buffer = &ctx->cluster_meta_buf; } } else { @@ -1521,6 +1581,27 @@ static int cb_ecs_filter(const void *data, size_t bytes, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { obj = log_event.body; + if (*ctx->container_id_field_name) { + container_short_id = NULL; + ret = deduce_short_container_id_from_record(ctx, + log_event.body, + &container_short_id); + if (ret >= 0) { + ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer); + if (ret == -1) { + flb_plg_warn(ctx->ins, "Failed to get ECS Metadata for " + "container_id %s using field name %s. Will attach " + "cluster metadata only.", + container_short_id, ctx->container_id_field_name); + metadata_buffer = &ctx->cluster_meta_buf; + } + } else { + flb_plg_warn(ctx->ins, "Could not retrieve container ID using record"); + metadata_buffer = &ctx->cluster_meta_buf; + } + flb_sds_destroy(container_short_id); + } + ret = flb_log_event_encoder_begin_record(&log_encoder); if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -1703,6 +1784,13 @@ static struct flb_config_map config_map[] = { "(cluster name, container instance ID/ARN, and ECS Agent version)." }, + { + FLB_CONFIG_MAP_STR, "container_id_field_name", "", + 0, FLB_TRUE, offsetof(struct flb_filter_ecs, container_id_field_name), + "The field name to retrieve the container ID from. " + "Supersedes ecs_tag_prefix when set." + }, + { FLB_CONFIG_MAP_BOOL, "cluster_metadata_only", "false", 0, FLB_TRUE, offsetof(struct flb_filter_ecs, cluster_metadata_only), diff --git a/plugins/filter_ecs/ecs.h b/plugins/filter_ecs/ecs.h index 7afd941c358..7fbc82d6e52 100644 --- a/plugins/filter_ecs/ecs.h +++ b/plugins/filter_ecs/ecs.h @@ -145,6 +145,7 @@ struct flb_filter_ecs { int ecs_meta_cache_ttl; char *ecs_tag_prefix; + char *container_id_field_name; int ecs_tag_prefix_len; int cluster_metadata_only; }; From 901be3b82769c676340775fad452ba9a346acf0e Mon Sep 17 00:00:00 2001 From: Ra'Jiska Date: Wed, 3 Jul 2024 12:33:38 +0900 Subject: [PATCH 2/3] filter_ecs_test: Add tests for container_id_field_name parameter Signed-off-by: Ra'Jiska --- tests/runtime/filter_ecs.c | 150 +++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/tests/runtime/filter_ecs.c b/tests/runtime/filter_ecs.c index 79868294beb..236d445c2c9 100644 --- a/tests/runtime/filter_ecs.c +++ b/tests/runtime/filter_ecs.c @@ -432,6 +432,153 @@ static void flb_test_ecs_filter_task_error() filter_test_destroy(ctx); } +static void flb_test_ecs_filter_containerid_field() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "randomtag"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "container_id_field_name", "container_id", + "ADD", "resource $ClusterName.$TaskID.$ECSContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_pattern = "cluster_name.e01d58a8-151b-40e8-bc01-22647b9ecfec.nginx"; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\",\"container_id\":\"79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_containerid_field_error_missing() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "randomtag"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "container_id_field_name", "missing_field", + "ADD", "resource $ClusterName.$TaskID.$ECSContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_pattern = "cluster_name.."; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\",\"container_id\":\"79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + +static void flb_test_ecs_filter_containerid_field_error_invalid() +{ + int len; + int ret; + int bytes; + char *p; + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + struct filter_test_result expected = { 0 }; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_ECS_PLUGIN_UNDER_TEST", "true", 1); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data, "randomtag"); + if (!ctx) { + exit(EXIT_FAILURE); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "container_id_field_name", "container_id", + "ADD", "resource $ClusterName.$TaskID.$ECSContainerName", + NULL); + TEST_CHECK(ret == 0); + + /* Prepare output callback with expected result */ + expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_pattern = "cluster_name.."; + expected.expected_pattern_index = 0; + cb_data.cb = cb_check_result; + cb_data.data = (void *) &expected; + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data samples */ + p = "[0, {\"log\":\"error: my error\",\"container_id\":\"random\"}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); + + /* check number of outputted records */ + sleep(2); + TEST_CHECK(expected.actual_records == expected.expected_records); + filter_test_destroy(ctx); +} + TEST_LIST = { {"flb_test_ecs_filter_mark_tag_failed" , flb_test_ecs_filter_mark_tag_failed }, @@ -440,6 +587,9 @@ TEST_LIST = { {"flb_test_ecs_filter_cluster_metadata_only" , flb_test_ecs_filter_cluster_metadata_only }, {"flb_test_ecs_filter_cluster_error" , flb_test_ecs_filter_cluster_error }, {"flb_test_ecs_filter_task_error" , flb_test_ecs_filter_task_error }, + {"flb_test_ecs_filter_containerid_field" , flb_test_ecs_filter_containerid_field }, + {"flb_test_ecs_filter_containerid_field_error_missing" , flb_test_ecs_filter_containerid_field_error_missing }, + {"flb_test_ecs_filter_containerid_field_error_invalid" , flb_test_ecs_filter_containerid_field_error_invalid }, {NULL, NULL} }; From 59cd20b3b3db5a7b4c675efad1998fd54ec1c9b2 Mon Sep 17 00:00:00 2001 From: Ra'Jiska Date: Sat, 6 Jul 2024 16:34:36 +0900 Subject: [PATCH 3/3] Verify container_id field before processing Signed-off-by: Ra'Jiska --- plugins/filter_ecs/ecs.c | 19 ++++++++++++++----- tests/runtime/filter_ecs.c | 8 +++++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/plugins/filter_ecs/ecs.c b/plugins/filter_ecs/ecs.c index ed6b6e42b2b..4323a56c8d2 100644 --- a/plugins/filter_ecs/ecs.c +++ b/plugins/filter_ecs/ecs.c @@ -1347,6 +1347,14 @@ static int deduce_short_container_id_from_record(struct flb_filter_ecs *ctx, return -1; } flb_ra_destroy(ra_key); + if (rval->type != FLB_RA_STRING || rval->o.via.str.size < 12) { + flb_plg_debug(ctx->ins, "Container field \"%s\" needs to represent a container " + "ID a string of at least 12 characters " + "representing the container ID", + ctx->container_id_field_name); + flb_ra_key_value_destroy(rval); + return -1; + } *container_short_id = flb_sds_create_len(rval->val.string, 12); if (!*container_short_id) { flb_errno(); @@ -1374,12 +1382,13 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx, ret = get_task_metadata(ctx, container_short_id); if (ret < 0) { if (*ctx->container_id_field_name) { - flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for " - "container ID %s", - container_short_id); + flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent " + "introspection endpoint failed for container ID %s", + container_short_id); } else { - flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s", - tag); + flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent " + "introspection endpoint failed for tag %s", + tag); } return -1; } diff --git a/tests/runtime/filter_ecs.c b/tests/runtime/filter_ecs.c index 236d445c2c9..c1337d69bb8 100644 --- a/tests/runtime/filter_ecs.c +++ b/tests/runtime/filter_ecs.c @@ -557,7 +557,7 @@ static void flb_test_ecs_filter_containerid_field_error_invalid() TEST_CHECK(ret == 0); /* Prepare output callback with expected result */ - expected.expected_records = 1; /* 1 record with metadata added */ + expected.expected_records = 2; /* 2 record with metadata added */ expected.expected_pattern = "cluster_name.."; expected.expected_pattern_index = 0; cb_data.cb = cb_check_result; @@ -572,6 +572,12 @@ static void flb_test_ecs_filter_containerid_field_error_invalid() len = strlen(p); bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); TEST_CHECK(bytes == len); + sleep(1); + + p = "[0, {\"log\":\"error: my error\",\"container_id\":123456789012}]"; + len = strlen(p); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, len); + TEST_CHECK(bytes == len); /* check number of outputted records */ sleep(2);