Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloudwatch_logs: remove sequence tokens from API calls #7973

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 11 additions & 103 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@
#include "cloudwatch_api.h"

#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"
#define ERR_CODE_DATA_ALREADY_ACCEPTED "DataAlreadyAcceptedException"

#define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId"

Expand Down Expand Up @@ -229,23 +227,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
goto error;
}

if (stream->sequence_token) {
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"sequenceToken\":\"", 17)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
stream->sequence_token, 0)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\",", 2)) {
goto error;
}
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"logEvents\":[", 13)) {
goto error;
Expand Down Expand Up @@ -493,9 +474,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
if (buf->current_stream != NULL) {
buf->data_size += strlen(buf->current_stream->name);
buf->data_size += strlen(buf->current_stream->group);
if (buf->current_stream->sequence_token) {
buf->data_size += strlen(buf->current_stream->sequence_token);
}
}
}

Expand Down Expand Up @@ -1153,7 +1131,6 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;

flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days);

Expand Down Expand Up @@ -1196,17 +1173,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
/* some error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
}
}

Expand Down Expand Up @@ -1287,8 +1256,8 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream)
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
/* error can not be parsed, print raw response */
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
}
}
}
Expand Down Expand Up @@ -1402,8 +1371,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
/* error can not be parsed, print raw response */
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
}
}
}
Expand All @@ -1417,8 +1386,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
}

/*
* Returns -1 on failure, 0 on success, and 1 for a sequence token error,
* which means the caller can retry.
* Returns -1 on failure, 0 on success
*/
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, size_t payload_size)
Expand All @@ -1427,7 +1395,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t tmp;
flb_sds_t error;
int num_headers = 1;
int retry = FLB_TRUE;

Expand Down Expand Up @@ -1460,8 +1427,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
/* code was 200, but response is invalid, treat as failure */
if (c->resp.data != NULL) {
flb_plg_debug(ctx->ins, "Could not find sequence token in "
"response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data);
flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data);
}
flb_http_client_destroy(c);

Expand All @@ -1474,73 +1440,15 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
AMZN_REQUEST_ID_HEADER);
return -1;
}


/* success */
if (c->resp.payload_size > 0) {
flb_plg_debug(ctx->ins, "Sent events to %s", stream->name);
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"nextSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;

flb_http_client_destroy(c);
return 0;
}
else {
flb_plg_error(ctx->ins, "Could not find sequence token in "
"response: %s", c->resp.payload);
}
}

flb_http_client_destroy(c);
return 0;
}

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) {
/*
* This case will happen when we do not know the correct
* sequence token; we can find it in the error response
* and retry.
*/
flb_plg_debug(ctx->ins, "Sequence token was invalid, "
"will retry");
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"expectedSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* tell the caller to retry */
return 1;
}
} else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) {
/* not sure what causes this but it counts as success */
flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED);
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* success */
return 0;
}
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
flb_sds_destroy(error);
}
else {
/* error could not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
}
}

Expand Down
68 changes: 32 additions & 36 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
const char *tmp;
char *session_name = NULL;
struct flb_cloudwatch *ctx = NULL;
struct cw_flush *buf = NULL;
int ret;
flb_sds_t tmp_sds = NULL;
(void) config;
Expand Down Expand Up @@ -348,50 +347,53 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
flb_output_upstream_set(upstream, ctx->ins);
ctx->cw_client->host = ctx->endpoint;

/* alloc the payload/processing buffer */
/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
}

struct cw_flush *new_buffer()
{
struct cw_flush *buf;

buf = flb_calloc(1, sizeof(struct cw_flush));
if (!buf) {
flb_errno();
goto error;
return NULL;
}

buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->out_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->tmp_buf = flb_malloc(sizeof(char) * PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->tmp_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->events = flb_malloc(sizeof(struct cw_event) * MAX_EVENTS_PER_PUT);
if (!buf->events) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->events_capacity = MAX_EVENTS_PER_PUT;

ctx->buf = buf;


/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
return buf;
}

static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
Expand All @@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag,
event_chunk->data, event_chunk->size);
struct cw_flush *buf;

buf = new_buffer();
if (!buf) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}

event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
cw_flush_destroy(buf);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

// TODO: this msg is innaccurate if events are skipped
flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count);
cw_flush_destroy(buf);

FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
flb_aws_provider_destroy(ctx->base_aws_provider);
}

if (ctx->buf) {
cw_flush_destroy(ctx->buf);
}

if (ctx->aws_provider) {
flb_aws_provider_destroy(ctx->aws_provider);
}
Expand Down Expand Up @@ -496,9 +500,6 @@ void log_stream_destroy(struct log_stream *stream)
if (stream->name) {
flb_sds_destroy(stream->name);
}
if (stream->sequence_token) {
flb_sds_destroy(stream->sequence_token);
}
if (stream->group) {
flb_sds_destroy(stream->group);
}
Expand Down Expand Up @@ -657,12 +658,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_init = cb_cloudwatch_init,
.cb_flush = cb_cloudwatch_flush,
.cb_exit = cb_cloudwatch_exit,

/*
* Allow cloudwatch to use async network stack synchronously by opting into
* FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
*/
.flags = FLB_OUTPUT_SYNCHRONOUS,
.flags = 0,
.workers = 1,

/* Configuration */
Expand Down
8 changes: 3 additions & 5 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct cw_event {
struct log_stream {
flb_sds_t name;
flb_sds_t group;
flb_sds_t sequence_token;

/*
* log streams in CloudWatch do not expire; but our internal representations
* of them are periodically cleaned up if they have been unused for too long
Expand All @@ -87,8 +87,6 @@ struct log_stream {
struct mk_list _head;
};

void log_stream_destroy(struct log_stream *stream);

struct flb_cloudwatch {
/*
* TLS instances can not be re-used. So we have one for:
Expand Down Expand Up @@ -138,8 +136,6 @@ struct flb_cloudwatch {
/* stores log streams we're putting to */
struct mk_list streams;

/* buffers for data processing and request payload */
struct cw_flush *buf;
/* The namespace to use for the metric */
flb_sds_t metric_namespace;

Expand All @@ -155,4 +151,6 @@ struct flb_cloudwatch {

void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);

void log_stream_destroy(struct log_stream *stream);

#endif
2 changes: 2 additions & 0 deletions src/aws/flb_aws_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ void flb_aws_print_error(char *response, size_t response_len,

error = flb_json_get_val(response, response_len, "__type");
if (!error) {
/* error can not be parsed, print raw response */
flb_plg_warn(ins, "Raw response: %s", response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lacks full context that it could have: we can print the api value in the raw response here, like: PutLogEvents: Raw Response:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess based on the response, the API should be obvious? Is this true? I think if this does really add context, it'd be nice to add.

We could also follow up with it later I suppose, as long we stage it now, so we don't forget it..

What are your thoughts on how important this context is?

Copy link
Contributor Author

@matthewfala matthewfala Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is nice to have but not essential. I'll update this for master, and include it in aws-for-fluent-bit after release of 2.32.0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool and thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return;
}

Expand Down
Loading