Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary WebSocket API: Brotli-compress all outgoing messages #1026

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ base64 = "0.21.2"
bitflags = "2.3.3"
bit-vec = "0.6"
blake3 = "1.5"
brotli = "3.5"
byte-unit = "4.0.18"
bytes = "1.2.1"
bytestring = { version = "1.2.0", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-trait.workspace = true
backtrace.workspace = true
base64.workspace = true
blake3.workspace = true
brotli.workspace = true
bytes.workspace = true
bytestring.workspace = true
chrono.workspace = true
Expand Down
35 changes: 34 additions & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use base64::Engine;
use brotli::CompressorReader;
use prost::Message as _;
use spacetimedb_lib::identity::RequestId;
use std::io::Read;
use std::time::Instant;

use crate::host::module_host::{EventStatus, ModuleEvent, ProtocolDatabaseUpdate};
Expand All @@ -22,7 +24,38 @@ pub trait ServerMessage: Sized {
fn serialize(self, protocol: Protocol) -> DataMessage {
match protocol {
Protocol::Text => self.serialize_text().to_json().into(),
Protocol::Binary => self.serialize_binary().encode_to_vec().into(),
Protocol::Binary => {
let msg_bytes = self.serialize_binary().encode_to_vec();
let reader = &mut &msg_bytes[..];

// TODO(perf): Compression should depend on message size and type.
//
// SubscriptionUpdate messages will typically be quite large,
// while TransactionUpdate messages will typically be quite small.
//
// If we are optimizing for SubscriptionUpdates,
// we want a large buffer.
// But if we are optimizing for TransactionUpdates,
// we probably want to skip compression altogether.
//
// For now we choose a reasonable middle ground,
// which is to compress everything using a 32KB buffer.
const BUFFER_SIZE: usize = 32 * 1024;
// Again we are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
gefjon marked this conversation as resolved.
Show resolved Hide resolved
const COMPRESSION_LEVEL: u32 = 1;
// The default value for an internal compression parameter.
// See `BrotliEncoderParams` for more details.
const LG_WIN: u32 = 22;

let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);

let mut out = Vec::new();
encoder
.read_to_end(&mut out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
out.into()
}
}
}
fn serialize_text(self) -> MessageJson;
Expand Down
1 change: 1 addition & 0 deletions crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spacetimedb-client-api-messages.workspace = true
anyhow.workspace = true
anymap.workspace = true
base64.workspace = true
brotli.workspace = true
futures.workspace = true
futures-channel.workspace = true
home.workspace = true
Expand Down
7 changes: 5 additions & 2 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::identity::Credentials;
use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use brotli::BrotliDecompress;
use futures::{SinkExt, StreamExt, TryStreamExt};
use futures_channel::mpsc;
use http::uri::{Scheme, Uri};
Expand Down Expand Up @@ -151,7 +152,9 @@ impl DbConnection {
}

pub(crate) fn parse_response(bytes: &[u8]) -> Result<Message> {
Ok(Message::decode(bytes)?)
let mut decompressed = Vec::new();
BrotliDecompress(&mut &bytes[..], &mut decompressed).context("Failed to Brotli decompress message")?;
Ok(Message::decode(&decompressed[..])?)
}

pub(crate) fn encode_message(msg: Message) -> WebSocketMessage {
Expand Down
Loading