pageserver: handle decompression outside vectored read_blobs #8942

Merged
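
This change replaces the decompress-at-every-call-site pattern (`vectored_blob_io::decompress(slice, meta.compression_bits)`, followed by a branch on whether decompression actually happened) with a single entry point: each call site wraps the read buffer in a `VectoredBlobBufView` and materializes blobs through `meta.read(&view)`. Below is a minimal, self-contained sketch of that shape. The type and method names mirror the call sites in the diff, but everything else (the field names, the `compressed` flag, the synchronous signatures, the placeholder `decompress`) is an assumption for illustration, not the pageserver's actual implementation, which is async and takes compression bits from the blob header.

use std::borrow::Cow;

use bytes::Bytes;

/// View over the shared buffer filled by one vectored read: either a
/// borrowed slice of a reusable `BytesMut`, or a frozen `Bytes` buffer.
enum VectoredBlobBufView<'a> {
    Slice(&'a [u8]),
    Bytes(Bytes),
}

impl<'a> VectoredBlobBufView<'a> {
    fn new_slice(slice: &'a [u8]) -> Self {
        VectoredBlobBufView::Slice(slice)
    }

    fn new_bytes(bytes: Bytes) -> Self {
        VectoredBlobBufView::Bytes(bytes)
    }

    fn as_slice(&self) -> &[u8] {
        match self {
            VectoredBlobBufView::Slice(s) => s,
            VectoredBlobBufView::Bytes(b) => b.as_ref(),
        }
    }
}

/// Stand-in for the per-blob metadata that `read_blobs` returns.
struct VectoredBlob {
    start: usize,
    end: usize,
    compressed: bool, // the real code keeps compression bits from the blob header
}

impl VectoredBlob {
    /// Slice this blob out of the shared buffer and decompress it if needed.
    /// Centralizing this is the point of the PR: call sites no longer touch
    /// `compression_bits` or branch on an `Option` of decompressed data.
    fn read<'v>(&self, view: &'v VectoredBlobBufView<'_>) -> std::io::Result<Cow<'v, [u8]>> {
        let raw = &view.as_slice()[self.start..self.end];
        if self.compressed {
            Ok(Cow::Owned(decompress(raw)?)) // zstd in the real code, and async
        } else {
            Ok(Cow::Borrowed(raw))
        }
    }
}

/// Placeholder for the zstd decompression the pageserver actually performs.
fn decompress(raw: &[u8]) -> std::io::Result<Vec<u8>> {
    Ok(raw.to_vec())
}
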
76 changes: 24 additions & 52 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
 use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::vectored_blob_io::{
-self, BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
+BlobFlag, StreamingVectoredReadPlanner, VectoredBlobBufView, VectoredBlobReader, VectoredRead,
 VectoredReadCoalesceMode, VectoredReadPlanner,
 };
 use crate::tenant::PageReconstructError;
@@ -1021,14 +1021,13 @@ impl DeltaLayerInner {
 continue;
 }
 };
-
+let view = VectoredBlobBufView::new_slice(&blobs_buf.buf);
 for meta in blobs_buf.blobs.iter().rev() {
 if Some(meta.meta.key) == ignore_key_with_err {
 continue;
 }
-let slice = &blobs_buf.buf[meta.start..meta.end];
-let decompressed = vectored_blob_io::decompress(slice, meta.compression_bits).await;
-let decompressed = match decompressed {
+let blob_read = meta.read(&view).await;
+let blob_read = match blob_read {
 Ok(buf) => buf,
 Err(e) => {
 reconstruct_state.on_key_error(
@@ -1044,11 +1043,7 @@
 }
 };
 
-let value = if let Some(decompressed) = decompressed {
-Value::des(&decompressed)
-} else {
-Value::des(slice)
-};
+let value = Value::des(&blob_read);
 
 let value = match value {
 Ok(v) => v,
@@ -1265,51 +1260,37 @@ impl DeltaLayerInner {
 buf.reserve(read.size());
 let res = reader.read_blobs(&read, buf, ctx).await?;
 
+let view = VectoredBlobBufView::new_slice(&res.buf);
+
 for blob in res.blobs {
 let key = blob.meta.key;
 let lsn = blob.meta.lsn;
-let slice = &res.buf[blob.start..blob.end];
-let decompressed = vectored_blob_io::decompress(slice, blob.compression_bits)
-.await
-.with_context(|| {
-format!(
-"blob failed to decompress for {}@{}, {}..{}: ",
-blob.meta.key, blob.meta.lsn, blob.start, blob.end
-)
-})?;
 
-let data = if let Some(decompressed) = &decompressed {
-&decompressed[..]
-} else {
-slice
-};
+let data = blob.read(&view).await?;
 
 #[cfg(debug_assertions)]
-Value::des(data)
+Value::des(&data)
 .with_context(|| {
 format!(
-"blob failed to deserialize for {}@{}, {}..{}: {:?}",
-blob.meta.key,
-blob.meta.lsn,
-blob.start,
-blob.end,
-utils::Hex(data)
+"blob failed to deserialize for {}: {:?}",
+blob,
+utils::Hex(&data)
 )
 })
 .unwrap();
 
 // is it an image or will_init walrecord?
 // FIXME: this could be handled by threading the BlobRef to the
 // VectoredReadBuilder
-let will_init = crate::repository::ValueBytes::will_init(data)
+let will_init = crate::repository::ValueBytes::will_init(&data)
 .inspect_err(|_e| {
 #[cfg(feature = "testing")]
-tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
+tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
 })
 .unwrap_or(false);
 
 per_blob_copy.clear();
-per_blob_copy.extend_from_slice(data);
+per_blob_copy.extend_from_slice(&data);
 
 let (tmp, res) = writer
 .put_value_bytes(
@@ -1574,14 +1555,10 @@ impl<'a> DeltaLayerIterator<'a> {
 .read_blobs(&plan, buf, self.ctx)
 .await?;
 let frozen_buf = blobs_buf.buf.freeze();
+let view = VectoredBlobBufView::new_bytes(frozen_buf);
 for meta in blobs_buf.blobs.iter() {
-let slice = &frozen_buf[meta.start..meta.end];
-let decompressed = vectored_blob_io::decompress(slice, meta.compression_bits).await?;
-let value = if let Some(decompressed) = &decompressed {
-Value::des(decompressed)?
-} else {
-Value::des(slice)?
-};
+let blob_read = meta.read(&view).await?;
+let value = Value::des(&blob_read)?;
 
 next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
 }
@@ -1959,18 +1936,13 @@ pub(crate) mod test {
 let blobs_buf = vectored_blob_reader
 .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
 .await?;
+let view = VectoredBlobBufView::new_slice(&blobs_buf.buf);
 for meta in blobs_buf.blobs.iter() {
-let slice = &blobs_buf.buf[meta.start..meta.end];
-let decompressed =
-vectored_blob_io::decompress(slice, meta.compression_bits).await?;
-
-let value = if let Some(decompressed) = &decompressed {
-decompressed
-} else {
-slice
-};
-
-assert_eq!(&value, &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
+let value = meta.read(&view).await?;
+assert_eq!(
+&value[..],
+&entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
+);
 }
 
 buf = Some(blobs_buf.buf);
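Worth noting in the delta-layer changes above: the two view constructors encode who owns the read buffer. `new_slice(&blobs_buf.buf)` borrows, because those paths hand the `BytesMut` back for reuse afterwards (`buf = Some(blobs_buf.buf)` in the test hunk), while the iterator path calls `buf.freeze()` first and moves the resulting `Bytes` into the view with `new_bytes(frozen_buf)`, so blob values sliced out of it can outlive the read as cheap refcounted `Bytes` rather than copies. The same frozen-buffer pattern repeats throughout image_layer.rs below.
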
43 changes: 16 additions & 27 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -36,7 +36,7 @@ use crate::tenant::disk_btree::{
 };
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::vectored_blob_io::{
-self, BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
+BlobFlag, StreamingVectoredReadPlanner, VectoredBlobBufView, VectoredBlobReader, VectoredRead,
 VectoredReadPlanner,
 };
 use crate::tenant::{PageReconstructError, Timeline};
@@ -552,21 +552,14 @@ impl ImageLayerInner {
 let buf = BytesMut::with_capacity(buf_size);
 let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
 let frozen_buf = blobs_buf.buf.freeze();
+let view = VectoredBlobBufView::new_bytes(frozen_buf);
 
 for meta in blobs_buf.blobs.iter() {
-let img_buf = frozen_buf.slice(meta.start..meta.end);
-let decompressed =
-vectored_blob_io::decompress(&img_buf, meta.compression_bits).await?;
-
-let img_buf = if let Some(decompressed) = decompressed {
-decompressed
-} else {
-img_buf
-};
+let img_buf = meta.read(&view).await?;
 
 key_count += 1;
 writer
-.put_image(meta.meta.key, Bytes::from(Vec::from(img_buf)), ctx)
+.put_image(meta.meta.key, img_buf.into(), ctx)
 .await
 .context(format!("Storing key {}", meta.meta.key))?;
 }
@@ -613,14 +606,12 @@ impl ImageLayerInner {
 match res {
 Ok(blobs_buf) => {
 let frozen_buf = blobs_buf.buf.freeze();
+let view = VectoredBlobBufView::new_bytes(frozen_buf);
 for meta in blobs_buf.blobs.iter() {
-let img_buf = frozen_buf.slice(meta.start..meta.end);
-let decompressed =
-vectored_blob_io::decompress(&img_buf, meta.compression_bits).await;
+let img_buf = meta.read(&view).await;
 
-let img_buf = match decompressed {
-Ok(None) => img_buf,
-Ok(Some(decompressed)) => decompressed,
+let img_buf = match img_buf {
+Ok(img_buf) => img_buf,
 Err(e) => {
 reconstruct_state.on_key_error(
 meta.meta.key,
@@ -636,7 +627,7 @@
 reconstruct_state.update_key(
 &meta.meta.key,
 self.lsn,
-Value::Image(img_buf),
+Value::Image(img_buf.into()),
 );
 }
 }
@@ -1064,16 +1055,14 @@ impl<'a> ImageLayerIterator<'a> {
 .read_blobs(&plan, buf, self.ctx)
 .await?;
 let frozen_buf = blobs_buf.buf.freeze();
+let view = VectoredBlobBufView::new_bytes(frozen_buf);
 for meta in blobs_buf.blobs.iter() {
-let img_buf = frozen_buf.slice(meta.start..meta.end);
-let decompressed =
-vectored_blob_io::decompress(&img_buf, meta.compression_bits).await?;
-let img_buf = if let Some(decompressed) = decompressed {
-decompressed
-} else {
-img_buf
-};
-next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
+let img_buf = meta.read(&view).await?;
+next_batch.push_back((
+meta.meta.key,
+self.image_layer.lsn,
+Value::Image(img_buf.into()),
+));
 }
 self.key_values_batch = next_batch;
 Ok(())
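To close, a usage sketch in the style of the image-layer paths above, reusing the hypothetical definitions from the sketch at the top of this page (an illustration of the calling pattern, not pageserver code):

fn main() -> std::io::Result<()> {
    // Pretend one vectored read filled this frozen buffer with two packed blobs.
    let view = VectoredBlobBufView::new_bytes(Bytes::from_static(b"hello world"));
    let blobs = [
        VectoredBlob { start: 0, end: 5, compressed: false },
        VectoredBlob { start: 6, end: 11, compressed: false },
    ];
    for meta in &blobs {
        // One call replaces the old slice-then-maybe-decompress dance.
        let img_buf = meta.read(&view)?;
        assert_eq!(&img_buf[..], &b"hello world"[meta.start..meta.end]);
    }
    Ok(())
}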