diff --git a/changelog.d/20032_gzip_zlib_performance.fix.md b/changelog.d/20032_gzip_zlib_performance.fix.md new file mode 100644 index 0000000000000..ef58f4384c7d7 --- /dev/null +++ b/changelog.d/20032_gzip_zlib_performance.fix.md @@ -0,0 +1,3 @@ +Fixed gzip and zlib compression performance degradation introduced in v0.34.0. + +authors: Hexta diff --git a/src/sinks/util/compressor.rs b/src/sinks/util/compressor.rs index f6d3c01ccca87..8eeed45c9a1d8 100644 --- a/src/sinks/util/compressor.rs +++ b/src/sinks/util/compressor.rs @@ -1,14 +1,19 @@ -use std::io; +use std::{io, io::BufWriter}; use bytes::{BufMut, BytesMut}; use flate2::write::{GzEncoder, ZlibEncoder}; use super::{snappy::SnappyEncoder, zstd::ZstdEncoder, Compression}; +const GZIP_INPUT_BUFFER_CAPACITY: usize = 4_096; +const ZLIB_INPUT_BUFFER_CAPACITY: usize = 4_096; + +const OUTPUT_BUFFER_CAPACITY: usize = 1_024; + enum Writer { Plain(bytes::buf::Writer), - Gzip(GzEncoder>), - Zlib(ZlibEncoder>), + Gzip(BufWriter>>), + Zlib(BufWriter>>), Zstd(ZstdEncoder>), Snappy(SnappyEncoder>), } @@ -17,21 +22,69 @@ impl Writer { pub fn get_ref(&self) -> &BytesMut { match self { Writer::Plain(inner) => inner.get_ref(), - Writer::Gzip(inner) => inner.get_ref().get_ref(), - Writer::Zlib(inner) => inner.get_ref().get_ref(), + Writer::Gzip(inner) => inner.get_ref().get_ref().get_ref(), + Writer::Zlib(inner) => inner.get_ref().get_ref().get_ref(), Writer::Zstd(inner) => inner.get_ref().get_ref(), Writer::Snappy(inner) => inner.get_ref().get_ref(), } } + + pub fn into_inner(self) -> BytesMut { + match self { + Writer::Plain(writer) => writer, + Writer::Gzip(writer) => writer + .into_inner() + .expect("BufWriter writer should not fail to finish") + .finish() + .expect("gzip writer should not fail to finish"), + Writer::Zlib(writer) => writer + .into_inner() + .expect("BufWriter writer should not fail to finish") + .finish() + .expect("zlib writer should not fail to finish"), + Writer::Zstd(writer) => writer + .finish() + .expect("zstd writer should not fail to finish"), + Writer::Snappy(writer) => writer + .finish() + .expect("snappy writer should not fail to finish"), + } + .into_inner() + } + + pub fn finish(self) -> io::Result { + let buf = match self { + Writer::Plain(writer) => writer, + Writer::Gzip(writer) => writer.into_inner()?.finish()?, + Writer::Zlib(writer) => writer.into_inner()?.finish()?, + Writer::Zstd(writer) => writer.finish()?, + Writer::Snappy(writer) => writer.finish()?, + } + .into_inner(); + + Ok(buf) + } } impl From for Writer { fn from(compression: Compression) -> Self { - let writer = BytesMut::with_capacity(1_024).writer(); + let writer = BytesMut::with_capacity(OUTPUT_BUFFER_CAPACITY).writer(); match compression { Compression::None => Writer::Plain(writer), - Compression::Gzip(level) => Writer::Gzip(GzEncoder::new(writer, level.as_flate2())), - Compression::Zlib(level) => Writer::Zlib(ZlibEncoder::new(writer, level.as_flate2())), + // Buffering writes to the underlying Encoder writer + // to avoid Vec-trashing and expensive memset syscalls. + // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152 + Compression::Gzip(level) => Writer::Gzip(BufWriter::with_capacity( + GZIP_INPUT_BUFFER_CAPACITY, + GzEncoder::new(writer, level.as_flate2()), + )), + // Buffering writes to the underlying Encoder writer + // to avoid Vec-trashing and expensive memset syscalls. + // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152 + Compression::Zlib(level) => Writer::Zlib(BufWriter::with_capacity( + ZLIB_INPUT_BUFFER_CAPACITY, + ZlibEncoder::new(writer, level.as_flate2()), + )), Compression::Zstd(level) => { let encoder = ZstdEncoder::new(writer, level.into()) .expect("Zstd encoder should not fail on init."); @@ -98,16 +151,7 @@ impl Compressor { /// If the compressor encounters an I/O error while finalizing the payload, an error /// variant will be returned. pub fn finish(self) -> io::Result { - let buf = match self.inner { - Writer::Plain(writer) => writer, - Writer::Gzip(writer) => writer.finish()?, - Writer::Zlib(writer) => writer.finish()?, - Writer::Zstd(writer) => writer.finish()?, - Writer::Snappy(writer) => writer.finish()?, - } - .into_inner(); - - Ok(buf) + self.inner.finish() } /// Consumes the compressor, returning the internal buffer used by the compressor. @@ -120,22 +164,7 @@ impl Compressor { /// /// Consider using `finish` if catching these scenarios is important. pub fn into_inner(self) -> BytesMut { - match self.inner { - Writer::Plain(writer) => writer, - Writer::Gzip(writer) => writer - .finish() - .expect("gzip writer should not fail to finish"), - Writer::Zlib(writer) => writer - .finish() - .expect("zlib writer should not fail to finish"), - Writer::Zstd(writer) => writer - .finish() - .expect("zstd writer should not fail to finish"), - Writer::Snappy(writer) => writer - .finish() - .expect("snappy writer should not fail to finish"), - } - .into_inner() + self.inner.into_inner() } }