Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: handle decompression outside vectored read_blobs #8942

Merged
58 changes: 41 additions & 17 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
Expand Down Expand Up @@ -1021,13 +1021,30 @@ impl DeltaLayerInner {
continue;
}
};

let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let blob_read = meta.read(&view).await;
let blob_read = match blob_read {
Ok(buf) => buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);

ignore_key_with_err = Some(meta.meta.key);
continue;
problame marked this conversation as resolved.
Show resolved Hide resolved
}
};

let value = Value::des(&blob_read);

let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
Expand Down Expand Up @@ -1243,37 +1260,37 @@ impl DeltaLayerInner {
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf, ctx).await?;

let view = BufView::new_slice(&res.buf);

for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = &res.buf[blob.start..blob.end];

let data = blob.read(&view).await?;

#[cfg(debug_assertions)]
Value::des(data)
Value::des(&data)
.with_context(|| {
format!(
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
"blob failed to deserialize for {}: {:?}",
blob,
utils::Hex(&data)
)
})
.unwrap();

// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(data)
let will_init = crate::repository::ValueBytes::will_init(&data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);

per_blob_copy.clear();
per_blob_copy.extend_from_slice(data);
per_blob_copy.extend_from_slice(&data);

let (tmp, res) = writer
.put_value_bytes(
Expand Down Expand Up @@ -1538,8 +1555,11 @@ impl<'a> DeltaLayerIterator<'a> {
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let value = Value::des(&frozen_buf[meta.start..meta.end])?;
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;

next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
}
self.key_values_batch = next_batch;
Expand Down Expand Up @@ -1916,9 +1936,13 @@ pub(crate) mod test {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.await?;
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let value = &blobs_buf.buf[meta.start..meta.end];
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
let value = meta.read(&view).await?;
assert_eq!(
&value[..],
&entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
);
}

buf = Some(blobs_buf.buf);
Expand Down
41 changes: 31 additions & 10 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::tenant::disk_btree::{
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
Expand Down Expand Up @@ -547,15 +548,15 @@ impl ImageLayerInner {

let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;

let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);

for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
let img_buf = meta.read(&view).await?;

key_count += 1;
writer
.put_image(meta.meta.key, img_buf, ctx)
.put_image(meta.meta.key, img_buf.into_bytes(), ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
Expand Down Expand Up @@ -602,13 +603,28 @@ impl ImageLayerInner {
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();

let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
let img_buf = meta.read(&view).await;

let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
))),
);
problame marked this conversation as resolved.
Show resolved Hide resolved

continue;
}
};
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
Value::Image(img_buf.into_bytes()),
);
}
}
Expand Down Expand Up @@ -1025,10 +1041,15 @@ impl<'a> ImageLayerIterator<'a> {
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf: Bytes = blobs_buf.buf.freeze();
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
let img_buf = meta.read(&view).await?;
next_batch.push_back((
meta.meta.key,
self.image_layer.lsn,
Value::Image(img_buf.into_bytes()),
));
}
self.key_values_batch = next_batch;
Ok(())
Expand Down
Loading
Loading