diff --git a/integrations/virtiofs/src/buffer.rs b/integrations/virtiofs/src/buffer.rs index 09e42498dd5c..82301b1aff0c 100644 --- a/integrations/virtiofs/src/buffer.rs +++ b/integrations/virtiofs/src/buffer.rs @@ -22,20 +22,18 @@ use std::ptr; use vm_memory::bitmap::BitmapSlice; use vm_memory::VolatileSlice; -use crate::error::*; - /// ReadWriteAtVolatile is a trait that allows reading and writing from a slice of VolatileSlice. pub trait ReadWriteAtVolatile { - fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result; - fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result; + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize; + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize; } impl + ?Sized> ReadWriteAtVolatile for &T { - fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize { (**self).read_vectored_at_volatile(bufs) } - fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize { (**self).write_vectored_at_volatile(bufs) } } @@ -58,7 +56,7 @@ impl BufferWrapper { } impl ReadWriteAtVolatile for BufferWrapper { - fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize { let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard_mut()).collect(); let iovecs: Vec<_> = slice_guards .iter() @@ -68,7 +66,7 @@ impl ReadWriteAtVolatile for BufferWrapper { }) .collect(); if iovecs.is_empty() { - return Ok(0); + return 0; } let data = self.buffer.borrow().to_vec(); let mut result = 0; @@ -83,10 +81,10 @@ impl ReadWriteAtVolatile for BufferWrapper { bufs[index].bitmap().mark_dirty(0, num); result += num; } - Ok(result) + result } - fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> usize { let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard()).collect(); let iovecs: Vec<_> = slice_guards .iter() @@ -96,7 +94,7 @@ impl ReadWriteAtVolatile for BufferWrapper { }) .collect(); if iovecs.is_empty() { - return Ok(0); + return 0; } let len = iovecs.iter().map(|iov| iov.iov_len).sum(); let mut data = vec![0; len]; @@ -112,6 +110,6 @@ impl ReadWriteAtVolatile for BufferWrapper { offset += iov.iov_len; } *self.buffer.borrow_mut() = opendal::Buffer::from(data); - Ok(len) + len } } diff --git a/integrations/virtiofs/src/error.rs b/integrations/virtiofs/src/error.rs index 99d67a86ecf1..3adf580f021a 100644 --- a/integrations/virtiofs/src/error.rs +++ b/integrations/virtiofs/src/error.rs @@ -15,21 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::CStr; use std::io; use anyhow::Error as AnyError; +use opendal::ErrorKind; use snafu::prelude::Snafu; /// Error is a error struct returned by all ovfs functions. #[derive(Debug, Snafu)] #[non_exhaustive] pub enum Error { - #[snafu(display("Vhost user fs error: {}, source: {:?}", message, source))] - VhostUserFsError { - message: String, + #[snafu(display("IO error: {:?}", source))] + IOError { #[snafu(source(false))] - source: Option, + source: io::Error, }, #[snafu(display("Unexpected error: {}, source: {:?}", message, source))] Unexpected { @@ -41,16 +40,33 @@ pub enum Error { impl From for Error { fn from(errno: libc::c_int) -> Error { - let err_str = unsafe { libc::strerror(errno) }; - let message = if err_str.is_null() { - format!("errno: {}", errno) - } else { - let c_str = unsafe { CStr::from_ptr(err_str) }; - c_str.to_string_lossy().into_owned() - }; - Error::VhostUserFsError { - message, - source: None, + Error::IOError { + source: io::Error::from_raw_os_error(errno), + } + } +} + +impl From for Error { + fn from(error: opendal::Error) -> Error { + match error.kind() { + ErrorKind::Unsupported => libc::ENOTSUP.into(), + ErrorKind::IsADirectory => libc::EISDIR.into(), + ErrorKind::NotFound => libc::ENOENT.into(), + ErrorKind::PermissionDenied => libc::EPERM.into(), + ErrorKind::AlreadyExists => libc::EEXIST.into(), + ErrorKind::NotADirectory => libc::ENOTDIR.into(), + ErrorKind::RangeNotSatisfied => libc::ERANGE.into(), + ErrorKind::RateLimited => libc::EAGAIN.into(), + _ => libc::EIO.into(), + } + } +} + +impl From for libc::c_int { + fn from(error: Error) -> libc::c_int { + match error { + Error::IOError { source } => source.raw_os_error().unwrap_or(libc::EIO), + Error::Unexpected { .. } => libc::EIO, } } } @@ -58,16 +74,7 @@ impl From for Error { impl From for io::Error { fn from(error: Error) -> io::Error { match error { - Error::VhostUserFsError { message, source } => { - let message = format!("Vhost user fs error: {}", message); - match source { - Some(source) => io::Error::new( - io::ErrorKind::Other, - format!("{}, source: {:?}", message, source), - ), - None => io::Error::new(io::ErrorKind::Other, message), - } - } + Error::IOError { source } => source, Error::Unexpected { message, source } => { let message = format!("Unexpected error: {}", message); match source { @@ -85,13 +92,6 @@ impl From for io::Error { /// Result is a result wrapper in ovfs. pub type Result = std::result::Result; -pub fn new_vhost_user_fs_error(message: &str, source: Option) -> Error { - Error::VhostUserFsError { - message: message.to_string(), - source, - } -} - pub fn new_unexpected_error(message: &str, source: Option) -> Error { Error::Unexpected { message: message.to_string(), diff --git a/integrations/virtiofs/src/filesystem.rs b/integrations/virtiofs/src/filesystem.rs index 764d03596648..f6e6ddd525f1 100644 --- a/integrations/virtiofs/src/filesystem.rs +++ b/integrations/virtiofs/src/filesystem.rs @@ -24,8 +24,8 @@ use std::sync::Mutex; use std::time::Duration; use log::debug; +use log::warn; use opendal::Buffer; -use opendal::ErrorKind; use opendal::Operator; use sharded_slab::Slab; use tokio::runtime::Builder; @@ -91,20 +91,6 @@ impl OpenedFile { } } -fn opendal_error2error(error: opendal::Error) -> Error { - match error.kind() { - ErrorKind::Unsupported => Error::from(libc::EOPNOTSUPP), - ErrorKind::IsADirectory => Error::from(libc::EISDIR), - ErrorKind::NotFound => Error::from(libc::ENOENT), - ErrorKind::PermissionDenied => Error::from(libc::EACCES), - ErrorKind::AlreadyExists => Error::from(libc::EEXIST), - ErrorKind::NotADirectory => Error::from(libc::ENOTDIR), - ErrorKind::RangeNotSatisfied => Error::from(libc::EINVAL), - ErrorKind::RateLimited => Error::from(libc::EBUSY), - _ => Error::from(libc::ENOENT), - } -} - fn opendal_metadata2opened_file( path: &str, metadata: &opendal::Metadata, @@ -153,15 +139,16 @@ impl Filesystem { } pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result { - let in_header: InHeader = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let in_header: InHeader = r.read_obj()?; if in_header.len > (MAX_BUFFER_SIZE + BUFFER_HEADER_SIZE) { - // The message is too long here. - return Filesystem::reply_error(in_header.unique, w); + return Filesystem::reply_error( + in_header.unique, + w, + new_unexpected_error("message is too long", None), + ); } - if let Ok(opcode) = Opcode::try_from(in_header.opcode) { - match opcode { + match Opcode::try_from(in_header.opcode) { + Ok(opcode) => match opcode { Opcode::Init => self.init(in_header, r, w), Opcode::Destroy => self.destroy(in_header, r, w), Opcode::Lookup => self.lookup(in_header, r, w), @@ -175,9 +162,8 @@ impl Filesystem { Opcode::Open => self.open(in_header, r, w), Opcode::Read => self.read(in_header, r, w), Opcode::Write => self.write(in_header, r, w), - } - } else { - Filesystem::reply_error(in_header.unique, w) + }, + Err(err) => Filesystem::reply_error(in_header.unique, w, err), } } } @@ -198,47 +184,69 @@ impl Filesystem { } let header = OutHeader { unique, - error: 0, // Return no error. + error: 0, len: len as u32, }; w.write_all(header.as_slice()).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while replying to frontend", + Some(e.into()), + ) })?; if let Some(out) = out { w.write_all(out.as_slice()).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while replying to frontend", + Some(e.into()), + ) })?; } if let Some(data) = data { w.write_all(data).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while replying to frontend", + Some(e.into()), + ) })?; } Ok(w.bytes_written()) } - fn reply_error(unique: u64, mut w: Writer) -> Result { + fn reply_error(unique: u64, mut w: Writer, err: Error) -> Result { + // Errors that meet expectations are consumed here. + warn!( + "[virtiofs] reply error to frontend: unique={} error={}", + unique, err + ); let header = OutHeader { unique, - error: libc::EIO, // Here we simply return I/O error. + error: err.into(), len: size_of::() as u32, }; w.write_all(header.as_slice()).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while replying to frontend", + Some(e.into()), + ) })?; Ok(w.bytes_written()) } fn bytes_to_str(buf: &[u8]) -> Result<&str> { - Filesystem::bytes_to_cstr(buf)?.to_str().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - }) - } - - fn bytes_to_cstr(buf: &[u8]) -> Result<&CStr> { - CStr::from_bytes_with_nul(buf).map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - }) + CStr::from_bytes_with_nul(buf) + .map_err(|e| { + new_unexpected_error( + "unexpected error occured while decoding protocol messages", + Some(e.into()), + ) + })? + .to_str() + .map_err(|e| { + new_unexpected_error( + "unexpected error occured while decoding protocol messages", + Some(e.into()), + ) + }) } fn check_flags(&self, flags: u32) -> Result<(bool, bool)> { @@ -262,24 +270,22 @@ impl Filesystem { impl Filesystem { fn init(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { - let InitIn { major, minor, .. } = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let InitIn { major, minor, .. } = r.read_obj()?; if major != KERNEL_VERSION || minor < MIN_KERNEL_MINOR_VERSION { - return Filesystem::reply_error(in_header.unique, w); + return Filesystem::reply_error( + in_header.unique, + w, + new_unexpected_error("unsupported virtiofs version", None), + ); } let mut attr = OpenedFile::new(FileType::Dir, "/", self.uid, self.gid); attr.metadata.ino = 1; // We need to allocate the inode 1 for the root directory. The double insertion // here makes 1 the first inode and avoids extra alignment and processing elsewhere. - self.opened_files - .insert(attr.clone()) - .expect("failed to allocate inode"); - self.opened_files - .insert(attr.clone()) - .expect("failed to allocate inode"); + self.opened_files.insert(attr.clone()); + self.opened_files.insert(attr.clone()); let mut opened_files_map = self.opened_files_map.lock().unwrap(); opened_files_map.insert("/".to_string(), 1); @@ -311,11 +317,14 @@ impl Filesystem { let name_len = in_header.len as usize - size_of::(); let mut buf = vec![0; name_len]; r.read_exact(&mut buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while decoding protocol messages", + Some(e.into()), + ) })?; let name = match Filesystem::bytes_to_str(buf.as_ref()) { Ok(name) => name, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; debug!("lookup: parent inode={} name={}", in_header.nodeid, name); @@ -326,13 +335,13 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; let path = format!("{}/{}", parent_path, name); let metadata = match self.rt.block_on(self.do_get_metadata(&path)) { Ok(metadata) => metadata, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let out = EntryOut { @@ -356,12 +365,12 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; let metadata = match self.rt.block_on(self.do_get_metadata(&path)) { Ok(metadata) => metadata, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let out = AttrOut { @@ -381,18 +390,19 @@ impl Filesystem { } fn create(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { - let CreateIn { flags, .. } = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let CreateIn { flags, .. } = r.read_obj()?; let name_len = in_header.len as usize - size_of::() - size_of::(); let mut buf = vec![0; name_len]; r.read_exact(&mut buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while decoding protocol messages", + Some(e.into()), + ) })?; let name = match Filesystem::bytes_to_str(buf.as_ref()) { Ok(name) => name, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; debug!("create: parent inode={} name={}", in_header.nodeid, name); @@ -403,7 +413,7 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; let path = format!("{}/{}", parent_path, name); @@ -418,7 +428,7 @@ impl Filesystem { match self.rt.block_on(self.do_set_writer(&path, flags)) { Ok(writer) => writer, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let entry_out = EntryOut { @@ -445,11 +455,14 @@ impl Filesystem { let name_len = in_header.len as usize - size_of::(); let mut buf = vec![0; name_len]; r.read_exact(&mut buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while decoding protocol messages", + Some(e.into()), + ) })?; let name = match Filesystem::bytes_to_str(buf.as_ref()) { Ok(name) => name, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; debug!("unlink: parent inode={} name={}", in_header.nodeid, name); @@ -460,12 +473,12 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; let path = format!("{}/{}", parent_path, name); - if self.rt.block_on(self.do_delete(&path)).is_err() { - return Filesystem::reply_error(in_header.unique, w); + if let Err(err) = self.rt.block_on(self.do_delete(&path)) { + return Filesystem::reply_error(in_header.unique, w, err); } let mut opened_files_map = self.opened_files_map.lock().unwrap(); @@ -483,11 +496,11 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; - if self.rt.block_on(self.do_release_writer(&path)).is_err() { - return Filesystem::reply_error(in_header.unique, w); + if let Err(err) = self.rt.block_on(self.do_release_writer(&path)) { + return Filesystem::reply_error(in_header.unique, w, err); } Filesystem::reply_ok(None::, None, in_header.unique, w) @@ -496,9 +509,7 @@ impl Filesystem { fn open(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { debug!("open: inode={}", in_header.nodeid); - let OpenIn { flags, .. } = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let OpenIn { flags, .. } = r.read_obj()?; let path = match self .opened_files @@ -506,12 +517,12 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; match self.rt.block_on(self.do_set_writer(&path, flags)) { Ok(writer) => writer, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let out = OpenOut { @@ -527,12 +538,10 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; - let ReadIn { offset, size, .. } = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let ReadIn { offset, size, .. } = r.read_obj()?; debug!( "read: inode={} offset={} size={}", @@ -541,15 +550,13 @@ impl Filesystem { let data = match self.rt.block_on(self.do_read(&path, offset)) { Ok(data) => data, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let len = data.len(); let buffer = BufferWrapper::new(data); - let mut data_writer = w.split_at(size_of::()).unwrap(); - data_writer.write_from_at(&buffer, len).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) - })?; + let mut data_writer = w.split_at(size_of::()); + data_writer.write_from_at(&buffer, len); let out = OutHeader { len: (size_of::() + len) as u32, @@ -557,7 +564,10 @@ impl Filesystem { unique: in_header.unique, }; w.write_all(out.as_slice()).map_err(|e| { - new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + new_unexpected_error( + "unexpected error occured while replying to frontend", + Some(e.into()), + ) })?; Ok(out.len as usize) } @@ -571,22 +581,18 @@ impl Filesystem { .map(|f| f.path.clone()) { Some(path) => path, - None => return Filesystem::reply_error(in_header.unique, w), + None => return Filesystem::reply_error(in_header.unique, w, Error::from(libc::ENOENT)), }; - let WriteIn { offset, size, .. } = r.read_obj().map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + let WriteIn { offset, size, .. } = r.read_obj()?; let buffer = BufferWrapper::new(Buffer::new()); - r.read_to_at(&buffer, size as usize).map_err(|e| { - new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) - })?; + r.read_to_at(&buffer, size as usize); let buffer = buffer.get_buffer(); match self.rt.block_on(self.do_write(&path, offset, buffer)) { Ok(writer) => writer, - Err(_) => return Filesystem::reply_error(in_header.unique, w), + Err(err) => return Filesystem::reply_error(in_header.unique, w, err), }; let out = WriteOut { @@ -599,7 +605,7 @@ impl Filesystem { impl Filesystem { async fn do_get_metadata(&self, path: &str) -> Result { - let metadata = self.core.stat(path).await.map_err(opendal_error2error)?; + let metadata = self.core.stat(path).await.map_err(|err| Error::from(err))?; let mut attr = opendal_metadata2opened_file(path, &metadata, self.uid, self.gid); attr.metadata.size = metadata.content_length(); let mut opened_files_map = self.opened_files_map.lock().unwrap(); @@ -628,12 +634,12 @@ impl Filesystem { .writer_with(path) .append(is_append) .await - .map_err(opendal_error2error)?; + .map_err(|err| Error::from(err))?; let written = if is_append { self.core .stat(path) .await - .map_err(opendal_error2error)? + .map_err(|err| Error::from(err))? .content_length() } else { 0 @@ -655,14 +661,17 @@ impl Filesystem { .writer .close() .await - .map_err(opendal_error2error)?; + .map_err(|err| Error::from(err))?; opened_file_writer.remove(path); Ok(()) } async fn do_delete(&self, path: &str) -> Result<()> { - self.core.delete(path).await.map_err(opendal_error2error)?; + self.core + .delete(path) + .await + .map_err(|err| Error::from(err))?; Ok(()) } @@ -673,7 +682,7 @@ impl Filesystem { .read_with(path) .range(offset..) .await - .map_err(opendal_error2error)?; + .map_err(|err| Error::from(err))?; Ok(data) } @@ -691,7 +700,7 @@ impl Filesystem { .writer .write_from(data) .await - .map_err(opendal_error2error)?; + .map_err(|err| Error::from(err))?; inner_writer.written += len as u64; Ok(len) diff --git a/integrations/virtiofs/src/filesystem_message.rs b/integrations/virtiofs/src/filesystem_message.rs index 50562aa7014d..6af2be6e9d34 100644 --- a/integrations/virtiofs/src/filesystem_message.rs +++ b/integrations/virtiofs/src/filesystem_message.rs @@ -56,7 +56,7 @@ impl TryFrom for Opcode { 26 => Ok(Opcode::Init), 35 => Ok(Opcode::Create), 38 => Ok(Opcode::Destroy), - _ => Err(new_vhost_user_fs_error("failed to decode opcode", None)), + _ => Err(new_unexpected_error("unsupported opcode", None)), } } } diff --git a/integrations/virtiofs/src/virtiofs.rs b/integrations/virtiofs/src/virtiofs.rs index fbacaf1f672e..566ad86c1c24 100644 --- a/integrations/virtiofs/src/virtiofs.rs +++ b/integrations/virtiofs/src/virtiofs.rs @@ -19,7 +19,7 @@ use std::io; use std::sync::Arc; use std::sync::RwLock; -use log::warn; +use log::error; use opendal::Operator; use vhost::vhost_user::message::VhostUserProtocolFeatures; use vhost::vhost_user::message::VhostUserVirtioFeatures; @@ -69,108 +69,86 @@ struct VhostUserFsThread { } impl VhostUserFsThread { - fn new(core: Filesystem) -> Result { - let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| { - new_unexpected_error("failed to create kill eventfd", Some(err.into())) - })?; - Ok(VhostUserFsThread { + fn new(core: Filesystem) -> VhostUserFsThread { + let event_fd = EventFd::new(libc::EFD_NONBLOCK).unwrap(); + VhostUserFsThread { core, mem: None, vu_req: None, event_idx: false, kill_event_fd: event_fd, - }) - } - - /// This is used when the backend has processed a request and needs to notify the frontend. - fn return_descriptor( - vring_state: &mut VringState, - head_index: u16, - event_idx: bool, - len: usize, - ) { - if vring_state.add_used(head_index, len as u32).is_err() { - warn!("Failed to add used to used queue."); - }; - // Check if the used queue needs to be signaled. - if event_idx { - match vring_state.needs_notification() { - Ok(needs_notification) => { - if needs_notification && vring_state.signal_used_queue().is_err() { - warn!("Failed to signal used queue."); - } - } - Err(_) => { - if vring_state.signal_used_queue().is_err() { - warn!("Failed to signal used queue."); - }; - } - } - } else if vring_state.signal_used_queue().is_err() { - warn!("Failed to signal used queue."); } } /// Process filesystem requests one at a time in a serialized manner. - fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> { - let mut vring_state = match device_event { - HIPRIO_QUEUE_EVENT => vrings[0].get_mut(), - REQ_QUEUE_EVENT => vrings[1].get_mut(), - _ => return Err(new_unexpected_error("failed to handle unknown event", None)), + fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) { + let mut vring_state = if device_event == HIPRIO_QUEUE_EVENT { + vrings[0].get_mut() + } else { + vrings[1].get_mut() }; if self.event_idx { // If EVENT_IDX is enabled, we could keep calling process_queue() // until it stops finding new request on the queue. loop { - if vring_state.disable_notification().is_err() { - warn!("Failed to disable used queue notification."); - } - self.process_queue_serial(&mut vring_state)?; - if let Ok(has_more) = vring_state.enable_notification() { - if !has_more { - break; - } - } else { - warn!("Failed to enable used queue notification."); + let _ = vring_state.disable_notification(); + self.handle_event_once(&mut vring_state); + if vring_state.enable_notification().unwrap_or(true) { + break; } } } else { // Without EVENT_IDX, a single call is enough. - self.process_queue_serial(&mut vring_state)?; + self.handle_event_once(&mut vring_state); } - Ok(()) } /// Forwards filesystem messages to specific functions and /// returns the filesystem request execution result. - fn process_queue_serial(&self, vring_state: &mut VringState) -> Result { - let mut used_any = false; - let mem = match &self.mem { - Some(m) => m.memory(), - None => return Err(new_unexpected_error("no memory configured", None)), - }; + fn handle_event_once(&self, vring_state: &mut VringState) { + let mem = self.mem.as_ref().unwrap().memory(); let avail_chains: Vec>> = vring_state .get_queue_mut() .iter(mem.clone()) - .map_err(|_| new_unexpected_error("iterating through the queue failed", None))? + .unwrap() .collect(); for chain in avail_chains { - used_any = true; let head_index = chain.head_index(); - let reader = Reader::new(&mem, chain.clone()) - .map_err(|_| new_unexpected_error("creating a queue reader failed", None)) - .unwrap(); - let writer = Writer::new(&mem, chain.clone()) - .map_err(|_| new_unexpected_error("creating a queue writer failed", None)) - .unwrap(); - let len = self - .core - .handle_message(reader, writer) - .map_err(|_| new_unexpected_error("processing a queue request failed", None)) - .unwrap(); - VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len); + let reader = match Reader::new(&mem, chain.clone()) { + Ok(reader) => reader, + Err(err) => { + error!( + "[virtiofs] error occurred while handling message from frontend: {}", + err + ); + continue; + } + }; + let writer = match Writer::new(&mem, chain.clone()) { + Ok(writer) => writer, + Err(err) => { + error!( + "[virtiofs] error occurred while handling message from frontend: {}", + err + ); + continue; + } + }; + match self.core.handle_message(reader, writer) { + Ok(len) => { + vring_state.add_used(head_index, len as u32).unwrap(); + if !self.event_idx || !vring_state.needs_notification().unwrap_or(false) { + let _ = vring_state.signal_used_queue(); + } + } + Err(err) => { + error!( + "[virtiofs] error occurred while handling message from frontend: {}", + err + ); + } + } } - Ok(used_any) } } @@ -181,20 +159,13 @@ struct VhostUserFsBackend { } impl VhostUserFsBackend { - fn new(core: Filesystem) -> Result { - let thread = RwLock::new(VhostUserFsThread::new(core)?); - Ok(VhostUserFsBackend { thread }) + fn new(core: Filesystem) -> VhostUserFsBackend { + let thread = RwLock::new(VhostUserFsThread::new(core)); + VhostUserFsBackend { thread } } - fn kill(&self) -> Result<()> { - self.thread - .read() - .unwrap() - .kill_event_fd - .write(1) - .map_err(|err| { - new_unexpected_error("failed to write to kill eventfd", Some(err.into())) - }) + fn kill(&self) { + self.thread.read().unwrap().kill_event_fd.write(1).unwrap() } } @@ -267,17 +238,14 @@ impl VhostUserBackend for VhostUserFsBackend { vrings: &[Self::Vring], _thread_id: usize, ) -> io::Result<()> { - if evset != EventSet::IN { - return Err(new_unexpected_error( - "failed to handle event other than input event", - None, - ) - .into()); + if evset != EventSet::IN + || (device_event != HIPRIO_QUEUE_EVENT && device_event != REQ_QUEUE_EVENT) + { + return Err(new_unexpected_error("failed to handle unknown event", None).into()); } let thread = self.thread.read().unwrap(); - thread - .handle_event_serial(device_event, vrings) - .map_err(|err| err.into()) + thread.handle_event_serial(device_event, vrings); + Ok(()) } } @@ -291,39 +259,30 @@ pub struct VirtioFs { } impl VirtioFs { - pub fn new(core: Operator, socket_path: &str) -> Result { + pub fn new(core: Operator, socket_path: &str) -> VirtioFs { let filesystem_core = Filesystem::new(core); - let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap()); - Ok(VirtioFs { + let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core)); + VirtioFs { socket_path: socket_path.to_string(), filesystem_backend, - }) + } } // Run the virtiofs service. - pub fn run(&self) -> Result<()> { - let listener = Listener::new(&self.socket_path, true) - .map_err(|_| new_unexpected_error("failed to create listener", None))?; + pub fn run(&self) { + let listener = Listener::new(&self.socket_path, true).unwrap(); let mut daemon = VhostUserDaemon::new( String::from("virtiofs-backend"), self.filesystem_backend.clone(), GuestMemoryAtomic::new(GuestMemoryMmap::new()), ) .unwrap(); - if daemon.start(listener).is_err() { - return Err(new_unexpected_error("failed to start daemon", None)); - } - if daemon.wait().is_err() { - return Err(new_unexpected_error("failed to wait daemon", None)); - } - Ok(()) + let _ = daemon.start(listener); + let _ = daemon.wait(); } // Kill the virtiofs service. - pub fn kill(&self) -> Result<()> { - if self.filesystem_backend.kill().is_err() { - return Err(new_unexpected_error("failed to kill backend", None)); - } - Ok(()) + pub fn kill(&self) { + self.filesystem_backend.kill(); } } diff --git a/integrations/virtiofs/src/virtiofs_util.rs b/integrations/virtiofs/src/virtiofs_util.rs index 9d390da97532..5ee196d0bd22 100644 --- a/integrations/virtiofs/src/virtiofs_util.rs +++ b/integrations/virtiofs/src/virtiofs_util.rs @@ -46,18 +46,9 @@ struct DescriptorChainConsumer<'a, B> { } impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> { - #[cfg(test)] - fn available_bytes(&self) -> usize { - self.buffers.iter().fold(0, |count, vs| count + vs.len()) - } - - fn bytes_consumed(&self) -> usize { - self.bytes_consumed - } - - fn consume(&mut self, count: usize, f: F) -> Result + fn consume(&mut self, count: usize, f: F) -> usize where - F: FnOnce(&[&VolatileSlice]) -> Result, + F: FnOnce(&[&VolatileSlice]) -> usize, { let mut len = 0; let mut bufs = Vec::with_capacity(self.buffers.len()); @@ -74,16 +65,9 @@ impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> { } } if bufs.is_empty() { - return Ok(0); + return 0; } - let bytes_consumed = f(&bufs)?; - let total_bytes_consumed = - self.bytes_consumed - .checked_add(bytes_consumed) - .ok_or(new_vhost_user_fs_error( - "the combined length of all the buffers in DescriptorChain would overflow", - None, - ))?; + let bytes_consumed = f(&bufs); let mut remain = bytes_consumed; while let Some(vs) = self.buffers.pop_front() { if remain < vs.len() { @@ -92,11 +76,11 @@ impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> { } remain -= vs.len(); } - self.bytes_consumed = total_bytes_consumed; - Ok(bytes_consumed) + self.bytes_consumed += bytes_consumed; + bytes_consumed } - fn split_at(&mut self, offset: usize) -> Result> { + fn split_at(&mut self, offset: usize) -> DescriptorChainConsumer<'a, B> { let mut remain = offset; let pos = self.buffers.iter().position(|vs| { if remain < vs.len() { @@ -109,34 +93,30 @@ impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> { if let Some(at) = pos { let mut other = self.buffers.split_off(at); if remain > 0 { - let front = other.pop_front().expect("empty VecDeque after split"); - self.buffers.push_back( - front - .subslice(0, remain) - .map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?, - ); - other.push_front( - front - .offset(remain) - .map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?, - ); + let front = other.pop_front().unwrap(); + self.buffers.push_back(front.subslice(0, remain).unwrap()); + other.push_front(front.offset(remain).unwrap()); } - Ok(DescriptorChainConsumer { + DescriptorChainConsumer { buffers: other, bytes_consumed: 0, - }) - } else if remain == 0 { - Ok(DescriptorChainConsumer { + } + } else { + DescriptorChainConsumer { buffers: VecDeque::new(), bytes_consumed: 0, - }) - } else { - Err(new_vhost_user_fs_error( - "DescriptorChain split is out of bounds", - None, - )) + } } } + + fn bytes_consumed(&self) -> usize { + self.bytes_consumed + } + + #[cfg(test)] + fn available_bytes(&self) -> usize { + self.buffers.iter().fold(0, |count, vs| count + vs.len()) + } } /// Provides a high-level interface for reading data in shared memory sequences. @@ -153,29 +133,18 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> { M: Deref, M::Target: GuestMemory + Sized, { - let mut len: usize = 0; let buffers = desc_chain .readable() .map(|desc| { - len = len - .checked_add(desc.len() as usize) - .ok_or(new_vhost_user_fs_error( - "the combined length of all the buffers in DescriptorChain would overflow", - None, - ))?; - let region = mem.find_region(desc.addr()).ok_or(new_vhost_user_fs_error( - "no memory region for this address range", - None, - ))?; - let offset = desc - .addr() - .checked_sub(region.start_addr().raw_value()) - .unwrap(); + let region = mem + .find_region(desc.addr()) + .ok_or(new_unexpected_error("cannot get volatile memory", None))?; + let offset = desc.addr().raw_value() - region.start_addr().raw_value(); region .deref() - .get_slice(offset.raw_value() as usize, desc.len() as usize) + .get_slice(offset as usize, desc.len() as usize) .map_err(|err| { - new_vhost_user_fs_error("volatile memory error", Some(err.into())) + new_unexpected_error("cannot get volatile memory", Some(err.into())) }) }) .collect::>>>()?; @@ -187,22 +156,18 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> { }) } - pub fn read_obj(&mut self) -> io::Result { + pub fn read_obj(&mut self) -> Result { let mut obj = MaybeUninit::::uninit(); let buf = unsafe { std::slice::from_raw_parts_mut(obj.as_mut_ptr() as *mut u8, size_of::()) }; - self.read_exact(buf)?; + self.read_exact(buf) + .map_err(|e| new_unexpected_error("failed to read object", Some(e.into())))?; Ok(unsafe { obj.assume_init() }) } - pub fn read_to_at>( - &mut self, - dst: F, - count: usize, - ) -> io::Result { + pub fn read_to_at>(&mut self, dst: F, count: usize) -> usize { self.buffer .consume(count, |bufs| dst.write_vectored_at_volatile(bufs)) - .map_err(|err| err.into()) } #[cfg(test)] @@ -218,21 +183,20 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> { impl io::Read for Reader<'_, B> { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.buffer - .consume(buf.len(), |bufs| { - let mut rem = buf; - let mut total = 0; - for vs in bufs { - let copy_len = min(rem.len(), vs.len()); - unsafe { - copy_nonoverlapping(vs.ptr_guard().as_ptr(), rem.as_mut_ptr(), copy_len); - } - rem = &mut rem[copy_len..]; - total += copy_len; + let total = self.buffer.consume(buf.len(), |bufs| { + let mut rem = buf; + let mut total = 0; + for vs in bufs { + let copy_len = min(rem.len(), vs.len()); + unsafe { + copy_nonoverlapping(vs.ptr_guard().as_ptr(), rem.as_mut_ptr(), copy_len); } - Ok(total) - }) - .map_err(|err| err.into()) + rem = &mut rem[copy_len..]; + total += copy_len; + } + total + }); + Ok(total) } } @@ -250,29 +214,18 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> { M: Deref, M::Target: GuestMemory + Sized, { - let mut len: usize = 0; let buffers = desc_chain .writable() .map(|desc| { - len = len - .checked_add(desc.len() as usize) - .ok_or(new_vhost_user_fs_error( - "the combined length of all the buffers in DescriptorChain would overflow", - None, - ))?; - let region = mem.find_region(desc.addr()).ok_or(new_vhost_user_fs_error( - "no memory region for this address range", - None, - ))?; - let offset = desc - .addr() - .checked_sub(region.start_addr().raw_value()) - .unwrap(); + let region = mem + .find_region(desc.addr()) + .ok_or(new_unexpected_error("cannot get volatile memory", None))?; + let offset = desc.addr().raw_value() - region.start_addr().raw_value(); region .deref() - .get_slice(offset.raw_value() as usize, desc.len() as usize) + .get_slice(offset as usize, desc.len() as usize) .map_err(|err| { - new_vhost_user_fs_error("volatile memory error", Some(err.into())) + new_unexpected_error("cannot get volatile memory", Some(err.into())) }) }) .collect::>>>()?; @@ -284,48 +237,44 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> { }) } - pub fn split_at(&mut self, offset: usize) -> Result> { - self.buffer.split_at(offset).map(|buffer| Writer { buffer }) + pub fn split_at(&mut self, offset: usize) -> Writer<'a, B> { + Writer { + buffer: self.buffer.split_at(offset), + } } - pub fn write_from_at>( - &mut self, - src: F, - count: usize, - ) -> io::Result { + pub fn write_from_at>(&mut self, src: F, count: usize) -> usize { self.buffer .consume(count, |bufs| src.read_vectored_at_volatile(bufs)) - .map_err(|err| err.into()) + } + + pub fn bytes_written(&self) -> usize { + self.buffer.bytes_consumed() } #[cfg(test)] pub fn available_bytes(&self) -> usize { self.buffer.available_bytes() } - - pub fn bytes_written(&self) -> usize { - self.buffer.bytes_consumed() - } } impl Write for Writer<'_, B> { fn write(&mut self, buf: &[u8]) -> io::Result { - self.buffer - .consume(buf.len(), |bufs| { - let mut rem = buf; - let mut total = 0; - for vs in bufs { - let copy_len = min(rem.len(), vs.len()); - unsafe { - copy_nonoverlapping(rem.as_ptr(), vs.ptr_guard_mut().as_ptr(), copy_len); - } - vs.bitmap().mark_dirty(0, copy_len); - rem = &rem[copy_len..]; - total += copy_len; + let total = self.buffer.consume(buf.len(), |bufs| { + let mut rem = buf; + let mut total = 0; + for vs in bufs { + let copy_len = min(rem.len(), vs.len()); + unsafe { + copy_nonoverlapping(rem.as_ptr(), vs.ptr_guard_mut().as_ptr(), copy_len); } - Ok(total) - }) - .map_err(|err| err.into()) + vs.bitmap().mark_dirty(0, copy_len); + rem = &rem[copy_len..]; + total += copy_len; + } + total + }); + Ok(total) } fn flush(&mut self) -> io::Result<()> {