diff --git a/Cargo.lock b/Cargo.lock
index 0ae2c03cc0e..6cb191965cc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -49,6 +49,21 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "alloc-no-stdlib"
+version = "2.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
+
+[[package]]
+name = "alloc-stdlib"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
+dependencies = [
+ "alloc-no-stdlib",
+]
+
 [[package]]
 name = "allocator-api2"
 version = "0.2.16"
@@ -451,6 +466,27 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "brotli"
+version = "3.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+ "brotli-decompressor",
+]
+
+[[package]]
+name = "brotli-decompressor"
+version = "2.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+]
+
 [[package]]
 name = "bumpalo"
 version = "3.14.0"
@@ -4446,6 +4482,7 @@ dependencies = [
  "backtrace",
  "base64 0.21.4",
  "blake3",
+ "brotli",
  "bytes",
  "bytestring",
  "chrono",
@@ -4621,6 +4658,7 @@ dependencies = [
  "anyhow",
  "anymap",
  "base64 0.21.4",
+ "brotli",
  "cursive",
  "futures",
  "futures-channel",
diff --git a/Cargo.toml b/Cargo.toml
index 7caba831b27..7fabb3ec520 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -106,6 +106,7 @@ base64 = "0.21.2"
 bitflags = "2.3.3"
 bit-vec = "0.6"
 blake3 = "1.5"
+brotli = "3.5"
 byte-unit = "4.0.18"
 bytes = "1.2.1"
 bytestring = { version = "1.2.0", features = ["serde"] }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 62c3d8963ad..3cff8fcc3f3 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -32,6 +32,7 @@ async-trait.workspace = true
 backtrace.workspace = true
 base64.workspace = true
 blake3.workspace = true
+brotli.workspace = true
 bytes.workspace = true
 bytestring.workspace = true
 chrono.workspace = true
diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs
index fc49bc75b6a..5563a2f53a1 100644
--- a/crates/core/src/client/messages.rs
+++ b/crates/core/src/client/messages.rs
@@ -1,7 +1,9 @@
 use base64::Engine;
+use brotli::CompressorReader;
 use derive_more::From;
 use prost::Message as _;
 use spacetimedb_lib::identity::RequestId;
+use std::io::Read;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -25,7 +27,40 @@ pub trait ServerMessage: Sized {
     fn serialize(self, protocol: Protocol) -> DataMessage {
         match protocol {
             Protocol::Text => self.serialize_text().to_json().into(),
-            Protocol::Binary => self.serialize_binary().encode_to_vec().into(),
+            Protocol::Binary => {
+                let msg_bytes = self.serialize_binary().encode_to_vec();
+                let reader = &mut &msg_bytes[..];
+
+                // TODO(perf): Compression should depend on message size and type.
+                //
+                // SubscriptionUpdate messages will typically be quite large,
+                // while TransactionUpdate messages will typically be quite small.
+                //
+                // If we are optimizing for SubscriptionUpdates,
+                // we want a large buffer.
+                // But if we are optimizing for TransactionUpdates,
+                // we probably want to skip compression altogether.
+                //
+                // For now we choose a reasonable middle ground,
+                // which is to compress everything using a 32KB buffer.
+                const BUFFER_SIZE: usize = 32 * 1024;
+                // Again we are optimizing for compression speed,
+                // so we choose a near-minimal (fast) level; Brotli levels range from 0 to 11.
+                // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
+                // for large `SubscriptionUpdate` messages at this level.
+                const COMPRESSION_LEVEL: u32 = 1;
+                // The default value for an internal compression parameter.
+                // See `BrotliEncoderParams` for more details.
+                const LG_WIN: u32 = 22;
+
+                let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
+
+                let mut out = Vec::new();
+                encoder
+                    .read_to_end(&mut out)
+                    .expect("Brotli-compressing an in-memory server message should be infallible");
+                out.into()
+            }
         }
     }
     fn serialize_text(self) -> MessageJson;
diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml
index dab6fca54cc..994a19bff55 100644
--- a/crates/sdk/Cargo.toml
+++ b/crates/sdk/Cargo.toml
@@ -15,6 +15,7 @@ spacetimedb-client-api-messages.workspace = true
 anyhow.workspace = true
 anymap.workspace = true
 base64.workspace = true
+brotli.workspace = true
 futures.workspace = true
 futures-channel.workspace = true
 home.workspace = true
diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs
index c112d53443c..61c536d7cb4 100644
--- a/crates/sdk/src/websocket.rs
+++ b/crates/sdk/src/websocket.rs
@@ -1,5 +1,6 @@
 use crate::identity::Credentials;
-use anyhow::{bail, Result};
+use anyhow::{bail, Context, Result};
+use brotli::BrotliDecompress;
 use futures::{SinkExt, StreamExt, TryStreamExt};
 use futures_channel::mpsc;
 use http::uri::{Scheme, Uri};
@@ -151,7 +152,10 @@ impl DbConnection {
     }
 
+    /// Brotli-decompresses `bytes`, then decodes the result as a `Message`.
     pub(crate) fn parse_response(bytes: &[u8]) -> Result<Message> {
-        Ok(Message::decode(bytes)?)
+        let mut decompressed = Vec::new();
+        BrotliDecompress(&mut &bytes[..], &mut decompressed).context("Failed to Brotli decompress message")?;
+        Ok(Message::decode(&decompressed[..])?)
     }
 
     pub(crate) fn encode_message(msg: Message) -> WebSocketMessage {