Skip to content

Commit

Permalink
feat(ovfs): export VirtioFs struct (#4983)
Browse files Browse the repository at this point in the history
* feat: export VirtioFs struct

* feat: polish code
  • Loading branch information
zjregee authored Aug 8, 2024
1 parent 5727752 commit 690e494
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 61 deletions.
2 changes: 2 additions & 0 deletions integrations/virtiofs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ version = "0.0.0"
[dependencies]
anyhow = { version = "1.0.86", features = ["std"] }
libc = "0.2.139"
log = "0.4.22"
opendal = { version = "0.48.0", path = "../../core" }
snafu = "0.8.3"
vhost = "0.11.0"
vhost-user-backend = "0.14.0"
Expand Down
12 changes: 8 additions & 4 deletions integrations/virtiofs/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::io::Write;
use std::mem::size_of;

use opendal::Operator;
use vm_memory::ByteValued;

use crate::error::*;
Expand All @@ -37,12 +38,15 @@ const MAX_BUFFER_SIZE: u32 = 1 << 20;

/// Filesystem is a filesystem implementation with opendal backend,
/// and will decode and process messages from VMs.
pub struct Filesystem {}
pub struct Filesystem {
// FIXME: #[allow(dead_code)] here should be removed in the future.
#[allow(dead_code)]
core: Operator,
}

#[allow(dead_code)]
impl Filesystem {
pub fn new() -> Filesystem {
Filesystem {}
pub fn new(core: Operator) -> Filesystem {
Filesystem { core }
}

pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
Expand Down
2 changes: 2 additions & 0 deletions integrations/virtiofs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ mod filesystem;
mod filesystem_message;
mod virtiofs;
mod virtiofs_util;

pub use virtiofs::VirtioFs;
199 changes: 144 additions & 55 deletions integrations/virtiofs/src/virtiofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,41 @@
// under the License.

use std::io;
use std::sync::Arc;
use std::sync::RwLock;

use log::warn;
use opendal::Operator;
use vhost::vhost_user::message::VhostUserProtocolFeatures;
use vhost::vhost_user::message::VhostUserVirtioFeatures;
use vhost::vhost_user::Backend;
use vhost::vhost_user::Listener;
use vhost_user_backend::VhostUserBackend;
use vhost_user_backend::VhostUserDaemon;
use vhost_user_backend::VringMutex;
use vhost_user_backend::VringState;
use vhost_user_backend::VringT;
use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
use vm_memory::ByteValued;
use virtio_queue::DescriptorChain;
use virtio_queue::QueueOwnedT;
use vm_memory::GuestAddressSpace;
use vm_memory::GuestMemoryAtomic;
use vm_memory::GuestMemoryLoadGuard;
use vm_memory::GuestMemoryMmap;
use vm_memory::Le32;
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

use crate::error::*;
use crate::filesystem::Filesystem;
use crate::virtiofs_util::Reader;
use crate::virtiofs_util::Writer;

/// Marks an event from the high priority queue.
const HIPRIO_QUEUE_EVENT: u16 = 0;
/// Marks an event from the request queue.
const REQ_QUEUE_EVENT: u16 = 1;
/// The maximum number of bytes in VirtioFsConfig tag field.
const MAX_TAG_LEN: usize = 36;
/// The maximum queue size supported.
const QUEUE_SIZE: usize = 32768;
/// The number of request queues supported.
Expand All @@ -53,25 +61,56 @@ const NUM_QUEUES: usize = REQUEST_QUEUES + 1;

/// VhostUserFsThread represents the actual worker process used to handle file system requests from VMs.
struct VhostUserFsThread {
core: Filesystem,
mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
vu_req: Option<Backend>,
event_idx: bool,
kill_event_fd: EventFd,
}

impl VhostUserFsThread {
fn new() -> Result<VhostUserFsThread> {
fn new(core: Filesystem) -> Result<VhostUserFsThread> {
let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| {
new_unexpected_error("failed to create kill eventfd", Some(err.into()))
})?;
Ok(VhostUserFsThread {
core,
mem: None,
vu_req: None,
event_idx: false,
kill_event_fd: event_fd,
})
}

/// This is used when the backend has processed a request and needs to notify the frontend.
fn return_descriptor(
vring_state: &mut VringState,
head_index: u16,
event_idx: bool,
len: usize,
) {
if vring_state.add_used(head_index, len as u32).is_err() {
warn!("Failed to add used to used queue.");
};
// Check if the used queue needs to be signaled.
if event_idx {
match vring_state.needs_notification() {
Ok(needs_notification) => {
if needs_notification && vring_state.signal_used_queue().is_err() {
warn!("Failed to signal used queue.");
}
}
Err(_) => {
if vring_state.signal_used_queue().is_err() {
warn!("Failed to signal used queue.");
};
}
}
} else if vring_state.signal_used_queue().is_err() {
warn!("Failed to signal used queue.");
}
}

/// Process filesystem requests one at a time in a serialized manner.
fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> {
let mut vring_state = match device_event {
Expand All @@ -83,10 +122,16 @@ impl VhostUserFsThread {
// If EVENT_IDX is enabled, we could keep calling process_queue()
// until it stops finding new request on the queue.
loop {
vring_state.disable_notification().unwrap();
if vring_state.disable_notification().is_err() {
warn!("Failed to disable used queue notification.");
}
self.process_queue_serial(&mut vring_state)?;
if !vring_state.enable_notification().unwrap() {
break;
if let Ok(has_more) = vring_state.enable_notification() {
if !has_more {
break;
}
} else {
warn!("Failed to enable used queue notification.");
}
}
} else {
Expand All @@ -98,37 +143,61 @@ impl VhostUserFsThread {

/// Forwards filesystem messages to specific functions and
/// returns the filesystem request execution result.
fn process_queue_serial(&self, _vring_state: &mut VringState) -> Result<bool> {
unimplemented!()
fn process_queue_serial(&self, vring_state: &mut VringState) -> Result<bool> {
let mut used_any = false;
let mem = match &self.mem {
Some(m) => m.memory(),
None => return Err(new_unexpected_error("no memory configured", None)),
};
let avail_chains: Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
.get_queue_mut()
.iter(mem.clone())
.map_err(|_| new_unexpected_error("iterating through the queue failed", None))?
.collect();
for chain in avail_chains {
used_any = true;
let head_index = chain.head_index();
let reader = Reader::new(&mem, chain.clone())
.map_err(|_| new_unexpected_error("creating a queue reader failed", None))
.unwrap();
let writer = Writer::new(&mem, chain.clone())
.map_err(|_| new_unexpected_error("creating a queue writer failed", None))
.unwrap();
let len = self
.core
.handle_message(reader, writer)
.map_err(|_| new_unexpected_error("processing a queue request failed", None))
.unwrap();
VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len);
}
Ok(used_any)
}
}

/// VhostUserFsBackend is a structure that implements the VhostUserBackend trait
/// and implements concrete services for the vhost user backend server.
pub struct VhostUserFsBackend {
tag: Option<String>,
struct VhostUserFsBackend {
thread: RwLock<VhostUserFsThread>,
}

#[allow(dead_code)]
impl VhostUserFsBackend {
pub fn new(tag: Option<String>) -> Result<VhostUserFsBackend> {
let thread = RwLock::new(VhostUserFsThread::new()?);
Ok(VhostUserFsBackend { thread, tag })
fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
let thread = RwLock::new(VhostUserFsThread::new(core)?);
Ok(VhostUserFsBackend { thread })
}
}

/// VirtioFsConfig will be serialized and used as
/// the return value of get_config function in the VhostUserBackend trait.
#[repr(C)]
#[derive(Clone, Copy)]
struct VirtioFsConfig {
tag: [u8; MAX_TAG_LEN],
num_request_queues: Le32,
fn kill(&self) -> Result<()> {
self.thread
.read()
.unwrap()
.kill_event_fd
.write(1)
.map_err(|err| {
new_unexpected_error("failed to write to kill eventfd", Some(err.into()))
})
}
}

unsafe impl ByteValued for VirtioFsConfig {}

impl VhostUserBackend for VhostUserFsBackend {
type Bitmap = ();
type Vring = VringMutex;
Expand All @@ -155,45 +224,18 @@ impl VhostUserBackend for VhostUserFsBackend {
/// Get available vhost protocol features.
fn protocol_features(&self) -> VhostUserProtocolFeatures {
// Align to the virtiofsd's protocol features here.
let mut protocol_features = VhostUserProtocolFeatures::MQ
VhostUserProtocolFeatures::MQ
| VhostUserProtocolFeatures::BACKEND_REQ
| VhostUserProtocolFeatures::BACKEND_SEND_FD
| VhostUserProtocolFeatures::REPLY_ACK
| VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS;
if self.tag.is_some() {
protocol_features |= VhostUserProtocolFeatures::CONFIG;
}
protocol_features
| VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
}

/// Enable or disabled the virtio EVENT_IDX feature.
fn set_event_idx(&self, enabled: bool) {
self.thread.write().unwrap().event_idx = enabled;
}

/// Get virtio device configuration.
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
let tag = self
.tag
.as_ref()
.expect("did not expect read of config if tag is not set.");
let mut fixed_len_tag = [0; MAX_TAG_LEN];
fixed_len_tag[0..tag.len()].copy_from_slice(tag.as_bytes());
let config = VirtioFsConfig {
tag: fixed_len_tag,
num_request_queues: Le32::from(REQUEST_QUEUES as u32),
};
let mut result: Vec<_> = config
.as_slice()
.iter()
.skip(offset as usize)
.take(size as usize)
.copied()
.collect();
result.resize(size as usize, 0);
result
}

/// Update guest memory regions.
fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) -> io::Result<()> {
self.thread.write().unwrap().mem = Some(mem);
Expand Down Expand Up @@ -238,3 +280,50 @@ impl VhostUserBackend for VhostUserFsBackend {
.map_err(|err| err.into())
}
}

/// VirtioFS is a structure that represents the virtiofs service.
/// It is used to run the virtiofs service with the given operator and socket path.
/// The operator is used to interact with the backend storage system.
/// The socket path is used to communicate with the QEMU and VMs.
pub struct VirtioFs {
socket_path: String,
filesystem_backend: Arc<VhostUserFsBackend>,
}

impl VirtioFs {
pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
let filesystem_core = Filesystem::new(core);
let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap());
Ok(VirtioFs {
socket_path: socket_path.to_string(),
filesystem_backend,
})
}

// Run the virtiofs service.
pub fn run(&self) -> Result<()> {
let listener = Listener::new(&self.socket_path, true)
.map_err(|_| new_unexpected_error("failed to create listener", None))?;
let mut daemon = VhostUserDaemon::new(
String::from("virtiofs-backend"),
self.filesystem_backend.clone(),
GuestMemoryAtomic::new(GuestMemoryMmap::new()),
)
.unwrap();
if daemon.start(listener).is_err() {
return Err(new_unexpected_error("failed to start daemon", None));
}
if daemon.wait().is_err() {
return Err(new_unexpected_error("failed to wait daemon", None));
}
Ok(())
}

// Kill the virtiofs service.
pub fn kill(&self) -> Result<()> {
if self.filesystem_backend.kill().is_err() {
return Err(new_unexpected_error("failed to kill backend", None));
}
Ok(())
}
}
2 changes: 0 additions & 2 deletions integrations/virtiofs/src/virtiofs_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub struct Reader<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}

#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,
Expand Down Expand Up @@ -174,7 +173,6 @@ pub struct Writer<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}

#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,
Expand Down

0 comments on commit 690e494

Please sign in to comment.