Skip to content

Commit

Permalink
revise handling of the serial device
Browse files Browse the repository at this point in the history
- add Wakers for async handling of the input stream
  • Loading branch information
stlankes committed Dec 19, 2024
1 parent 8059376 commit 8de279b
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 55 deletions.
8 changes: 8 additions & 0 deletions src/arch/aarch64/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl Console {
self.serial_port.write_byte(*byte);
}
}

pub fn read(&mut self) -> Option<u8> {
None
}

pub fn is_empty(&self) -> bool {
true
}
}

impl Default for Console {
Expand Down
8 changes: 8 additions & 0 deletions src/arch/riscv64/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl Console {
sbi_rt::console_write_byte(*byte);
}
}

pub fn read(&mut self) -> Option<u8> {
None
}

pub fn is_empty(&self) -> bool {
true
}
}

impl Default for Console {
Expand Down
10 changes: 9 additions & 1 deletion src/arch/x86_64/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::arch::asm;
use core::num::NonZeroU64;
use core::ptr;
use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
use core::task::Waker;

use hermit_entry::boot_info::{PlatformInfo, RawBootInfo};
use memory_addresses::{PhysAddr, VirtAddr};
Expand Down Expand Up @@ -68,10 +69,17 @@ impl Console {
self.serial_port.buffer_input();
}

#[cfg(feature = "shell")]
pub fn read(&mut self) -> Option<u8> {
self.serial_port.read()
}

pub fn is_empty(&self) -> bool {
self.serial_port.is_empty()
}

pub fn register_waker(&mut self, waker: &Waker) {
self.serial_port.register_waker(waker);
}
}

impl Default for Console {
Expand Down
31 changes: 20 additions & 11 deletions src/arch/x86_64/kernel/serial.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#[cfg(feature = "shell")]
use alloc::collections::VecDeque;
use core::task::Waker;

use x86_64::instructions::port::Port;

use crate::arch::x86_64::kernel::apic;
use crate::arch::x86_64::kernel::core_local::increment_irq_counter;
use crate::arch::x86_64::kernel::interrupts::{self, IDT};
use crate::executor::WakerRegistration;

const SERIAL_IRQ: u8 = 36;

Expand All @@ -16,8 +17,8 @@ enum SerialInner {

pub struct SerialPort {
inner: SerialInner,
#[cfg(feature = "shell")]
buffer: VecDeque<u8>,
waker: WakerRegistration,
}

impl SerialPort {
Expand All @@ -26,37 +27,44 @@ impl SerialPort {
let serial = Port::new(base);
Self {
inner: SerialInner::Uhyve(serial),
#[cfg(feature = "shell")]
buffer: VecDeque::new(),
waker: WakerRegistration::new(),
}
} else {
let mut serial = unsafe { uart_16550::SerialPort::new(base) };
serial.init();
Self {
inner: SerialInner::Uart(serial),
#[cfg(feature = "shell")]
buffer: VecDeque::new(),
waker: WakerRegistration::new(),
}
}
}

pub fn buffer_input(&mut self) {
if let SerialInner::Uart(s) = &mut self.inner {
let c = unsafe { char::from_u32_unchecked(s.receive().into()) };
#[cfg(not(feature = "shell"))]
if !c.is_ascii_control() {
print!("{}", c);
let c = s.receive();
if c == b'\r' {
self.buffer.push_back(b'\n');
} else {
self.buffer.push_back(c);
}
#[cfg(feature = "shell")]
self.buffer.push_back(c.try_into().unwrap());
self.waker.wake();
}
}

#[cfg(feature = "shell")]
pub fn register_waker(&mut self, waker: &Waker) {
self.waker.register(waker);
}

pub fn read(&mut self) -> Option<u8> {
self.buffer.pop_front()
}

pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}

pub fn send(&mut self, buf: &[u8]) {
match &mut self.inner {
SerialInner::Uhyve(s) => {
Expand All @@ -78,6 +86,7 @@ impl SerialPort {
extern "x86-interrupt" fn serial_interrupt(_stack_frame: crate::interrupts::ExceptionStackFrame) {
crate::console::CONSOLE.lock().0.buffer_input();
increment_irq_counter(SERIAL_IRQ);
crate::executor::run();

apic::eoi();
}
Expand Down
2 changes: 1 addition & 1 deletion src/arch/x86_64/mm/virtualmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn init() {
} else {
PageRange::new(
mm::kernel_end_address().as_usize(),
kernel_heap_end().as_usize() + 1
kernel_heap_end().as_usize() + 1,
)
.unwrap()
};
Expand Down
18 changes: 13 additions & 5 deletions src/console.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use core::task::Waker;
use core::{fmt, mem};

use hermit_sync::{InterruptTicketMutex, Lazy};

use crate::arch;

pub struct Console(pub arch::kernel::Console);
pub(crate) struct Console(pub arch::kernel::Console);

impl Console {
fn new() -> Self {
Expand All @@ -15,10 +16,17 @@ impl Console {
self.0.write(buf);
}

#[cfg(feature = "shell")]
pub fn read(&mut self) -> Option<u8> {
self.0.read()
}

pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

pub fn register_waker(&mut self, waker: &Waker) {
self.0.register_waker(waker);
}
}

/// A collection of methods that are required to format
Expand All @@ -35,17 +43,17 @@ impl fmt::Write for Console {
}
}

pub static CONSOLE: Lazy<InterruptTicketMutex<Console>> =
pub(crate) static CONSOLE: Lazy<InterruptTicketMutex<Console>> =
Lazy::new(|| InterruptTicketMutex::new(Console::new()));

#[doc(hidden)]
pub fn _print(args: fmt::Arguments<'_>) {
pub(crate) fn _print(args: fmt::Arguments<'_>) {
use fmt::Write;
CONSOLE.lock().write_fmt(args).unwrap();
}

#[doc(hidden)]
pub fn _panic_print(args: fmt::Arguments<'_>) {
pub(crate) fn _panic_print(args: fmt::Arguments<'_>) {
use fmt::Write;
let mut console = unsafe { CONSOLE.make_guard_unchecked() };
console.write_fmt(args).ok();
Expand Down
34 changes: 34 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,40 @@ use crate::io;
use crate::scheduler::PerCoreSchedulerExt;
use crate::synch::futex::*;

/// WakerRegistration is derived from smoltcp's
/// implementation.
#[derive(Debug)]
pub(crate) struct WakerRegistration {
waker: Option<Waker>,
}

impl WakerRegistration {
pub const fn new() -> Self {
Self { waker: None }
}

/// Register a waker. Overwrites the previous waker, if any.
pub fn register(&mut self, w: &Waker) {
match self.waker {
// Optimization: If both the old and new Wakers wake the same task, we can simply
// keep the old waker, skipping the clone.
Some(ref w2) if (w2.will_wake(w)) => {}
// In all other cases
// - we have no waker registered
// - we have a waker registered but it's for a different task.
// then clone the new waker and store it
_ => self.waker = Some(w.clone()),
}
}

/// Wake the registered waker, if any.
pub fn wake(&mut self) {
if let Some(w) = self.waker.take() {
w.wake();
}
}
}

struct TaskNotify {
/// Futex to wakeup a single task
futex: AtomicU32,
Expand Down
38 changes: 2 additions & 36 deletions src/executor/vsock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use core::future;
use core::task::{Poll, Waker};
use core::task::Poll;

use hermit_sync::InterruptTicketMutex;
use virtio::vsock::{Hdr, Op, Type};
Expand All @@ -11,7 +11,7 @@ use virtio::{le16, le32};
use crate::arch::kernel::mmio as hardware;
#[cfg(feature = "pci")]
use crate::drivers::pci as hardware;
use crate::executor::spawn;
use crate::executor::{spawn, WakerRegistration};
use crate::io;
use crate::io::Error::EADDRINUSE;

Expand All @@ -27,40 +27,6 @@ pub(crate) enum VsockState {
Shutdown,
}

/// WakerRegistration is derived from smoltcp's
/// implementation.
#[derive(Debug)]
pub(crate) struct WakerRegistration {
waker: Option<Waker>,
}

impl WakerRegistration {
pub const fn new() -> Self {
Self { waker: None }
}

/// Register a waker. Overwrites the previous waker, if any.
pub fn register(&mut self, w: &Waker) {
match self.waker {
// Optimization: If both the old and new Wakers wake the same task, we can simply
// keep the old waker, skipping the clone.
Some(ref w2) if (w2.will_wake(w)) => {}
// In all other cases
// - we have no waker registered
// - we have a waker registered but it's for a different task.
// then clone the new waker and store it
_ => self.waker = Some(w.clone()),
}
}

/// Wake the registered waker, if any.
pub fn wake(&mut self) {
if let Some(w) = self.waker.take() {
w.wake();
}
}
}

pub(crate) const RAW_SOCKET_BUFFER_SIZE: usize = 256 * 1024;

#[derive(Debug)]
Expand Down
43 changes: 42 additions & 1 deletion src/fd/stdio.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use alloc::boxed::Box;
use core::future;
#[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))]
use core::ptr;
use core::task::Poll;

use async_trait::async_trait;
#[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))]
use memory_addresses::VirtAddr;
#[cfg(target_arch = "x86_64")]
use x86::io::*;
use zerocopy::IntoBytes;

#[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))]
use crate::arch::mm::paging;
Expand Down Expand Up @@ -70,7 +73,45 @@ fn uhyve_send<T>(_port: u16, _data: &mut T) {
#[derive(Debug)]
pub struct GenericStdin;

impl ObjectInterface for GenericStdin {}
#[async_trait]
impl ObjectInterface for GenericStdin {
async fn poll(&self, event: PollEvent) -> io::Result<PollEvent> {
let available = if CONSOLE.lock().is_empty() {
PollEvent::empty()
} else {
PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND
};

Ok(event & available)
}

async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| {
let mut read_bytes = 0;
let mut guard = CONSOLE.lock();

while let Some(byte) = guard.read() {
let c = unsafe { char::from_u32_unchecked(byte.into()) };
guard.write(c.as_bytes());

buf[read_bytes] = byte;
read_bytes += 1;

if read_bytes >= buf.len() {
return Poll::Ready(Ok(read_bytes));
}
}

if read_bytes > 0 {
Poll::Ready(Ok(read_bytes))
} else {
guard.register_waker(cx.waker());
Poll::Pending
}
})
.await
}
}

impl GenericStdin {
pub const fn new() -> Self {
Expand Down

0 comments on commit 8de279b

Please sign in to comment.