Skip to content

Commit

Permalink
Binary WebSocket API: Brotli-compress all outgoing messages (#1026)
Browse files Browse the repository at this point in the history
* Binary WebSocket API: Brotli-compress all outgoing messages

* Decrease buffer size; comment on future work

Co-authored-by: joshua-spacetime <[email protected]>
Signed-off-by: Phoebe Goldman <[email protected]>

* Note experimental compression ratio

---------

Signed-off-by: Phoebe Goldman <[email protected]>
Co-authored-by: joshua-spacetime <[email protected]>
  • Loading branch information
gefjon and joshua-spacetime authored Mar 29, 2024
1 parent 01ba5cd commit 2d971f3
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 3 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ base64 = "0.21.2"
bitflags = "2.3.3"
bit-vec = "0.6"
blake3 = "1.5"
brotli = "3.5"
byte-unit = "4.0.18"
bytes = "1.2.1"
bytestring = { version = "1.2.0", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-trait.workspace = true
backtrace.workspace = true
base64.workspace = true
blake3.workspace = true
brotli.workspace = true
bytes.workspace = true
bytestring.workspace = true
chrono.workspace = true
Expand Down
37 changes: 36 additions & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use base64::Engine;
use brotli::CompressorReader;
use derive_more::From;
use prost::Message as _;
use spacetimedb_lib::identity::RequestId;
use std::io::Read;
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -25,7 +27,40 @@ pub trait ServerMessage: Sized {
fn serialize(self, protocol: Protocol) -> DataMessage {
match protocol {
Protocol::Text => self.serialize_text().to_json().into(),
Protocol::Binary => self.serialize_binary().encode_to_vec().into(),
Protocol::Binary => {
let msg_bytes = self.serialize_binary().encode_to_vec();
let reader = &mut &msg_bytes[..];

// TODO(perf): Compression should depend on message size and type.
//
// SubscriptionUpdate messages will typically be quite large,
// while TransactionUpdate messages will typically be quite small.
//
// If we are optimizing for SubscriptionUpdates,
// we want a large buffer.
// But if we are optimizing for TransactionUpdates,
// we probably want to skip compression altogether.
//
// For now we choose a reasonable middle ground,
// which is to compress everything using a 32KB buffer.
const BUFFER_SIZE: usize = 32 * 1024;
// Again we are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
// for large `SubscriptionUpdate` messages at this level.
const COMPRESSION_LEVEL: u32 = 1;
// The default value for an internal compression parameter.
// See `BrotliEncoderParams` for more details.
const LG_WIN: u32 = 22;

let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);

let mut out = Vec::new();
encoder
.read_to_end(&mut out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
out.into()
}
}
}
fn serialize_text(self) -> MessageJson;
Expand Down
1 change: 1 addition & 0 deletions crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spacetimedb-client-api-messages.workspace = true
anyhow.workspace = true
anymap.workspace = true
base64.workspace = true
brotli.workspace = true
futures.workspace = true
futures-channel.workspace = true
home.workspace = true
Expand Down
7 changes: 5 additions & 2 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::identity::Credentials;
use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use brotli::BrotliDecompress;
use futures::{SinkExt, StreamExt, TryStreamExt};
use futures_channel::mpsc;
use http::uri::{Scheme, Uri};
Expand Down Expand Up @@ -151,7 +152,9 @@ impl DbConnection {
}

pub(crate) fn parse_response(bytes: &[u8]) -> Result<Message> {
Ok(Message::decode(bytes)?)
let mut decompressed = Vec::new();
BrotliDecompress(&mut &bytes[..], &mut decompressed).context("Failed to Brotli decompress message")?;
Ok(Message::decode(&decompressed[..])?)
}

pub(crate) fn encode_message(msg: Message) -> WebSocketMessage {
Expand Down

1 comment on commit 2d971f3

@github-actions
Copy link

@github-actions github-actions bot commented on 2d971f3 Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Criterion benchmark report

YOU SHOULD PROBABLY IGNORE THESE RESULTS.

Criterion is a wall time based benchmarking system that is extremely noisy when run on CI. We collect these results for longitudinal analysis, but they are not reliable for comparing individual PRs.

Go look at the callgrind report instead.

empty

db on disk new latency old latency new throughput old throughput
sqlite 💿 417.7±2.13ns 419.6±2.33ns - -
sqlite 🧠 406.3±1.24ns 418.9±2.44ns - -
stdb_raw 💿 722.8±0.82ns 709.4±1.72ns - -
stdb_raw 🧠 689.4±0.94ns 683.9±1.47ns - -

insert_1

db on disk schema indices preload new latency old latency new throughput old throughput

insert_bulk

db on disk schema indices preload count new latency old latency new throughput old throughput
sqlite 💿 u32_u64_str btree_each_column 2048 256 513.5±0.88µs 511.1±0.81µs 1947 tx/sec 1956 tx/sec
sqlite 💿 u32_u64_str unique_0 2048 256 134.9±0.65µs 132.6±0.21µs 7.2 Ktx/sec 7.4 Ktx/sec
sqlite 💿 u32_u64_u64 btree_each_column 2048 256 420.3±0.63µs 417.3±0.51µs 2.3 Ktx/sec 2.3 Ktx/sec
sqlite 💿 u32_u64_u64 unique_0 2048 256 123.1±0.25µs 119.8±0.71µs 7.9 Ktx/sec 8.2 Ktx/sec
sqlite 🧠 u32_u64_str btree_each_column 2048 256 448.1±1.01µs 445.3±0.21µs 2.2 Ktx/sec 2.2 Ktx/sec
sqlite 🧠 u32_u64_str unique_0 2048 256 122.6±0.74µs 116.2±0.61µs 8.0 Ktx/sec 8.4 Ktx/sec
sqlite 🧠 u32_u64_u64 btree_each_column 2048 256 366.2±0.31µs 366.0±0.35µs 2.7 Ktx/sec 2.7 Ktx/sec
sqlite 🧠 u32_u64_u64 unique_0 2048 256 106.1±0.38µs 102.8±0.49µs 9.2 Ktx/sec 9.5 Ktx/sec
stdb_raw 💿 u32_u64_str btree_each_column 2048 256 725.2±0.46µs 717.4±0.61µs 1378 tx/sec 1393 tx/sec
stdb_raw 💿 u32_u64_str unique_0 2048 256 637.2±2.20µs 627.7±0.62µs 1569 tx/sec 1593 tx/sec
stdb_raw 💿 u32_u64_u64 btree_each_column 2048 256 400.5±0.26µs 402.8±1.20µs 2.4 Ktx/sec 2.4 Ktx/sec
stdb_raw 💿 u32_u64_u64 unique_0 2048 256 364.5±0.43µs 363.8±0.43µs 2.7 Ktx/sec 2.7 Ktx/sec
stdb_raw 🧠 u32_u64_str btree_each_column 2048 256 572.4±0.40µs 564.8±0.44µs 1747 tx/sec 1770 tx/sec
stdb_raw 🧠 u32_u64_str unique_0 2048 256 483.7±0.51µs 473.3±0.55µs 2.0 Ktx/sec 2.1 Ktx/sec
stdb_raw 🧠 u32_u64_u64 btree_each_column 2048 256 371.6±0.19µs 369.7±0.22µs 2.6 Ktx/sec 2.6 Ktx/sec
stdb_raw 🧠 u32_u64_u64 unique_0 2048 256 333.5±0.43µs 330.4±0.35µs 2.9 Ktx/sec 3.0 Ktx/sec

iterate

db on disk schema indices new latency old latency new throughput old throughput
sqlite 💿 u32_u64_str unique_0 20.9±0.25µs 19.8±0.20µs 46.6 Ktx/sec 49.4 Ktx/sec
sqlite 💿 u32_u64_u64 unique_0 19.1±0.16µs 18.2±0.03µs 51.2 Ktx/sec 53.8 Ktx/sec
sqlite 🧠 u32_u64_str unique_0 20.2±0.14µs 18.1±0.26µs 48.3 Ktx/sec 53.9 Ktx/sec
sqlite 🧠 u32_u64_u64 unique_0 18.0±0.30µs 17.0±0.04µs 54.4 Ktx/sec 57.6 Ktx/sec
stdb_raw 💿 u32_u64_str unique_0 17.7±0.00µs 18.7±0.00µs 55.2 Ktx/sec 52.3 Ktx/sec
stdb_raw 💿 u32_u64_u64 unique_0 14.8±0.00µs 15.8±0.00µs 65.8 Ktx/sec 61.7 Ktx/sec
stdb_raw 🧠 u32_u64_str unique_0 17.7±0.01µs 18.6±0.00µs 55.3 Ktx/sec 52.4 Ktx/sec
stdb_raw 🧠 u32_u64_u64 unique_0 14.8±0.00µs 15.8±0.00µs 65.9 Ktx/sec 61.8 Ktx/sec

find_unique

db on disk key type preload new latency old latency new throughput old throughput

filter

db on disk key type index strategy load count new latency old latency new throughput old throughput
sqlite 💿 string index 2048 256 65.6±0.22µs 64.5±0.24µs 14.9 Ktx/sec 15.1 Ktx/sec
sqlite 💿 u64 index 2048 256 63.3±0.26µs 63.1±0.14µs 15.4 Ktx/sec 15.5 Ktx/sec
sqlite 🧠 string index 2048 256 63.0±0.24µs 62.7±0.57µs 15.5 Ktx/sec 15.6 Ktx/sec
sqlite 🧠 u64 index 2048 256 58.7±0.13µs 58.8±0.14µs 16.6 Ktx/sec 16.6 Ktx/sec
stdb_raw 💿 string index 2048 256 5.7±0.00µs 5.6±0.00µs 171.1 Ktx/sec 175.2 Ktx/sec
stdb_raw 💿 u64 index 2048 256 5.5±0.00µs 5.6±0.00µs 177.6 Ktx/sec 175.9 Ktx/sec
stdb_raw 🧠 string index 2048 256 5.7±0.00µs 5.6±0.00µs 171.6 Ktx/sec 175.9 Ktx/sec
stdb_raw 🧠 u64 index 2048 256 5.5±0.00µs 5.5±0.00µs 178.6 Ktx/sec 177.0 Ktx/sec

serialize

schema format count new latency old latency new throughput old throughput
u32_u64_str bflatn_to_bsatn_fast_path 100 4.0±0.05µs 4.0±0.01µs 24.1 Mtx/sec 23.9 Mtx/sec
u32_u64_str bflatn_to_bsatn_slow_path 100 3.7±0.01µs 3.7±0.05µs 25.7 Mtx/sec 25.6 Mtx/sec
u32_u64_str bsatn 100 2.6±0.07µs 2.5±0.00µs 36.5 Mtx/sec 38.6 Mtx/sec
u32_u64_str json 100 4.9±0.03µs 5.1±0.07µs 19.3 Mtx/sec 18.8 Mtx/sec
u32_u64_str product_value 100 649.4±1.11ns 656.1±1.18ns 146.9 Mtx/sec 145.4 Mtx/sec
u32_u64_u64 bflatn_to_bsatn_fast_path 100 1330.0±5.11ns 1344.1±3.73ns 71.7 Mtx/sec 71.0 Mtx/sec
u32_u64_u64 bflatn_to_bsatn_slow_path 100 2.9±0.00µs 3.0±0.10µs 32.5 Mtx/sec 32.2 Mtx/sec
u32_u64_u64 bsatn 100 1829.7±48.76ns 1734.4±31.81ns 52.1 Mtx/sec 55.0 Mtx/sec
u32_u64_u64 json 100 3.3±0.05µs 3.4±0.04µs 28.5 Mtx/sec 27.9 Mtx/sec
u32_u64_u64 product_value 100 632.9±0.25ns 611.6±0.44ns 150.7 Mtx/sec 155.9 Mtx/sec
u64_u64_u32 bflatn_to_bsatn_fast_path 100 1088.1±13.15ns 1111.5±14.92ns 87.6 Mtx/sec 85.8 Mtx/sec
u64_u64_u32 bflatn_to_bsatn_slow_path 100 3.0±0.00µs 2.9±0.01µs 32.3 Mtx/sec 32.6 Mtx/sec
u64_u64_u32 bsatn 100 1838.4±22.79ns 1737.8±34.83ns 51.9 Mtx/sec 54.9 Mtx/sec
u64_u64_u32 json 100 3.2±0.06µs 3.5±0.02µs 29.7 Mtx/sec 27.1 Mtx/sec
u64_u64_u32 product_value 100 602.2±12.78ns 604.9±0.78ns 158.4 Mtx/sec 157.7 Mtx/sec

stdb_module_large_arguments

arg size new latency old latency new throughput old throughput
64KiB 77.2±5.62µs 89.3±3.21µs - -

stdb_module_print_bulk

line count new latency old latency new throughput old throughput
1 38.3±6.13µs 43.9±6.16µs - -
100 347.2±6.35µs 349.2±5.57µs - -
1000 2.9±0.02ms 1868.1±22.13µs - -

remaining

name new latency old latency new throughput old throughput
sqlite/💿/update_bulk/u32_u64_str/unique_0/load=2048/count=256 46.9±0.37µs 46.0±0.12µs 20.8 Ktx/sec 21.2 Ktx/sec
sqlite/💿/update_bulk/u32_u64_u64/unique_0/load=2048/count=256 40.3±0.18µs 40.6±0.10µs 24.2 Ktx/sec 24.1 Ktx/sec
sqlite/🧠/update_bulk/u32_u64_str/unique_0/load=2048/count=256 39.9±0.35µs 38.4±0.07µs 24.5 Ktx/sec 25.4 Ktx/sec
sqlite/🧠/update_bulk/u32_u64_u64/unique_0/load=2048/count=256 35.1±0.08µs 35.1±0.14µs 27.8 Ktx/sec 27.8 Ktx/sec
stdb_module/💿/update_bulk/u32_u64_str/unique_0/load=2048/count=256 2.9±0.01ms 2.8±0.04ms 346 tx/sec 354 tx/sec
stdb_module/💿/update_bulk/u32_u64_u64/unique_0/load=2048/count=256 1895.9±13.68µs 1823.0±1.58µs 527 tx/sec 548 tx/sec
stdb_raw/💿/update_bulk/u32_u64_str/unique_0/load=2048/count=256 1120.2±1.06µs 1102.4±0.72µs 892 tx/sec 907 tx/sec
stdb_raw/💿/update_bulk/u32_u64_u64/unique_0/load=2048/count=256 723.6±0.58µs 723.0±0.44µs 1381 tx/sec 1383 tx/sec
stdb_raw/🧠/update_bulk/u32_u64_str/unique_0/load=2048/count=256 931.1±0.39µs 911.4±0.88µs 1073 tx/sec 1097 tx/sec
stdb_raw/🧠/update_bulk/u32_u64_u64/unique_0/load=2048/count=256 662.7±1.07µs 663.8±0.48µs 1508 tx/sec 1506 tx/sec

Please sign in to comment.