storage: a series of fixups for stargz chunks
For rafs v6 in the fscache daemon, we must stay compatible with
stargz chunks.

Signed-off-by: Yan Song <[email protected]>
imeoer committed Jun 21, 2022
1 parent 147d176 commit a1e1b65
Showing 9 changed files with 97 additions and 23 deletions.
11 changes: 5 additions & 6 deletions rafs/src/fs.rs
@@ -41,8 +41,7 @@ use nydus_utils::metrics::{self, FopRecorder, StatsFop::*};

 use crate::metadata::layout::RAFS_ROOT_INODE;
 use crate::metadata::{
-    Inode, PostWalkAction, RafsInode, RafsSuper, RafsSuperMeta, DOT, DOTDOT,
-    RAFS_DEFAULT_CHUNK_SIZE,
+    Inode, PostWalkAction, RafsInode, RafsSuper, RafsSuperMeta, DOT, DOTDOT, RAFS_MAX_CHUNK_SIZE,
 };
 use crate::{RafsError, RafsIoReader, RafsResult};

@@ -105,9 +104,9 @@ impl TryFrom<&RafsConfig> for BlobPrefetchConfig {
     type Error = RafsError;

     fn try_from(c: &RafsConfig) -> RafsResult<Self> {
-        if c.fs_prefetch.merging_size as u64 > RAFS_DEFAULT_CHUNK_SIZE {
+        if c.fs_prefetch.merging_size as u64 > RAFS_MAX_CHUNK_SIZE {
             return Err(RafsError::Configure(
-                "Merging size can't exceed chunk size".to_string(),
+                "merging size can't exceed max chunk size".to_string(),
             ));
         } else if c.fs_prefetch.enable && c.fs_prefetch.threads_count == 0 {
             return Err(RafsError::Configure(
@@ -924,8 +923,8 @@ impl FileSystem for Rafs {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
-    use crate::metadata::RAFS_DEFAULT_CHUNK_SIZE;
     use crate::RafsIoRead;
+    use storage::RAFS_MAX_CHUNK_SIZE;

     pub fn new_rafs_backend() -> Box<Rafs> {
         let config = r#"
@@ -1075,7 +1074,7 @@ pub(crate) mod tests {
         config.fs_prefetch.merging_size = RAFS_MAX_CHUNK_SIZE as usize + 1;
         assert!(BlobPrefetchConfig::try_from(&config).is_err());

-        config.fs_prefetch.merging_size = RAFS_MAX_CHUNK_SIZE as usize;
+        config.fs_prefetch.merging_size = RAFS_DEFAULT_CHUNK_SIZE as usize;
         config.fs_prefetch.bandwidth_rate = 1;
         config.fs_prefetch.prefetch_all = true;
         assert!(BlobPrefetchConfig::try_from(&config).is_ok());

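The rafs change above relaxes the prefetch merging_size bound from RAFS_DEFAULT_CHUNK_SIZE (1MB) to RAFS_MAX_CHUNK_SIZE. A minimal standalone sketch of that check; the constant and the free function validate_merging_size are illustrative stand-ins, not the crate's actual API:

    // Assumed to match the new value in storage/src/lib.rs (the old limit was 1MB).
    const RAFS_MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024;

    // Illustrative stand-in for the merging_size check in BlobPrefetchConfig::try_from().
    fn validate_merging_size(merging_size: usize) -> Result<(), String> {
        if merging_size as u64 > RAFS_MAX_CHUNK_SIZE {
            return Err("merging size can't exceed max chunk size".to_string());
        }
        Ok(())
    }

    fn main() {
        // 4MB was rejected under the old 1MB bound; it is accepted now.
        assert!(validate_merging_size(4 * 1024 * 1024).is_ok());
        // Anything above the 16MB maximum chunk size is still rejected.
        assert!(validate_merging_size(RAFS_MAX_CHUNK_SIZE as usize + 1).is_err());
    }

The updated unit test in the last hunk exercises the same two cases.
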
12 changes: 10 additions & 2 deletions rafs/src/metadata/layout/v6.rs
@@ -17,6 +17,7 @@ use lazy_static::lazy_static;
 use nydus_utils::{compress, digest, round_up, ByteSize};
 use storage::device::{BlobFeatures, BlobInfo};
 use storage::meta::{BlobMetaHeaderOndisk, BLOB_FEATURE_4K_ALIGNED};
+use storage::RAFS_MAX_CHUNK_SIZE;

 use crate::metadata::{layout::RafsXAttrs, RafsStore, RafsSuperFlags};
 use crate::{impl_bootstrap_converter, impl_pub_getter_setter, RafsIoReader, RafsIoWrite};
@@ -352,7 +353,10 @@ impl RafsV6SuperBlockExt {
         }

         let chunk_size = u32::from_le(self.s_chunk_size) as u64;
-        if !chunk_size.is_power_of_two() || chunk_size < EROFS_BLOCK_SIZE {
+        if !chunk_size.is_power_of_two()
+            || chunk_size < EROFS_BLOCK_SIZE
+            || chunk_size > RAFS_MAX_CHUNK_SIZE
+        {
             return Err(einval!("invalid chunk size in Rafs v6 extended superblock"));
         }

@@ -1292,7 +1296,11 @@ impl RafsV6Blob {
         }

         let c_size = u32::from_le(self.chunk_size) as u64;
-        if c_size.count_ones() != 1 || c_size < EROFS_BLOCK_SIZE || c_size != chunk_size as u64 {
+        if c_size.count_ones() != 1
+            || c_size < EROFS_BLOCK_SIZE
+            || c_size > RAFS_MAX_CHUNK_SIZE
+            || c_size != chunk_size as u64
+        {
             error!(
                 "RafsV6Blob: idx {} invalid c_size {}, count_ones() {}",
                 blob_index,

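Both the extended superblock and the per-blob metadata now reject chunk sizes above RAFS_MAX_CHUNK_SIZE on top of the existing power-of-two and lower-bound checks. A self-contained sketch of the predicate, assuming EROFS_BLOCK_SIZE is 4KB; the helper name is illustrative:

    // Assumed values: EROFS uses a 4KB block size; RAFS_MAX_CHUNK_SIZE is 16MB after this commit.
    const EROFS_BLOCK_SIZE: u64 = 4096;
    const RAFS_MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024;

    // Illustrative form of the sanity check now applied to v6 superblock and blob chunk sizes.
    fn chunk_size_is_valid(chunk_size: u64) -> bool {
        chunk_size.is_power_of_two()
            && chunk_size >= EROFS_BLOCK_SIZE
            && chunk_size <= RAFS_MAX_CHUNK_SIZE
    }

    fn main() {
        assert!(chunk_size_is_valid(0x100000)); // 1MB, the default
        assert!(chunk_size_is_valid(0x400000)); // 4MB
        assert!(!chunk_size_is_valid(0x2000000)); // 32MB, above the new upper bound
        assert!(!chunk_size_is_valid(0x300000)); // 3MB, not a power of two
        assert!(!chunk_size_is_valid(0x800)); // 2KB, below the EROFS block size
    }

Note that `c_size.count_ones() != 1` in the blob check is just another spelling of the power-of-two test.
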
7 changes: 5 additions & 2 deletions src/bin/nydus-image/main.rs
@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
 use nydus_app::{setup_logging, BuildTimeInfo};
 use nydus_rafs::RafsIoReader;
 use nydus_storage::factory::{BackendConfig, BlobFactory};
-use nydus_storage::RAFS_DEFAULT_CHUNK_SIZE;
+use nydus_storage::{RAFS_DEFAULT_CHUNK_SIZE, RAFS_MAX_CHUNK_SIZE};
 use nydus_utils::{compress, digest};

 use crate::builder::{Builder, DiffBuilder, DirectoryBuilder, StargzBuilder};
@@ -966,7 +966,10 @@ impl Command {
         let param = v.trim_start_matches("0x").trim_end_matches("0X");
         let chunk_size =
             u32::from_str_radix(param, 16).context(format!("invalid chunk size {}", v))?;
-        if chunk_size < 0x1000 || !chunk_size.is_power_of_two() {
+        if chunk_size as u64 > RAFS_MAX_CHUNK_SIZE
+            || chunk_size < 0x1000
+            || !chunk_size.is_power_of_two()
+        {
             bail!("invalid chunk size: {}", chunk_size);
         }
         Ok(chunk_size)

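nydus-image applies the same upper bound when parsing the user-supplied hex chunk size. A simplified, std-only paraphrase of that parsing path; parse_chunk_size and its String error type are stand-ins, while the real code returns anyhow-style errors via context()/bail!():

    const RAFS_MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024; // assumed to match nydus-storage's new value

    fn parse_chunk_size(v: &str) -> Result<u32, String> {
        let param = v.trim_start_matches("0x").trim_end_matches("0X");
        let chunk_size = u32::from_str_radix(param, 16)
            .map_err(|e| format!("invalid chunk size {}: {}", v, e))?;
        if chunk_size as u64 > RAFS_MAX_CHUNK_SIZE
            || chunk_size < 0x1000
            || !chunk_size.is_power_of_two()
        {
            return Err(format!("invalid chunk size: {}", chunk_size));
        }
        Ok(chunk_size)
    }

    fn main() {
        assert_eq!(parse_chunk_size("0x100000").unwrap(), 0x100000); // 1MB, accepted
        assert!(parse_chunk_size("0x2000000").is_err()); // 32MB, above the 16MB cap
        assert!(parse_chunk_size("0x800").is_err()); // below the 4KB floor
    }
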
11 changes: 10 additions & 1 deletion src/bin/nydusd/fs_cache.rs
@@ -584,7 +584,16 @@ impl FsCacheHandler {
             }
             Some(obj) => match obj.fetch_range_uncompressed(msg.off, msg.len) {
                 Ok(v) if v == msg.len as usize => {}
-                _ => debug!("fscache: failed to read data from blob object"),
+                Ok(v) => {
+                    warn!(
+                        "fscache: read data from blob object not matched: {} != {}",
+                        v, msg.len
+                    );
+                }
+                Err(e) => error!(
+                    "{}",
+                    format!("fscache: failed to read data from blob object: {}", e,)
+                ),
             },
         }
     }

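The fscache read path now distinguishes a short read from an I/O error instead of folding both into one debug message. A minimal sketch of the same match pattern, with println!/eprintln! standing in for the log crate's warn!/error! macros and hypothetical arguments:

    fn report_fetch_result(result: std::io::Result<usize>, expected: usize) {
        match result {
            Ok(v) if v == expected => {} // full read, nothing to report
            Ok(v) => println!(
                "fscache: read data from blob object not matched: {} != {}",
                v, expected
            ),
            Err(e) => eprintln!("fscache: failed to read data from blob object: {}", e),
        }
    }

    fn main() {
        report_fetch_result(Ok(4096), 8192); // short read -> warning path
        report_fetch_result(
            Err(std::io::Error::new(std::io::ErrorKind::Other, "cache miss")),
            8192,
        ); // I/O error -> error path
    }
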
28 changes: 28 additions & 0 deletions storage/src/cache/cachedfile.rs
@@ -359,6 +359,34 @@ impl BlobObject for FileCacheEntry {

 impl FileCacheEntry {
     fn do_fetch_chunks(&self, chunks: &[BlobIoChunk]) -> Result<usize> {
+        if self.is_stargz() {
+            // FIXME: for stargz, we need to implement fetching multiple chunks. here
+            // is a heavy overhead workaround, needs to be optimized.
+            for chunk in chunks {
+                let mut buf = alloc_buf(chunk.uncompress_size() as usize);
+                self.read_raw_chunk(chunk, &mut buf, false, None)
+                    .map_err(|e| {
+                        eio!(format!(
+                            "read_raw_chunk failed to read and decompress stargz chunk, {:?}",
+                            e
+                        ))
+                    })?;
+                if self.dio_enabled {
+                    self.adjust_buffer_for_dio(&mut buf)
+                }
+                Self::persist_chunk(&self.file, chunk.uncompress_offset(), &buf).map_err(|e| {
+                    eio!(format!(
+                        "do_fetch_chunk failed to persist stargz chunk, {:?}",
+                        e
+                    ))
+                })?;
+                self.chunk_map
+                    .set_ready_and_clear_pending(chunk.as_base())
+                    .unwrap_or_else(|e| error!("set stargz chunk ready failed, {}", e));
+            }
+            return Ok(0);
+        }
+
         debug_assert!(!chunks.is_empty());
         let bitmap = self
             .chunk_map

5 changes: 1 addition & 4 deletions storage/src/cache/fscache/mod.rs
@@ -166,9 +166,6 @@ impl FileCacheEntry {
         if blob_info.has_feature(BlobFeatures::V5_NO_EXT_BLOB_TABLE) {
             return Err(einval!("fscache does not support Rafs v5 blobs"));
         }
-        if blob_info.is_stargz() {
-            return Err(einval!("fscache does not support stargz blob file"));
-        }
         let file = blob_info
             .get_fscache_file()
             .ok_or_else(|| einval!("No fscache file associated with the blob_info"))?;
@@ -211,7 +208,7 @@ impl FileCacheEntry {
             is_get_blob_object_supported: true,
             is_compressed: false,
             is_direct_chunkmap: true,
-            is_stargz: false,
+            is_stargz: blob_info.is_stargz(),
             dio_enabled: true,
             need_validate: mgr.validate,
             prefetch_config,

11 changes: 7 additions & 4 deletions storage/src/cache/worker.rs
@@ -22,7 +22,7 @@ use nydus_utils::async_helper::with_runtime;
 use nydus_utils::mpmc::Channel;

 use crate::cache::{BlobCache, BlobIoRange};
-use crate::RAFS_MAX_CHUNK_SIZE;
+use crate::RAFS_DEFAULT_CHUNK_SIZE;

 /// Configuration information for asynchronous workers.
 pub(crate) struct AsyncPrefetchConfig {
@@ -99,11 +99,14 @@ impl AsyncWorkerMgr {
         metrics: Arc<BlobcacheMetrics>,
         prefetch_config: Arc<AsyncPrefetchConfig>,
     ) -> Result<Self> {
-        // If the given value is less than maximum blob chunk size, it exceeds burst size of the
+        // If the given value is less than default blob chunk size, it exceeds burst size of the
         // limiter ending up with throttling all throughput, so ensure bandwidth is bigger than
-        // the maximum chunk size.
+        // the default chunk size.
         let tweaked_bw_limit = if prefetch_config.bandwidth_rate != 0 {
-            std::cmp::max(RAFS_MAX_CHUNK_SIZE as u32, prefetch_config.bandwidth_rate)
+            std::cmp::max(
+                RAFS_DEFAULT_CHUNK_SIZE as u32,
+                prefetch_config.bandwidth_rate,
+            )
         } else {
             0
         };

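With RAFS_MAX_CHUNK_SIZE raised to 16MB, clamping the prefetch bandwidth limit against it would lift any configured rate below 16MB/s up to 16MB/s, so the worker now clamps against RAFS_DEFAULT_CHUNK_SIZE instead. A small sketch of that adjustment; tweaked_bw_limit is an illustrative helper, and zero is assumed to mean rate limiting is disabled:

    const RAFS_DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024; // 1MB, per storage/src/lib.rs

    // Illustrative helper mirroring how the prefetch worker tweaks its bandwidth limit;
    // the real value comes from AsyncPrefetchConfig::bandwidth_rate.
    fn tweaked_bw_limit(bandwidth_rate: u32) -> u32 {
        if bandwidth_rate != 0 {
            // A burst size smaller than one default chunk would throttle all throughput,
            // so the limit is raised to at least RAFS_DEFAULT_CHUNK_SIZE.
            std::cmp::max(RAFS_DEFAULT_CHUNK_SIZE as u32, bandwidth_rate)
        } else {
            // Zero is passed through unchanged (assumed to mean "no rate limiting").
            0
        }
    }

    fn main() {
        assert_eq!(tweaked_bw_limit(0), 0);
        assert_eq!(tweaked_bw_limit(512 * 1024), 1024 * 1024); // raised to one default chunk
        assert_eq!(tweaked_bw_limit(8 * 1024 * 1024), 8 * 1024 * 1024); // large enough already
    }
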
4 changes: 2 additions & 2 deletions storage/src/lib.rs
@@ -72,8 +72,8 @@ macro_rules! impl_getter {

 /// Default blob chunk size.
 pub const RAFS_DEFAULT_CHUNK_SIZE: u64 = 1024 * 1024;
-/// Maximum blob chunk size.
-pub const RAFS_MAX_CHUNK_SIZE: u64 = 1024 * 1024;
+/// Maximum blob chunk size, 16MB.
+pub const RAFS_MAX_CHUNK_SIZE: u64 = 1024 * 1024 * 16;

 /// Error codes related to storage subsystem.
 #[derive(Debug)]

31 changes: 29 additions & 2 deletions storage/src/meta/mod.rs
@@ -407,6 +407,7 @@ impl BlobMetaInfo {
             chunks: chunk_infos,
             base: base as *const u8,
             unmap_len: expected_size,
+            is_stargz: blob_info.is_stargz(),
         });

         Ok(BlobMetaInfo { state })
@@ -465,7 +466,9 @@ impl BlobMetaInfo {
             index += 1;
             let entry = &infos[index];
             self.validate_chunk(entry)?;
-            if entry.uncompressed_offset() != last_end {
+
+            // For stargz chunks, disable this check.
+            if !self.state.is_stargz && entry.uncompressed_offset() != last_end {
                 return Err(einval!(format!(
                     "mismatch uncompressed {} size {} last_end {}",
                     entry.uncompressed_offset(),
@@ -562,7 +565,8 @@ impl BlobMetaInfo {

     #[inline]
     fn validate_chunk(&self, entry: &BlobChunkInfoOndisk) -> Result<()> {
-        if entry.compressed_end() > self.state.compressed_size
+        // For stargz blob, self.state.compressed_size == 0, so don't validate it.
+        if (!self.state.is_stargz && entry.compressed_end() > self.state.compressed_size)
             || entry.uncompressed_end() > self.state.uncompressed_size
         {
             Err(einval!())
@@ -646,6 +650,8 @@ pub struct BlobMetaState {
     chunks: ManuallyDrop<Vec<BlobChunkInfoOndisk>>,
     base: *const u8,
     unmap_len: usize,
+    /// The blob meta is for an stargz image.
+    is_stargz: bool,
 }

 // // Safe to Send/Sync because the underlying data structures are readonly
@@ -671,6 +677,25 @@ impl BlobMetaState {
         let mut start = 0;
         let mut end = 0;

+        if self.is_stargz {
+            // FIXME: since stargz chunks are not currently allocated chunk index in the order of uncompressed_offset,
+            // a binary search is not available for now, here is a heavy overhead workaround, need to be fixed.
+            for i in 0..self.chunk_count {
+                let off = if compressed {
+                    chunks[i as usize].compressed_offset()
+                } else {
+                    chunks[i as usize].uncompressed_offset()
+                };
+                if addr == off {
+                    return Ok(i as usize);
+                }
+            }
+            return Err(einval!(format!(
+                "can't find stargz chunk by offset {}",
+                addr,
+            )));
+        }
+
         while left < right {
             let mid = left + size / 2;
             // SAFETY: the call is made safe by the following invariants:
@@ -799,6 +824,7 @@ mod tests {
             ]),
             base: std::ptr::null(),
             unmap_len: 0,
+            is_stargz: false,
         };

         assert_eq!(state.get_chunk_index_nocheck(0, false).unwrap(), 0);
@@ -883,6 +909,7 @@ mod tests {
             ]),
             base: std::ptr::null(),
             unmap_len: 0,
+            is_stargz: false,
         };
         let info = BlobMetaInfo {
             state: Arc::new(state),

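The linear fallback in get_chunk_index_nocheck() exists because stargz chunk indexes are not assigned in uncompressed-offset order, so the existing binary search cannot be applied. A self-contained illustration of the two lookups on toy offset tables; find_linear and the sample offsets are hypothetical:

    // O(n) scan over chunk offsets, mirroring the stargz fallback in get_chunk_index_nocheck().
    fn find_linear(offsets: &[u64], addr: u64) -> Option<usize> {
        offsets.iter().position(|&off| off == addr)
    }

    fn main() {
        // Hypothetical stargz-style table: chunk index order != uncompressed offset order,
        // so a binary search over it would give meaningless results.
        let stargz_offsets = [0x400000u64, 0x0, 0xc00000, 0x800000];
        assert_eq!(find_linear(&stargz_offsets, 0x800000), Some(3));
        assert_eq!(find_linear(&stargz_offsets, 0x123456), None); // maps to the einval! error path

        // An offset table laid out in ascending order can keep using the binary-search fast path.
        let sorted_offsets = [0x0u64, 0x400000, 0x800000, 0xc00000];
        assert_eq!(sorted_offsets.binary_search(&0x800000).ok(), Some(2));
    }

Once stargz chunks are indexed in offset order, the O(n) scan can be dropped in favor of the existing binary search, which is what the FIXME in the hunk points at.
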
