Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: batch InMemoryLayer puts, remove need to sort items by LSN during ingest #8591

Merged
merged 14 commits into from
Aug 22, 2024
Merged
2 changes: 1 addition & 1 deletion pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ impl<'a> DatadirModification<'a> {
if key.is_rel_block_key() || key.is_slru_block_key() {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
write_batch.push((key, lsn, value));
write_batch.push((key.to_compact(), lsn, value));
} else {
retained_pending_updates
.entry(key)
Expand Down
19 changes: 9 additions & 10 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,23 @@ impl EphemeralFile {
}

#[cfg(test)]
// This is a test helper: outside of tests, writes always go through a pre-serialized batch.
jcsp marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written();

let mut len_bytes = std::io::Cursor::new(Vec::new());
crate::tenant::storage_layer::inmemory_layer::SerializedBatch::write_blob_length(
srcbuf.len(),
&mut len_bytes,
);
let len_bytes = len_bytes.into_inner();

// Write the length field
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];

self.rw.write_all_borrowed(&len_buf, ctx).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
}
self.rw.write_all_borrowed(&len_bytes, ctx).await?;

// Write the payload
self.rw.write_all_borrowed(srcbuf, ctx).await?;
Expand Down
75 changes: 71 additions & 4 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,74 @@ impl InMemoryLayer {
}
}

/// A batch of key/value updates, pre-serialized into the EphemeralFile wire
/// format so it can be written out in a single raw write, together with an
/// index locating each value inside the buffer.
pub(crate) struct SerializedBatch {
    /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`].
    pub(crate) raw: Vec<u8>,

    /// Per-value index into [`Self::raw`]: key, LSN, and byte offset relative
    /// to the start of the buffer.
    pub(crate) offsets: Vec<(CompactKey, Lsn, u64)>,

    /// The highest LSN of any value in the batch.
    pub(crate) max_lsn: Lsn,
}

impl SerializedBatch {
    /// Write a blob length header in the internal format of the EphemeralFile:
    /// lengths below 0x80 are encoded as a single byte; larger lengths as a
    /// big-endian u32 with the top bit set as the "long header" marker.
    ///
    /// NOTE(review): a `len` >= 2^31 would collide with the marker bit, and
    /// `len as u32` silently truncates above u32::MAX — callers are assumed to
    /// never pass values that large; confirm against the ingest path.
    pub(crate) fn write_blob_length(len: usize, cursor: &mut std::io::Cursor<Vec<u8>>) {
        use std::io::Write;

        if len < 0x80 {
            // Short one-byte length header.
            let len_buf = [len as u8];

            cursor
                .write_all(&len_buf)
                .expect("Writing to Vec is infallible");
        } else {
            // Long four-byte header, big-endian, high bit set as a marker.
            let mut len_buf = u32::to_be_bytes(len as u32);
            len_buf[0] |= 0x80;
            cursor
                .write_all(&len_buf)
                .expect("Writing to Vec is infallible");
        }
    }

    /// Serialize a batch of values into the EphemeralFile wire format (length
    /// header followed by payload for each value), recording each value's key,
    /// LSN and relative offset, and tracking the highest LSN in the batch.
    pub(crate) fn from_values(batch: Vec<(CompactKey, Lsn, Value)>) -> Self {
        use std::io::Write;

        // One index entry per input value: reserve up front to avoid regrowth.
        let mut offsets: Vec<(CompactKey, Lsn, u64)> = Vec::with_capacity(batch.len());
        let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
        let mut max_lsn: Lsn = Lsn(0);
        // Scratch buffer, reused across iterations. We cannot serialize the
        // value straight into `cursor` because the wire format puts the length
        // *before* the payload, and the length is only known after serializing.
        let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
        for (key, lsn, val) in batch {
            let relative_off = cursor.position();

            value_buf.clear();
            val.ser_into(&mut value_buf)
                .expect("Value serialization is infallible");
            Self::write_blob_length(value_buf.len(), &mut cursor);

            cursor
                .write_all(&value_buf)
                .expect("Writing to Vec is infallible");

            offsets.push((key, lsn, relative_off));
            max_lsn = std::cmp::max(max_lsn, lsn);
        }

        Self {
            raw: cursor.into_inner(),
            offsets,
            max_lsn,
        }
    }
}

/// Render the display name of an in-memory layer covering the given LSN
/// range, in the form `inmem-{start}-{end}` with zero-padded 16-digit hex.
fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
    f.write_fmt(format_args!(
        "inmem-{:016X}-{:016X}",
        start_lsn.0, end_lsn.0
    ))
}
Expand Down Expand Up @@ -417,8 +485,7 @@ impl InMemoryLayer {
// Write path.
pub(crate) async fn put_batch(
&self,
buf: Vec<u8>,
values: Vec<(CompactKey, Lsn, u64)>,
serialized_batch: SerializedBatch,
ctx: &RequestContext,
) -> Result<()> {
let mut inner = self.inner.write().await;
Expand All @@ -428,15 +495,15 @@ impl InMemoryLayer {
inner
.file
.write_raw(
&buf,
&serialized_batch.raw,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build(),
)
.await?
};

for (key, lsn, relative_off) in values {
for (key, lsn, relative_off) in serialized_batch.offsets {
let off = base_off + relative_off;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
Expand Down
49 changes: 12 additions & 37 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext, pausable_failpoint,
sync::gate::{Gate, GateGuard},
};

use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
Expand All @@ -62,7 +62,6 @@ use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use std::{io::Write, pin::pin};

use crate::{
aux_file::AuxFileSizeEstimator,
Expand Down Expand Up @@ -136,7 +135,10 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};

use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
use super::{
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
upload_queue::NotInitialized,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
Expand Down Expand Up @@ -5638,44 +5640,16 @@ impl<'a> TimelineWriter<'a> {
return Ok(());
}

let mut values: Vec<(CompactKey, Lsn, u64)> = Vec::new();
let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
let mut batch_max_lsn: Lsn = Lsn(0);
let mut value_buf = smallvec::SmallVec::<[u8; 256]>::new();
for (key, lsn, val) in batch {
let relative_off = cursor.position();

value_buf.clear();
val.ser_into(&mut value_buf)?;
if value_buf.len() < 0x80 {
// short one-byte length header
let len_buf = [value_buf.len() as u8];

cursor.write_all(&len_buf)?;
} else {
let mut len_buf = u32::to_be_bytes(value_buf.len() as u32);
len_buf[0] |= 0x80;
cursor.write_all(&len_buf)?;
}
cursor.write_all(&value_buf)?;

// We can't write straight into the buffer, because the InMemoryLayer file format requires
// the size to come before the value. However... we could probably calculate the size before
// actually serializing the value
//val.ser_into(&mut cursor)?;

values.push((key, lsn, relative_off));
batch_max_lsn = std::cmp::max(batch_max_lsn, lsn);
}

let buf = cursor.into_inner();
let buf_size: u64 = buf.len() as u64;
let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch);
let batch_max_lsn = serialized_batch.max_lsn;
let buf_size: u64 = serialized_batch.raw.len() as u64;

let action = self.get_open_layer_action(batch_max_lsn, buf_size);
let layer = self
.handle_open_layer_action(batch_max_lsn, action, ctx)
.await?;
let res = layer.put_batch(buf, values, ctx).await;

let res = layer.put_batch(serialized_batch, ctx).await;

if res.is_ok() {
// Update the current size only when the entire write was ok.
Expand All @@ -5702,7 +5676,8 @@ impl<'a> TimelineWriter<'a> {
value: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.put_batch(vec![(key, lsn, value.clone())], ctx).await
self.put_batch(vec![(key.to_compact(), lsn, value.clone())], ctx)
.await
}

pub(crate) async fn delete_batch(
Expand Down