Skip to content

Commit

Permalink
safekeeper: use protobuf for sending compressed records to pageserver (
Browse files Browse the repository at this point in the history
…#9821)

## Problem

#9746 lifted decoding and
interpretation of WAL to the safekeeper.
This reduced the ingested amount on the pageservers by around 10x for a
tenant with 8 shards, but doubled
the ingested amount for single sharded tenants.

Also, #9746 uses bincode which
doesn't support schema evolution.
Technically the schema can be evolved, but it's very cumbersome.

## Summary of changes

This patch set addresses both problems by adding protobuf support for
the interpreted wal records and adding compression support. Compressed
protobuf reduced the ingested amount by 100x on the 32 shards
`test_sharded_ingest` case (compared to non-interpreted proto). For the
1 shard case the reduction is 5x.

Sister change to `rust-postgres` is
[here](neondatabase/rust-postgres#33).

## Links

Related: #9336
Epic: #9329
  • Loading branch information
VladLazar authored Nov 27, 2024
1 parent 7b41ee8 commit 9e0148d
Show file tree
Hide file tree
Showing 21 changed files with 702 additions and 106 deletions.
14 changes: 10 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions libs/pageserver_api/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ impl Key {
}
}

impl CompactKey {
pub fn raw(&self) -> i128 {
self.0
}
}

impl From<i128> for CompactKey {
fn from(value: i128) -> Self {
Self(value)
}
}

impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand Down
4 changes: 0 additions & 4 deletions libs/pq_proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,9 +688,6 @@ pub struct InterpretedWalRecordsBody<'a> {
pub streaming_lsn: u64,
/// Current end of WAL on the server
pub commit_lsn: u64,
/// Start LSN of the next record in PG WAL.
/// Is 0 if the portion of PG WAL did not contain any records.
pub next_record_lsn: u64,
pub data: &'a [u8],
}

Expand Down Expand Up @@ -1028,7 +1025,6 @@ impl BeMessage<'_> {
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_u64(rec.next_record_lsn);
buf.put_slice(rec.data);
});
}
Expand Down
54 changes: 24 additions & 30 deletions libs/utils/src/postgres_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,31 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};

use crate::id::TenantTimelineId;

/// Postgres client protocol types
#[derive(
Copy,
Clone,
PartialEq,
Eq,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InterpretedFormat {
Bincode,
Protobuf,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Compression {
Zstd { level: i8 },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "args")]
#[serde(rename_all = "kebab-case")]
pub enum PostgresClientProtocol {
/// Usual Postgres replication protocol
Vanilla,
/// Custom shard-aware protocol that replicates interpreted records.
/// Used to send wal from safekeeper to pageserver.
Interpreted,
}

impl TryFrom<u8> for PostgresClientProtocol {
type Error = u8;

fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
v if v == (PostgresClientProtocol::Interpreted as u8) => {
PostgresClientProtocol::Interpreted
}
x => return Err(x),
})
}
Interpreted {
format: InterpretedFormat,
compression: Option<Compression>,
},
}

pub struct ConnectionConfigArgs<'a> {
Expand All @@ -63,7 +54,10 @@ impl<'a> ConnectionConfigArgs<'a> {
"-c".to_owned(),
format!("timeline_id={}", self.ttid.timeline_id),
format!("tenant_id={}", self.ttid.tenant_id),
format!("protocol={}", self.protocol as u8),
format!(
"protocol={}",
serde_json::to_string(&self.protocol).unwrap()
),
];

if self.shard_number.is_some() {
Expand Down
8 changes: 8 additions & 0 deletions libs/wal_decoder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ license.workspace = true
testing = ["pageserver_api/testing"]

[dependencies]
async-compression.workspace = true
anyhow.workspace = true
bytes.workspace = true
pageserver_api.workspace = true
prost.workspace = true
postgres_ffi.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }
tonic.workspace = true
tracing.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

[build-dependencies]
tonic-build.workspace = true
11 changes: 11 additions & 0 deletions libs/wal_decoder/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Generate rust code from .proto protobuf.
//
// Note: we previously tried to use deterministic location at proto/ for
// easy location, but apparently interference with cachepot sometimes fails
// the build then. Anyway, per cargo docs build script shouldn't output to
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/interpreted_wal.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
Ok(())
}
43 changes: 43 additions & 0 deletions libs/wal_decoder/proto/interpreted_wal.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
syntax = "proto3";

package interpreted_wal;

message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
}

message InterpretedWalRecord {
optional bytes metadata_record = 1;
SerializedValueBatch batch = 2;
uint64 next_record_lsn = 3;
bool flush_uncommitted = 4;
uint32 xid = 5;
}

message SerializedValueBatch {
bytes raw = 1;
repeated ValueMeta metadata = 2;
uint64 max_lsn = 3;
uint64 len = 4;
}

enum ValueMetaType {
Serialized = 0;
Observed = 1;
}

message ValueMeta {
ValueMetaType type = 1;
CompactKey key = 2;
uint64 lsn = 3;
optional uint64 batch_offset = 4;
optional uint64 len = 5;
optional bool will_init = 6;
}

message CompactKey {
int64 high = 1;
int64 low = 2;
}

1 change: 1 addition & 0 deletions libs/wal_decoder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod decoder;
pub mod models;
pub mod serialized_batch;
pub mod wire_format;
20 changes: 20 additions & 0 deletions libs/wal_decoder/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,32 @@ use utils::lsn::Lsn;

use crate::serialized_batch::SerializedValueBatch;

// Code generated by protobuf.
pub mod proto {
// Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
// we don't use these types for anything but broker data transmission,
// so it's ok to ignore this one.
#![allow(clippy::derive_partial_eq_without_eq)]
// The generated ValueMeta has a `len` method generate for its `len` field.
#![allow(clippy::len_without_is_empty)]
tonic::include_proto!("interpreted_wal");
}

#[derive(Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
Yes,
No,
}

/// A batch of interpreted WAL records
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
}

/// An interpreted Postgres WAL record, ready to be handled by the pageserver
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {
Expand Down
Loading

1 comment on commit 9e0148d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7053 tests run: 6718 passed, 2 failed, 333 skipped (full report)


Failures on Postgres 16

  • test_sharded_ingest[github-actions-selfhosted-vanilla-1]: release-x86-64
  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharded_ingest[release-pg16-github-actions-selfhosted-vanilla-1] or test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (3)

Postgres 17

Postgres 14

Code coverage* (full report)

  • functions: 30.7% (7977 of 26014 functions)
  • lines: 48.6% (63326 of 130404 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
9e0148d at 2024-11-27T14:33:01.962Z :recycle:

Please sign in to comment.