Skip to content

Commit

Permalink
filter_ecs: Retrieve container ID from record field
Browse files Browse the repository at this point in the history
  • Loading branch information
RaJiska committed Jul 2, 2024
1 parent 13f96f9 commit 85c937c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 28 deletions.
144 changes: 116 additions & 28 deletions plugins/filter_ecs/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ static int cb_ecs_init(struct flb_filter_instance *f_ins,
}

ctx->ecs_tag_prefix_len = strlen(ctx->ecs_tag_prefix);
if (*ctx->container_id_field_name) {
flb_plg_debug(f_ins, "Using \"%s\" field to retrieve container ID", ctx->container_id_field_name);
} else {
flb_plg_debug(f_ins, "Using tag to retrieve container ID with prefix \"%s\"", ctx->ecs_tag_prefix);
}

/* attempt to get metadata in init, can retry in cb_filter */
ret = get_ecs_cluster_metadata(ctx);
Expand Down Expand Up @@ -1290,36 +1295,75 @@ Metadata Response:
return 0;
}

static int get_metadata_by_id(struct flb_filter_ecs *ctx,
const char *tag, int tag_len,
struct flb_ecs_metadata_buffer **metadata_buffer)
static int deduce_short_container_id_from_tag(struct flb_filter_ecs *ctx,
const char *tag, int tag_len,
flb_sds_t *container_short_id)
{
flb_sds_t container_short_id = NULL;
const char *tmp;
int ret;
size_t size;

if (ctx->ecs_tag_prefix_len + 12 > tag_len) {
flb_plg_warn(ctx->ins, "Tag '%s' length check failed: tag is expected "
"to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'",
tag);
"to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'",
tag);
return -1;
}

ret = strncmp(ctx->ecs_tag_prefix, tag, ctx->ecs_tag_prefix_len);
if (ret != 0) {
flb_plg_warn(ctx->ins, "Tag '%s' is not prefixed with ecs_tag_prefix '%s'",
tag, ctx->ecs_tag_prefix);
tag, ctx->ecs_tag_prefix);
return -1;
}

tmp = tag + ctx->ecs_tag_prefix_len;
container_short_id = flb_sds_create_len(tmp, 12);
if (!container_short_id) {
*container_short_id = flb_sds_create_len(tmp, 12);

if (!*container_short_id) {
flb_errno();
return -1;
}

return 0;
}

static int deduce_short_container_id_from_record(struct flb_filter_ecs *ctx,
const msgpack_object *record,
flb_sds_t *container_short_id)
{
struct flb_record_accessor *ra_key;
struct flb_ra_value *rval;

ra_key = flb_ra_create(ctx->container_id_field_name, FLB_TRUE);
if (!ra_key) {
flb_errno();
return -1;
}
rval = flb_ra_get_value_object(ra_key, *record);
if (!rval) {
flb_plg_debug(ctx->ins, "Container field \"%s\" not found in record",
ctx->container_id_field_name);
flb_ra_destroy(ra_key);
return -1;
}
flb_ra_destroy(ra_key);
*container_short_id = flb_sds_create_len(rval->val.string, 12);
if (!*container_short_id) {
flb_errno();
flb_ra_key_value_destroy(rval);
return -1;
}
flb_ra_key_value_destroy(rval);
return 0;
}

static int get_metadata_by_id(struct flb_filter_ecs *ctx,
const char *tag, const flb_sds_t container_short_id,
struct flb_ecs_metadata_buffer **metadata_buffer)
{
int ret;
size_t size;

/* get metadata for this container */
ret = flb_hash_table_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
Expand All @@ -1329,9 +1373,14 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* try fetch metadata */
ret = get_task_metadata(ctx, container_short_id);
if (ret < 0) {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s",
tag);
flb_sds_destroy(container_short_id);
if (*ctx->container_id_field_name) {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for "
"container ID %s",
container_short_id);
} else {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s",
tag);
}
return -1;
}
/* get from hash table */
Expand All @@ -1340,7 +1389,6 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
(void **) metadata_buffer, &size);
}

flb_sds_destroy(container_short_id);
return ret;
}

Expand Down Expand Up @@ -1456,6 +1504,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
flb_sds_t container_short_id = NULL;

(void) f_ins;
(void) i_ins;
Expand All @@ -1471,22 +1520,33 @@ static int cb_ecs_filter(const void *data, size_t bytes,
}
}

/* check if the current tag is marked as failed */
check = is_tag_marked_failed(ctx, tag, tag_len);
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. Will attach cluster metadata only.",
tag, ctx->agent_endpoint_retries);
if (!*ctx->container_id_field_name) {
/* check if the current tag is marked as failed */
check = is_tag_marked_failed(ctx, tag, tag_len);
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. "
"Will attach cluster metadata only.",
tag, ctx->agent_endpoint_retries);
}
}

if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) {
ret = get_metadata_by_id(ctx, tag, tag_len, &metadata_buffer);
if (ret == -1) {
flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for %s, "
"falling back to process cluster metadata only. If "
"this is intentional, set `Cluster_Metadata_Only On`",
tag);
mark_tag_failed(ctx, tag, tag_len);
if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE
&& !*ctx->container_id_field_name) {
ret = deduce_short_container_id_from_tag(ctx, tag, tag_len, &container_short_id);
if (ret >= 0) {
ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer);
if (ret == -1) {
flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for tag %s, "
"falling back to process cluster metadata only. If "
"this is intentional, set `Cluster_Metadata_Only On`",
tag);
mark_tag_failed(ctx, tag, tag_len);
metadata_buffer = &ctx->cluster_meta_buf;
}
flb_sds_destroy(container_short_id);
} else {
flb_plg_warn(ctx->ins, "Could not retrieve container ID using tag");
metadata_buffer = &ctx->cluster_meta_buf;
}
} else {
Expand Down Expand Up @@ -1521,6 +1581,27 @@ static int cb_ecs_filter(const void *data, size_t bytes,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
obj = log_event.body;

if (*ctx->container_id_field_name) {
container_short_id = NULL;
ret = deduce_short_container_id_from_record(ctx,
log_event.body,
&container_short_id);
if (ret >= 0) {
ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer);
if (ret == -1) {
flb_plg_warn(ctx->ins, "Failed to get ECS Metadata for "
"container_id %s using field name %s. Will attach "
"cluster metadata only.",
container_short_id, ctx->container_id_field_name);
metadata_buffer = &ctx->cluster_meta_buf;
}
} else {
flb_plg_warn(ctx->ins, "Could not retrieve container ID using record");
metadata_buffer = &ctx->cluster_meta_buf;
}
flb_sds_destroy(container_short_id);
}

ret = flb_log_event_encoder_begin_record(&log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand Down Expand Up @@ -1703,6 +1784,13 @@ static struct flb_config_map config_map[] = {
"(cluster name, container instance ID/ARN, and ECS Agent version)."
},

{
FLB_CONFIG_MAP_STR, "container_id_field_name", "",
0, FLB_TRUE, offsetof(struct flb_filter_ecs, container_id_field_name),
"The field name to retrieve the container ID from. "
"Supersedes ecs_tag_prefix when set."
},

{
FLB_CONFIG_MAP_BOOL, "cluster_metadata_only", "false",
0, FLB_TRUE, offsetof(struct flb_filter_ecs, cluster_metadata_only),
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_ecs/ecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ struct flb_filter_ecs {

int ecs_meta_cache_ttl;
char *ecs_tag_prefix;
char *container_id_field_name;
int ecs_tag_prefix_len;
int cluster_metadata_only;
};
Expand Down

0 comments on commit 85c937c

Please sign in to comment.