From 89acb65058ebb6e5ad8f0eb0663b0f47777ce942 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 27 Nov 2024 07:20:03 +1100 Subject: [PATCH] Downgrade zstd compression to gzip if sending grpc payload Signed-off-by: Rob Skillington --- include/fluent-bit/flb_http_client.h | 1 + plugins/out_opentelemetry/opentelemetry.c | 40 ++++++++++++++++++----- src/flb_http_client.c | 12 +++++++ src/flb_http_common.c | 2 +- 4 files changed, 46 insertions(+), 9 deletions(-) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 243ed253da8..3de3b14ecc9 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -292,6 +292,7 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token); int flb_http_set_keepalive(struct flb_http_client *c); int flb_http_set_content_encoding_gzip(struct flb_http_client *c); +int flb_http_set_content_encoding_zstd(struct flb_http_client *c); int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx); diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index d93890d7c43..1426b853aef 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -52,6 +52,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, { size_t final_body_len; void *final_body; + const char *compression_algorithm; int compressed; int out_ret; size_t b_sent; @@ -63,6 +64,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, struct flb_config_map_val *mv; struct flb_http_client *c; + compression_algorithm = NULL; compressed = FLB_FALSE; u_conn = flb_upstream_conn_get(ctx->u); @@ -81,6 +83,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, &final_body, &final_body_len); if (ret == 0) { + compression_algorithm = "gzip"; compressed = FLB_TRUE; } else { @@ -94,6 +97,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, &final_body, &final_body_len); if (ret == 0) { + compression_algorithm = "zstd"; compressed = FLB_TRUE; } else { @@ -163,7 +167,12 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, } if (compressed) { - flb_http_set_content_encoding_gzip(c); + if (strncasecmp(compression_algorithm, "gzip", 4) == 0) { + flb_http_set_content_encoding_gzip(c); + } + else if (strncasecmp(compression_algorithm, "zstd", 4) == 0) { + flb_http_set_content_encoding_zstd(c); + } } ret = flb_http_do(c, &b_sent); @@ -256,7 +265,15 @@ int opentelemetry_post(struct opentelemetry_context *ctx, http_uri); } - compression_algorithm = NULL; + if (ctx->compress_gzip == FLB_TRUE) { + compression_algorithm = "gzip"; + } + else if (ctx->compress_zstd == FLB_TRUE) { + compression_algorithm = "zstd"; + } + else { + compression_algorithm = NULL; + } request = flb_http_client_request_builder( &ctx->http_client, @@ -293,6 +310,19 @@ int opentelemetry_post(struct opentelemetry_context *ctx, grpc_body_length = cfl_sds_len(grpc_body); + if (compression_algorithm != NULL) { + // If compression enabled, ensure compression is gzip otherwise we + // need to fallback to gzip. + // Today grpc only supports gzip compression. + if (strncasecmp(compression_algorithm, "gzip", 4) != 0) { + // Only gzip is supported for gRPC, fall back to gzip. + flb_plg_debug(ctx->ins, + "grpc compression '%s' unsupported, using gzip", + compression_algorithm); + compression_algorithm = "gzip"; + } + } + result = flb_http_request_set_parameters(request, FLB_HTTP_CLIENT_ARGUMENT_URI(grpc_uri), FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE( @@ -310,12 +340,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx, } } else { - if (ctx->compress_gzip == FLB_TRUE) { - compression_algorithm = "gzip"; - } else if (ctx->compress_zstd == FLB_TRUE) { - compression_algorithm = "zstd"; - } - result = flb_http_request_set_parameters(request, FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri), FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE( diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 80a643678ee..94d8ace63e4 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -1068,6 +1068,18 @@ int flb_http_set_content_encoding_gzip(struct flb_http_client *c) return ret; } +/* Adds a header specifying that the payload is compressed with zstd */ +int flb_http_set_content_encoding_zstd(struct flb_http_client *c) +{ + int ret; + + ret = flb_http_add_header(c, + FLB_HTTP_HEADER_CONTENT_ENCODING, + sizeof(FLB_HTTP_HEADER_CONTENT_ENCODING) - 1, + "zstd", 4); + return ret; +} + int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx) { diff --git a/src/flb_http_common.c b/src/flb_http_common.c index 83ce5e87a45..90f00bc511d 100644 --- a/src/flb_http_common.c +++ b/src/flb_http_common.c @@ -347,7 +347,7 @@ int flb_http_request_compress_body( request->body, cfl_sds_len(request->body)); } - else if (strncasecmp(content_encoding_header_value, "deflate", 4) == 0) { + else if (strncasecmp(content_encoding_header_value, "deflate", 7) == 0) { result = compress_deflate(&output_buffer, &output_size, request->body,