Skip to content

Commit

Permalink
virtio-consoe: Flush port tx before exiting the vm
Browse files Browse the repository at this point in the history
We should wait until the output currently in the queue of the guest
is written and only then exit the libkrun process.

This fixes an issue where you would sometimes not get the full output
of a program inside the vm (we would call exit sooner, than we
finished all the writes).

Signed-off-by: Matej Hrica <[email protected]>
  • Loading branch information
mtjhrc committed Jan 31, 2024
1 parent 29b6e8d commit c14f41f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 14 deletions.
12 changes: 11 additions & 1 deletion src/devices/src/virtio/console/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::virtio::console::port::Port;
use crate::virtio::console::port_queue_mapping::{
num_queues, port_id_to_queue_idx, QueueDirection,
};
use crate::virtio::PortDescription;
use crate::virtio::{PortDescription, VmmExitObserver};

pub(crate) const CONTROL_RXQ_INDEX: usize = 2;
pub(crate) const CONTROL_TXQ_INDEX: usize = 3;
Expand Down Expand Up @@ -360,3 +360,13 @@ impl VirtioDevice for Console {
}
}
}

impl VmmExitObserver for Console {
fn on_vmm_exit(&mut self) {
for port in &mut self.ports {
port.flush();
}

log::trace!("Console on_vmm_exit finished");
}
}
30 changes: 28 additions & 2 deletions src/devices/src/virtio/console/port.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::{mem, thread};
Expand Down Expand Up @@ -33,6 +34,7 @@ enum PortState {
output: Option<Box<dyn PortOutput + Send>>,
},
Active {
stop: Arc<AtomicBool>,
rx_thread: Option<JoinHandle<()>>,
tx_thread: Option<JoinHandle<()>>,
},
Expand Down Expand Up @@ -126,12 +128,36 @@ impl Port {
thread::spawn(move || process_rx(mem, rx_queue, irq_signaler, input, control, port_id))
});

let tx_thread = output
.map(|output| thread::spawn(move || process_tx(mem, tx_queue, irq_signaler, output)));
let stop = Arc::new(AtomicBool::new(false));
let tx_thread = output.map(|output| {
let stop = stop.clone();
thread::spawn(move || process_tx(mem, tx_queue, irq_signaler, output, stop))
});

self.state = PortState::Active {
stop,
rx_thread,
tx_thread,
}
}

pub fn flush(&mut self) {
if let PortState::Active {
stop,
tx_thread,
rx_thread: _,
} = &mut self.state
{
stop.store(true, Ordering::Release);
if let Some(tx_thread) = mem::take(tx_thread) {
tx_thread.thread().unpark();
if let Err(e) = tx_thread.join() {
log::error!(
"Failed to flush tx for port {port_id}, thread panicked: {e:?}",
port_id = self.port_id
)
}
}
};
}
}
15 changes: 12 additions & 3 deletions src/devices/src/virtio/console/process_tx.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{io, thread};

use vm_memory::{GuestMemory, GuestMemoryError, GuestMemoryMmap, GuestMemoryRegion};
Expand All @@ -11,9 +13,12 @@ pub(crate) fn process_tx(
mut queue: Queue,
irq: IRQSignaler,
mut output: Box<dyn PortOutput + Send>,
stop: Arc<AtomicBool>,
) {
loop {
let head = pop_head_blocking(&mut queue, &mem, &irq);
let Some(head) = pop_head_blocking(&mut queue, &mem, &irq, &stop) else {
return;
};

let head_index = head.index;
let mut bytes_written = 0;
Expand Down Expand Up @@ -48,13 +53,17 @@ fn pop_head_blocking<'mem>(
queue: &mut Queue,
mem: &'mem GuestMemoryMmap,
irq: &IRQSignaler,
) -> DescriptorChain<'mem> {
stop: &AtomicBool,
) -> Option<DescriptorChain<'mem>> {
loop {
match queue.pop(mem) {
Some(descriptor) => break descriptor,
Some(descriptor) => break Some(descriptor),
None => {
irq.signal_used_queue("tx queue empty, parking");
thread::park();
if stop.load(Ordering::Acquire) {
break None;
}
log::trace!("tx unparked, queue len {}", queue.len(mem))
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/devices/src/virtio/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ pub trait VirtioDevice: AsAny + Send {
}
}

pub trait VmmExitObserver {
/// Callback to finish processing or cleanup the device resources
fn on_vmm_exit(&mut self) {}
}

impl<F: Fn()> VmmExitObserver for F {
fn on_vmm_exit(&mut self) {
self()
}
}

impl std::fmt::Debug for dyn VirtioDevice {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "VirtioDevice type {}", self.device_type())
Expand Down
7 changes: 6 additions & 1 deletion src/vmm/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,12 @@ pub fn build_microvm(
let shm_region = None;

let mut vmm = Vmm {
//events_observer: Some(Box::new(SerialStdin::get())),
guest_memory,
arch_memory_info,
kernel_cmdline,
vcpus_handles: Vec::new(),
exit_evt,
exit_observers: Vec::new(),
vm,
mmio_device_manager,
#[cfg(target_arch = "x86_64")]
Expand Down Expand Up @@ -603,6 +603,9 @@ pub fn build_microvm(
vmm.start_vcpus(vcpus)
.map_err(StartMicrovmError::Internal)?;

// Clippy thinks we don't need Arc<Mutex<...
// but we don't want to change the event_manager interface
#[allow(clippy::arc_with_non_send_sync)]
let vmm = Arc::new(Mutex::new(vmm));
event_manager
.add_subscriber(vmm.clone())
Expand Down Expand Up @@ -1096,6 +1099,8 @@ fn attach_console_devices(

let console = Arc::new(Mutex::new(devices::virtio::Console::new(ports).unwrap()));

vmm.exit_observers.push(console.clone());

if let Some(intc) = intc {
console.lock().unwrap().set_intc(intc);
}
Expand Down
17 changes: 10 additions & 7 deletions src/vmm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use macos::vstate;
use std::fmt::{Display, Formatter};
use std::io;
use std::os::unix::io::AsRawFd;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
#[cfg(target_os = "linux")]
use std::time::Duration;

Expand All @@ -52,6 +52,7 @@ use crate::vstate::{Vcpu, VcpuHandle, VcpuResponse, Vm};
use arch::ArchMemoryInfo;
use arch::DeviceType;
use arch::InitrdConfig;
use devices::virtio::VmmExitObserver;
use devices::BusDevice;
use kernel::cmdline::Cmdline as KernelCmdline;
use polly::event_manager::{self, EventManager, Subscriber};
Expand Down Expand Up @@ -181,8 +182,6 @@ pub type Result<T> = std::result::Result<T, Error>;

/// Contains the state and associated methods required for the Firecracker VMM.
pub struct Vmm {
//events_observer: Option<Box<dyn VmmEventsObserver>>,

// Guest VM core resources.
guest_memory: GuestMemoryMmap,
arch_memory_info: ArchMemoryInfo,
Expand All @@ -192,6 +191,7 @@ pub struct Vmm {
vcpus_handles: Vec<VcpuHandle>,
exit_evt: EventFd,
vm: Vm,
exit_observers: Vec<Arc<Mutex<dyn VmmExitObserver>>>,

// Guest VM devices.
mmio_device_manager: MMIODeviceManager,
Expand All @@ -213,10 +213,6 @@ impl Vmm {
pub fn start_vcpus(&mut self, mut vcpus: Vec<Vcpu>) -> Result<()> {
let vcpu_count = vcpus.len();

//if let Some(observer) = self.events_observer.as_mut() {
// observer.on_vmm_boot().map_err(Error::VmmObserverInit)?;
//}

Vcpu::register_kick_signal_handler();

self.vcpus_handles.reserve(vcpu_count);
Expand Down Expand Up @@ -336,6 +332,13 @@ impl Vmm {
log::error!("Failed to restore terminal to canonical mode: {e}")
}

for observer in &self.exit_observers {
observer
.lock()
.expect("Poisoned mutex for exit observer")
.on_vmm_exit();
}

// Exit from Firecracker using the provided exit code. Safe because we're terminating
// the process anyway.
unsafe {
Expand Down

0 comments on commit c14f41f

Please sign in to comment.