From d07a435a3ef0002919a3d2d140843b998f9689d3 Mon Sep 17 00:00:00 2001 From: Artur Malchanau Date: Fri, 8 Mar 2024 17:25:33 +0100 Subject: [PATCH] fix(compression): Fix gzip and zlib performance degradation (#20032) * fix(compression): Fix gzip and zlib performance degradation Fix gzip and zlib performance degradation caused by: * flate2 v1.0.28 started to resize its input buffer up to its capacity and back to the actual number of bytes written. * Some sinks are writing to Compressor without buffering, resulting in frequent small writes to the flate2 writer. Within 32KB of input buffer in flate2, this causes an excessive number of memset operations and degraded sink throughput. This fix introduces a wrapper buffer in front of gzip and zlib writers to accumulate data before calling the write function of the underlying writer. Signed-off-by: Artur Malchanau * Add a link to the comment with more context. --------- Signed-off-by: Artur Malchanau --- .../20032_gzip_zlib_performance.fix.md | 3 + src/sinks/util/compressor.rs | 97 ++++++++++++------- 2 files changed, 66 insertions(+), 34 deletions(-) create mode 100644 changelog.d/20032_gzip_zlib_performance.fix.md diff --git a/changelog.d/20032_gzip_zlib_performance.fix.md b/changelog.d/20032_gzip_zlib_performance.fix.md new file mode 100644 index 0000000000000..ef58f4384c7d7 --- /dev/null +++ b/changelog.d/20032_gzip_zlib_performance.fix.md @@ -0,0 +1,3 @@ +Fixed gzip and zlib compression performance degradation introduced in v0.34.0. + +authors: Hexta diff --git a/src/sinks/util/compressor.rs b/src/sinks/util/compressor.rs index f6d3c01ccca87..8eeed45c9a1d8 100644 --- a/src/sinks/util/compressor.rs +++ b/src/sinks/util/compressor.rs @@ -1,14 +1,19 @@ -use std::io; +use std::{io, io::BufWriter}; use bytes::{BufMut, BytesMut}; use flate2::write::{GzEncoder, ZlibEncoder}; use super::{snappy::SnappyEncoder, zstd::ZstdEncoder, Compression}; +const GZIP_INPUT_BUFFER_CAPACITY: usize = 4_096; +const ZLIB_INPUT_BUFFER_CAPACITY: usize = 4_096; + +const OUTPUT_BUFFER_CAPACITY: usize = 1_024; + enum Writer { Plain(bytes::buf::Writer), - Gzip(GzEncoder>), - Zlib(ZlibEncoder>), + Gzip(BufWriter>>), + Zlib(BufWriter>>), Zstd(ZstdEncoder>), Snappy(SnappyEncoder>), } @@ -17,21 +22,69 @@ impl Writer { pub fn get_ref(&self) -> &BytesMut { match self { Writer::Plain(inner) => inner.get_ref(), - Writer::Gzip(inner) => inner.get_ref().get_ref(), - Writer::Zlib(inner) => inner.get_ref().get_ref(), + Writer::Gzip(inner) => inner.get_ref().get_ref().get_ref(), + Writer::Zlib(inner) => inner.get_ref().get_ref().get_ref(), Writer::Zstd(inner) => inner.get_ref().get_ref(), Writer::Snappy(inner) => inner.get_ref().get_ref(), } } + + pub fn into_inner(self) -> BytesMut { + match self { + Writer::Plain(writer) => writer, + Writer::Gzip(writer) => writer + .into_inner() + .expect("BufWriter writer should not fail to finish") + .finish() + .expect("gzip writer should not fail to finish"), + Writer::Zlib(writer) => writer + .into_inner() + .expect("BufWriter writer should not fail to finish") + .finish() + .expect("zlib writer should not fail to finish"), + Writer::Zstd(writer) => writer + .finish() + .expect("zstd writer should not fail to finish"), + Writer::Snappy(writer) => writer + .finish() + .expect("snappy writer should not fail to finish"), + } + .into_inner() + } + + pub fn finish(self) -> io::Result { + let buf = match self { + Writer::Plain(writer) => writer, + Writer::Gzip(writer) => writer.into_inner()?.finish()?, + Writer::Zlib(writer) => writer.into_inner()?.finish()?, + Writer::Zstd(writer) => writer.finish()?, + Writer::Snappy(writer) => writer.finish()?, + } + .into_inner(); + + Ok(buf) + } } impl From for Writer { fn from(compression: Compression) -> Self { - let writer = BytesMut::with_capacity(1_024).writer(); + let writer = BytesMut::with_capacity(OUTPUT_BUFFER_CAPACITY).writer(); match compression { Compression::None => Writer::Plain(writer), - Compression::Gzip(level) => Writer::Gzip(GzEncoder::new(writer, level.as_flate2())), - Compression::Zlib(level) => Writer::Zlib(ZlibEncoder::new(writer, level.as_flate2())), + // Buffering writes to the underlying Encoder writer + // to avoid Vec-trashing and expensive memset syscalls. + // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152 + Compression::Gzip(level) => Writer::Gzip(BufWriter::with_capacity( + GZIP_INPUT_BUFFER_CAPACITY, + GzEncoder::new(writer, level.as_flate2()), + )), + // Buffering writes to the underlying Encoder writer + // to avoid Vec-trashing and expensive memset syscalls. + // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152 + Compression::Zlib(level) => Writer::Zlib(BufWriter::with_capacity( + ZLIB_INPUT_BUFFER_CAPACITY, + ZlibEncoder::new(writer, level.as_flate2()), + )), Compression::Zstd(level) => { let encoder = ZstdEncoder::new(writer, level.into()) .expect("Zstd encoder should not fail on init."); @@ -98,16 +151,7 @@ impl Compressor { /// If the compressor encounters an I/O error while finalizing the payload, an error /// variant will be returned. pub fn finish(self) -> io::Result { - let buf = match self.inner { - Writer::Plain(writer) => writer, - Writer::Gzip(writer) => writer.finish()?, - Writer::Zlib(writer) => writer.finish()?, - Writer::Zstd(writer) => writer.finish()?, - Writer::Snappy(writer) => writer.finish()?, - } - .into_inner(); - - Ok(buf) + self.inner.finish() } /// Consumes the compressor, returning the internal buffer used by the compressor. @@ -120,22 +164,7 @@ impl Compressor { /// /// Consider using `finish` if catching these scenarios is important. pub fn into_inner(self) -> BytesMut { - match self.inner { - Writer::Plain(writer) => writer, - Writer::Gzip(writer) => writer - .finish() - .expect("gzip writer should not fail to finish"), - Writer::Zlib(writer) => writer - .finish() - .expect("zlib writer should not fail to finish"), - Writer::Zstd(writer) => writer - .finish() - .expect("zstd writer should not fail to finish"), - Writer::Snappy(writer) => writer - .finish() - .expect("snappy writer should not fail to finish"), - } - .into_inner() + self.inner.into_inner() } }