Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_ecs: Retrieve container ID from record field #9033

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 125 additions & 28 deletions plugins/filter_ecs/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ static int cb_ecs_init(struct flb_filter_instance *f_ins,
}

ctx->ecs_tag_prefix_len = strlen(ctx->ecs_tag_prefix);
if (*ctx->container_id_field_name) {
flb_plg_debug(f_ins, "Using \"%s\" field to retrieve container ID", ctx->container_id_field_name);
} else {
flb_plg_debug(f_ins, "Using tag to retrieve container ID with prefix \"%s\"", ctx->ecs_tag_prefix);
}

/* attempt to get metadata in init, can retry in cb_filter */
ret = get_ecs_cluster_metadata(ctx);
Expand Down Expand Up @@ -1290,36 +1295,83 @@ Metadata Response:
return 0;
}

static int get_metadata_by_id(struct flb_filter_ecs *ctx,
const char *tag, int tag_len,
struct flb_ecs_metadata_buffer **metadata_buffer)
static int deduce_short_container_id_from_tag(struct flb_filter_ecs *ctx,
const char *tag, int tag_len,
flb_sds_t *container_short_id)
{
flb_sds_t container_short_id = NULL;
const char *tmp;
int ret;
size_t size;

if (ctx->ecs_tag_prefix_len + 12 > tag_len) {
flb_plg_warn(ctx->ins, "Tag '%s' length check failed: tag is expected "
"to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'",
tag);
"to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'",
tag);
return -1;
}

ret = strncmp(ctx->ecs_tag_prefix, tag, ctx->ecs_tag_prefix_len);
if (ret != 0) {
flb_plg_warn(ctx->ins, "Tag '%s' is not prefixed with ecs_tag_prefix '%s'",
tag, ctx->ecs_tag_prefix);
tag, ctx->ecs_tag_prefix);
return -1;
}

tmp = tag + ctx->ecs_tag_prefix_len;
container_short_id = flb_sds_create_len(tmp, 12);
if (!container_short_id) {
*container_short_id = flb_sds_create_len(tmp, 12);

if (!*container_short_id) {
flb_errno();
return -1;
}

return 0;
}

static int deduce_short_container_id_from_record(struct flb_filter_ecs *ctx,
const msgpack_object *record,
flb_sds_t *container_short_id)
{
struct flb_record_accessor *ra_key;
struct flb_ra_value *rval;

ra_key = flb_ra_create(ctx->container_id_field_name, FLB_TRUE);
if (!ra_key) {
flb_errno();
return -1;
}
rval = flb_ra_get_value_object(ra_key, *record);
if (!rval) {
flb_plg_debug(ctx->ins, "Container field \"%s\" not found in record",
ctx->container_id_field_name);
flb_ra_destroy(ra_key);
return -1;
}
flb_ra_destroy(ra_key);
if (rval->type != FLB_RA_STRING || rval->o.via.str.size < 12) {
flb_plg_debug(ctx->ins, "Container field \"%s\" needs to represent a container "
"ID a string of at least 12 characters "
"representing the container ID",
ctx->container_id_field_name);
flb_ra_key_value_destroy(rval);
return -1;
}
*container_short_id = flb_sds_create_len(rval->val.string, 12);
if (!*container_short_id) {
flb_errno();
flb_ra_key_value_destroy(rval);
return -1;
}
flb_ra_key_value_destroy(rval);
return 0;
}

static int get_metadata_by_id(struct flb_filter_ecs *ctx,
const char *tag, const flb_sds_t container_short_id,
struct flb_ecs_metadata_buffer **metadata_buffer)
{
int ret;
size_t size;

/* get metadata for this container */
ret = flb_hash_table_get(ctx->container_hash_table,
container_short_id, flb_sds_len(container_short_id),
Expand All @@ -1329,9 +1381,15 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
/* try fetch metadata */
ret = get_task_metadata(ctx, container_short_id);
if (ret < 0) {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s",
tag);
flb_sds_destroy(container_short_id);
if (*ctx->container_id_field_name) {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent "
"introspection endpoint failed for container ID %s",
container_short_id);
} else {
flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent "
"introspection endpoint failed for tag %s",
tag);
}
return -1;
}
/* get from hash table */
Expand All @@ -1340,7 +1398,6 @@ static int get_metadata_by_id(struct flb_filter_ecs *ctx,
(void **) metadata_buffer, &size);
}

flb_sds_destroy(container_short_id);
return ret;
}

Expand Down Expand Up @@ -1456,6 +1513,7 @@ static int cb_ecs_filter(const void *data, size_t bytes,
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
flb_sds_t container_short_id = NULL;

(void) f_ins;
(void) i_ins;
Expand All @@ -1471,22 +1529,33 @@ static int cb_ecs_filter(const void *data, size_t bytes,
}
}

/* check if the current tag is marked as failed */
check = is_tag_marked_failed(ctx, tag, tag_len);
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. Will attach cluster metadata only.",
tag, ctx->agent_endpoint_retries);
if (!*ctx->container_id_field_name) {
/* check if the current tag is marked as failed */
check = is_tag_marked_failed(ctx, tag, tag_len);
if (check == FLB_TRUE) {
flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
"Will not attempt to retry the metadata request. "
"Will attach cluster metadata only.",
tag, ctx->agent_endpoint_retries);
}
}

if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) {
ret = get_metadata_by_id(ctx, tag, tag_len, &metadata_buffer);
if (ret == -1) {
flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for %s, "
"falling back to process cluster metadata only. If "
"this is intentional, set `Cluster_Metadata_Only On`",
tag);
mark_tag_failed(ctx, tag, tag_len);
if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE
&& !*ctx->container_id_field_name) {
ret = deduce_short_container_id_from_tag(ctx, tag, tag_len, &container_short_id);
if (ret >= 0) {
ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer);
if (ret == -1) {
flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for tag %s, "
"falling back to process cluster metadata only. If "
"this is intentional, set `Cluster_Metadata_Only On`",
tag);
mark_tag_failed(ctx, tag, tag_len);
metadata_buffer = &ctx->cluster_meta_buf;
}
flb_sds_destroy(container_short_id);
} else {
flb_plg_warn(ctx->ins, "Could not retrieve container ID using tag");
metadata_buffer = &ctx->cluster_meta_buf;
}
} else {
Expand Down Expand Up @@ -1521,6 +1590,27 @@ static int cb_ecs_filter(const void *data, size_t bytes,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
obj = log_event.body;

if (*ctx->container_id_field_name) {
container_short_id = NULL;
ret = deduce_short_container_id_from_record(ctx,
log_event.body,
&container_short_id);
if (ret >= 0) {
ret = get_metadata_by_id(ctx, tag, container_short_id, &metadata_buffer);
if (ret == -1) {
flb_plg_warn(ctx->ins, "Failed to get ECS Metadata for "
"container_id %s using field name %s. Will attach "
"cluster metadata only.",
container_short_id, ctx->container_id_field_name);
metadata_buffer = &ctx->cluster_meta_buf;
}
} else {
flb_plg_warn(ctx->ins, "Could not retrieve container ID using record");
metadata_buffer = &ctx->cluster_meta_buf;
}
flb_sds_destroy(container_short_id);
}

ret = flb_log_event_encoder_begin_record(&log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand Down Expand Up @@ -1703,6 +1793,13 @@ static struct flb_config_map config_map[] = {
"(cluster name, container instance ID/ARN, and ECS Agent version)."
},

{
FLB_CONFIG_MAP_STR, "container_id_field_name", "",
0, FLB_TRUE, offsetof(struct flb_filter_ecs, container_id_field_name),
"The field name to retrieve the container ID from. "
"Supersedes ecs_tag_prefix when set."
},

{
FLB_CONFIG_MAP_BOOL, "cluster_metadata_only", "false",
0, FLB_TRUE, offsetof(struct flb_filter_ecs, cluster_metadata_only),
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_ecs/ecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ struct flb_filter_ecs {

int ecs_meta_cache_ttl;
char *ecs_tag_prefix;
char *container_id_field_name;
int ecs_tag_prefix_len;
int cluster_metadata_only;
};
Expand Down
Loading
Loading