Skip to content

Commit

Permalink
Binary WebSocket API: Brotli-compress all outgoing messages (#1026)
Browse files Browse the repository at this point in the history
* Binary WebSocket API: Brotli-compress all outgoing messages

* Decrease buffer size; comment on future work

Co-authored-by: joshua-spacetime <[email protected]>
Signed-off-by: Phoebe Goldman <[email protected]>

* Note experimental compression ratio

---------

Signed-off-by: Phoebe Goldman <[email protected]>
Co-authored-by: joshua-spacetime <[email protected]>
  • Loading branch information
2 people authored and RReverser committed Apr 17, 2024
1 parent 2703c1f commit 02dcab1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 3 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ base64 = "0.21.2"
bitflags = "2.3.3"
bit-vec = "0.6"
blake3 = "1.5"
brotli = "3.5"
byte-unit = "4.0.18"
bytes = "1.2.1"
bytestring = { version = "1.2.0", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-trait.workspace = true
backtrace.workspace = true
base64.workspace = true
blake3.workspace = true
brotli.workspace = true
bytes.workspace = true
bytestring.workspace = true
chrono.workspace = true
Expand Down
37 changes: 36 additions & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use base64::Engine;
use brotli::CompressorReader;
use prost::Message as _;
use spacetimedb_lib::identity::RequestId;
use std::io::Read;
use std::time::Instant;

use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
Expand All @@ -22,7 +24,40 @@ pub trait ServerMessage: Sized {
fn serialize(self, protocol: Protocol) -> DataMessage {
match protocol {
Protocol::Text => self.serialize_text().to_json().into(),
Protocol::Binary => self.serialize_binary().encode_to_vec().into(),
Protocol::Binary => {
let msg_bytes = self.serialize_binary().encode_to_vec();
let reader = &mut &msg_bytes[..];

// TODO(perf): Compression should depend on message size and type.
//
// SubscriptionUpdate messages will typically be quite large,
// while TransactionUpdate messages will typically be quite small.
//
// If we are optimizing for SubscriptionUpdates,
// we want a large buffer.
// But if we are optimizing for TransactionUpdates,
// we probably want to skip compression altogether.
//
// For now we choose a reasonable middle ground,
// which is to compress everything using a 32KB buffer.
const BUFFER_SIZE: usize = 32 * 1024;
// Again we are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
// for large `SubscriptionUpdate` messages at this level.
const COMPRESSION_LEVEL: u32 = 1;
// The default value for an internal compression parameter.
// See `BrotliEncoderParams` for more details.
const LG_WIN: u32 = 22;

let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);

let mut out = Vec::new();
encoder
.read_to_end(&mut out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
out.into()
}
}
}
fn serialize_text(self) -> MessageJson;
Expand Down
1 change: 1 addition & 0 deletions crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spacetimedb-client-api-messages.workspace = true
anyhow.workspace = true
anymap.workspace = true
base64.workspace = true
brotli.workspace = true
futures.workspace = true
futures-channel.workspace = true
home.workspace = true
Expand Down
7 changes: 5 additions & 2 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::identity::Credentials;
use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use brotli::BrotliDecompress;
use futures::{SinkExt, StreamExt, TryStreamExt};
use futures_channel::mpsc;
use http::uri::{Scheme, Uri};
Expand Down Expand Up @@ -151,7 +152,9 @@ impl DbConnection {
}

pub(crate) fn parse_response(bytes: &[u8]) -> Result<Message> {
Ok(Message::decode(bytes)?)
let mut decompressed = Vec::new();
BrotliDecompress(&mut &bytes[..], &mut decompressed).context("Failed to Brotli decompress message")?;
Ok(Message::decode(&decompressed[..])?)
}

pub(crate) fn encode_message(msg: Message) -> WebSocketMessage {
Expand Down

0 comments on commit 02dcab1

Please sign in to comment.