From 47c51b265a1176e47164c55ea23fa08b7ed4cf3e Mon Sep 17 00:00:00 2001 From: Theo Bulut Date: Fri, 19 Jan 2024 21:29:43 +0100 Subject: [PATCH] Mutex locks for splitted link --- examples/fread.rs | 4 +- src/proactor.rs | 4 -- src/syscore/linux/iouring/iouring.rs | 95 +++++++++++++++----------- src/syscore/linux/iouring/processor.rs | 71 ++++++++++++++----- 4 files changed, 114 insertions(+), 60 deletions(-) diff --git a/examples/fread.rs b/examples/fread.rs index ec70d0a..d57293a 100644 --- a/examples/fread.rs +++ b/examples/fread.rs @@ -1,6 +1,7 @@ use nuclei::*; use std::fs::File; use std::io; +use std::io::{Seek, SeekFrom}; use std::path::PathBuf; use futures::AsyncReadExt; @@ -10,6 +11,7 @@ fn main() -> io::Result<()> { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push("data"); path.push("quark-gluon-plasma"); + dbg!(&path); let fo = File::open(&path).unwrap(); let mut file = Handle::::new(fo).unwrap(); @@ -19,7 +21,7 @@ fn main() -> io::Result<()> { }); let x = x?; - // println!("Content: {}", x); + println!("Content: {}", x); println!("Length of file is {}", x.len()); Ok(()) diff --git a/src/proactor.rs b/src/proactor.rs index f0de0fc..5c1c91b 100644 --- a/src/proactor.rs +++ b/src/proactor.rs @@ -1,10 +1,8 @@ use std::task::{Context, Poll}; use std::time::Duration; use std::{future::Future, io}; -use std::sync::Mutex; use once_cell::sync::Lazy; -use once_cell::sync::OnceCell; use super::syscore::*; use super::waker::*; @@ -12,8 +10,6 @@ use crate::spawn_blocking; pub use super::handle::*; -static PROACTOR: OnceCell> = OnceCell::new(); - /// /// Concrete proactor instance pub struct Proactor(SysProactor); diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index e0a79bd..9df2c0a 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -29,6 +29,7 @@ use crate::Proactor; use socket2::SockAddr; use std::mem; use std::os::unix::net::SocketAddr as UnixSocketAddr; +use std::sync::{atomic, Mutex}; use rustix_uring::{CompletionQueue, IoUring, SubmissionQueue, Submitter, squeue::Entry as SQEntry, cqueue::Entry as CQEntry}; fn max_len() -> usize { @@ -137,10 +138,10 @@ const MANUAL_TIMEOUT: u64 = -2 as _; const QUEUE_LEN: u32 = 1 << 10; pub struct SysProactor { - sq: TTas>, - cq: TTas>, - sbmt: TTas>, - submitters: TTas>>, + sq: Mutex>, + cq: Mutex>, + sbmt: Mutex>, + submitters: Mutex>>, submitter_id: AtomicU64, waker: AtomicBool, } @@ -157,14 +158,20 @@ impl SysProactor { pub(crate) fn new() -> io::Result { unsafe { // nodrop? - IO_URING = Some(IoUring::new(QUEUE_LEN).expect("nuclei: uring can't be initialized")); + let ring = IoUring::builder() + .build(QUEUE_LEN) + .expect("nuclei: uring can't be initialized"); + + IO_URING = Some(ring); + + // IO_URING = Some(IoUring::new(QUEUE_LEN).expect("nuclei: uring can't be initialized")); let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split(); Ok(SysProactor { - sq: TTas::new(sq), - cq: TTas::new(cq), - sbmt: TTas::new(submitter), - submitters: TTas::new(HashMap::default()), + sq: Mutex::new(sq), + cq: Mutex::new(cq), + sbmt: Mutex::new(submitter), + submitters: Mutex::new(HashMap::default()), submitter_id: AtomicU64::default(), waker: AtomicBool::default(), }) @@ -173,46 +180,49 @@ impl SysProactor { pub(crate) fn register_io( &self, - mut sqe: &mut SQEntry, + mut sqe: SQEntry, ) -> io::Result { dbg!("REGISTER IO"); - let sub_comp = unsafe { - let mut sq = self.sq.lock(); - let id = self.submitter_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = oneshot::channel(); - dbg!("self.submitter_id.fetch_add"); - - let mut sqe = sqe.clone(); - sqe = sqe.user_data(id); - dbg!("sqe = sqe.user_data(id);"); + let id = self.submitter_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = oneshot::channel(); + dbg!("self.submitter_id.fetch_add"); - let mut subguard = self.submitters.lock(); - subguard.insert(id, tx); - // drop(subguard); - dbg!("subguard.insert"); + sqe = sqe.user_data(id); + dbg!("sqe = sqe.user_data(id);"); - let cc = CompletionChan { rx }; + let mut subguard = self.submitters.lock().unwrap(); + subguard.insert(id, tx); + drop(subguard); + dbg!("subguard.insert"); - dbg!("chan"); + let mut sq = self.sq.lock().unwrap(); + unsafe { + sq.push(&sqe).expect("nuclei: submission queue is full"); + } + sq.sync(); - sq.push(&sqe).expect("nuclei: queue is full"); + dbg!("pushed - submit was here"); - dbg!("pushed"); + // drop(sbmt); + let sbmt = self.sbmt.lock().unwrap(); + dbg!("submitting......................................."); + sbmt.submit()?; + dbg!("submitted......................................."); + drop(sbmt); - let sbmt = self.sbmt.lock(); - sbmt.submit_and_wait(1)?; - dbg!("submitted"); - // drop(sbmt); + sq.sync(); + drop(sq); - dbg!("leaving"); - cc - }; + dbg!("leaving"); - Ok(sub_comp) + Ok(CompletionChan { rx }) // sub_comp.ok_or(io::Error::from(io::ErrorKind::WouldBlock)) } pub(crate) fn wake(&self) -> io::Result<()> { + let (mut sq, mut cq) = (self.sq.lock().unwrap(), self.cq.lock().unwrap()); + sq.sync(); + cq.sync(); Ok(()) } @@ -222,24 +232,33 @@ impl SysProactor { duration: Option, ) -> io::Result { // dbg!("wait"); - let mut cq = self.cq.lock(); + + let mut cq = self.cq.lock().unwrap(); let mut acc: usize = 0; + // issue cas barrier + cq.sync(); while let Some(cqe) = cq.next() { - dbg!("cqe_completion"); + dbg!(&cqe); self.cqe_completion(&cqe)?; acc+=1; } + cq.sync(); Ok(acc) } fn cqe_completion(&self, cqe: &CQEntry) -> io::Result<()> { let udata = cqe.user_data(); + dbg!(&udata); let res: i32 = cqe.result(); dbg!("self.submitters.lock().remov"); - self.submitters.lock().remove(&udata).map(|s| s.send(res)); + let mut sbmts = self.submitters.lock().unwrap(); + sbmts.remove(&udata).map(|s| { + dbg!(&res); + s.send(res).unwrap() + }); dbg!("self.submitters.lock().removed"); Ok(()) diff --git a/src/syscore/linux/iouring/processor.rs b/src/syscore/linux/iouring/processor.rs index 8fd1f87..94b94bf 100644 --- a/src/syscore/linux/iouring/processor.rs +++ b/src/syscore/linux/iouring/processor.rs @@ -23,11 +23,12 @@ use std::mem::MaybeUninit; use std::os::fd::OwnedFd; use std::os::unix::ffi::OsStrExt; use std::os::unix::prelude::RawFd; +use std::ptr::null_mut; use libc::sockaddr_un; use os_socketaddr::OsSocketAddr; use pin_utils::unsafe_pinned; use rustix::io_uring::{msghdr, RecvFlags, SendFlags, sockaddr, sockaddr_storage, SocketFlags}; -use rustix::net::{connect_unix, SocketAddrAny, SocketAddrUnix}; +use rustix::net::{connect_unix, SocketAddrAny, SocketAddrStorage, SocketAddrUnix}; use rustix_uring::opcode::RecvMsg; use rustix_uring::squeue::Entry; use rustix_uring::types::{AtFlags, Mode, OFlags, socklen_t, Statx, StatxFlags}; @@ -60,7 +61,7 @@ impl Processor { .mode(Mode::from(0o666)) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; let x = cc.await? as _; dbg!(x); @@ -73,10 +74,13 @@ impl Processor { buf: &mut [u8], offset: usize, ) -> io::Result { - let mut sqe = OP::Read::new(Fd(*io), buf as *mut _ as *mut _, offset as _) + dbg!(&offset); + let mut sqe = OP::Read::new(Fd(*io), buf.as_mut_ptr(), offset as _) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + dbg!("READFILE"); + + let cc = Proactor::get().inner().register_io(sqe)?; Ok(cc.await? as _) } @@ -89,7 +93,7 @@ impl Processor { let mut sqe = OP::Write::new(Fd(*io), buf as *const _ as *const _, offset as _) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; Ok(cc.await? as _) } @@ -98,7 +102,7 @@ impl Processor { let mut sqe = OP::Close::new(Fd(*io)) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; Ok(cc.await? as _) } @@ -118,7 +122,7 @@ impl Processor { Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; unsafe { Ok((*statx).stx_size as usize) } @@ -132,7 +136,7 @@ impl Processor { .offset(0_u64) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; Ok(cc.await? as _) } @@ -145,7 +149,7 @@ impl Processor { .offset(0_u64) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; Ok(cc.await? as _) } @@ -164,7 +168,7 @@ impl Processor { let res = Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; Ok(res as _) @@ -184,14 +188,18 @@ impl Processor { flags: RecvFlags, ) -> io::Result { let fd = socket.as_raw_fd() as _; + dbg!(&fd); - let mut sqe = OP::Recv::new(Fd(fd), buf.as_ptr() as _, buf.len() as _) + let mut sqe = OP::Recv::new(Fd(fd), buf.as_mut_ptr(), buf.len() as _) .flags(flags) .build(); + dbg!("recv_with_flags time"); + dbg!(&flags); + let res = Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; Ok(res as _) @@ -264,7 +272,7 @@ impl Processor { Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; Ok(Handle::new(stream)?) @@ -394,13 +402,42 @@ impl Processor { // Ok((Handle::new(stream)?, addr)) // } + // pub(crate) async fn processor_accept_tcp_listener( + // listener: &R + // ) -> io::Result<(Handle, SocketAddr)> { + // let fd = listener.as_raw_fd() as _; + // let sockfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(fd) }; + // let sockaddr = rustix::net::getsockname(&sockfd)?; + // + // let mut sqe = OP::Accept::new(Fd(fd), null_mut(), null_mut()) + // .build(); + // + // let cc = Proactor::get().inner().register_io(sqe)?; + // + // let stream = unsafe { TcpStream::from_raw_fd(cc.await?) }; + // + // let natsaddr = unsafe { + // let mut sas = std::mem::zeroed::(); + // sockaddr.write(&mut sas as *mut _ as *mut _); + // let size = std::mem::size_of::(); + // socket2::SockAddr::from_raw_parts(&sas as *const _ as *const _, size as _) + // .as_std() + // .unwrap() + // }; + // + // Ok((Handle::new(stream).unwrap(), natsaddr)) + // } + pub(crate) async fn processor_accept_tcp_listener( listener: &R, ) -> io::Result<(Handle, SocketAddr)> { + dbg!(&listener.as_raw_fd()); let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); + dbg!("ACCEPT_TCP"); + socket .accept() .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) @@ -440,7 +477,7 @@ impl Processor { let res = Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; Ok(res as _) @@ -489,7 +526,7 @@ impl Processor { let res = Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; let sockaddr = unsafe { @@ -521,7 +558,7 @@ impl Processor { .flags(SocketFlags::empty()) .build(); - let cc = Proactor::get().inner().register_io(&mut sqe)?; + let cc = Proactor::get().inner().register_io(sqe)?; let stream = unsafe { UnixStream::from_raw_fd(cc.await?) }; let usa = unsafe { socket2::SockAddr::from_raw_parts( @@ -554,7 +591,7 @@ impl Processor { Proactor::get() .inner() - .register_io(&mut sqe)? + .register_io(sqe)? .await?; Ok(Handle::new(stream)?)