Skip to content

Commit

Permalink
Mutex locks for splitted link
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Jan 19, 2024
1 parent 10028e7 commit 47c51b2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 60 deletions.
4 changes: 3 additions & 1 deletion examples/fread.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use nuclei::*;
use std::fs::File;
use std::io;
use std::io::{Seek, SeekFrom};

Check warning on line 4 in examples/fread.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused imports: `SeekFrom`, `Seek`

Check warning on line 4 in examples/fread.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused imports: `SeekFrom`, `Seek`
use std::path::PathBuf;

use futures::AsyncReadExt;
Expand All @@ -10,6 +11,7 @@ fn main() -> io::Result<()> {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("data");
path.push("quark-gluon-plasma");
dbg!(&path);

let fo = File::open(&path).unwrap();
let mut file = Handle::<File>::new(fo).unwrap();
Expand All @@ -19,7 +21,7 @@ fn main() -> io::Result<()> {
});
let x = x?;

// println!("Content: {}", x);
println!("Content: {}", x);
println!("Length of file is {}", x.len());

Ok(())
Expand Down
4 changes: 0 additions & 4 deletions src/proactor.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};
use std::sync::Mutex;

use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;

use super::syscore::*;
use super::waker::*;
use crate::spawn_blocking;

pub use super::handle::*;

static PROACTOR: OnceCell<Mutex<Proactor>> = OnceCell::new();

///
/// Concrete proactor instance
pub struct Proactor(SysProactor);
Expand Down
95 changes: 57 additions & 38 deletions src/syscore/linux/iouring/iouring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::Proactor;
use socket2::SockAddr;
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;
use std::sync::{atomic, Mutex};
use rustix_uring::{CompletionQueue, IoUring, SubmissionQueue, Submitter, squeue::Entry as SQEntry, cqueue::Entry as CQEntry};

fn max_len() -> usize {
Expand Down Expand Up @@ -137,10 +138,10 @@ const MANUAL_TIMEOUT: u64 = -2 as _;
const QUEUE_LEN: u32 = 1 << 10;

pub struct SysProactor {
sq: TTas<SubmissionQueue<'static>>,
cq: TTas<CompletionQueue<'static>>,
sbmt: TTas<Submitter<'static>>,
submitters: TTas<HashMap<u64, oneshot::Sender<i32>>>,
sq: Mutex<SubmissionQueue<'static>>,
cq: Mutex<CompletionQueue<'static>>,
sbmt: Mutex<Submitter<'static>>,
submitters: Mutex<HashMap<u64, oneshot::Sender<i32>>>,
submitter_id: AtomicU64,
waker: AtomicBool,
}
Expand All @@ -157,14 +158,20 @@ impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
unsafe {
// nodrop?
IO_URING = Some(IoUring::new(QUEUE_LEN).expect("nuclei: uring can't be initialized"));
let ring = IoUring::builder()
.build(QUEUE_LEN)
.expect("nuclei: uring can't be initialized");

IO_URING = Some(ring);

// IO_URING = Some(IoUring::new(QUEUE_LEN).expect("nuclei: uring can't be initialized"));
let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split();

Ok(SysProactor {
sq: TTas::new(sq),
cq: TTas::new(cq),
sbmt: TTas::new(submitter),
submitters: TTas::new(HashMap::default()),
sq: Mutex::new(sq),
cq: Mutex::new(cq),
sbmt: Mutex::new(submitter),
submitters: Mutex::new(HashMap::default()),
submitter_id: AtomicU64::default(),
waker: AtomicBool::default(),
})
Expand All @@ -173,46 +180,49 @@ impl SysProactor {

pub(crate) fn register_io(
&self,
mut sqe: &mut SQEntry,
mut sqe: SQEntry,
) -> io::Result<CompletionChan> {
dbg!("REGISTER IO");
let sub_comp = unsafe {
let mut sq = self.sq.lock();
let id = self.submitter_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
dbg!("self.submitter_id.fetch_add");

let mut sqe = sqe.clone();
sqe = sqe.user_data(id);
dbg!("sqe = sqe.user_data(id);");
let id = self.submitter_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
dbg!("self.submitter_id.fetch_add");

let mut subguard = self.submitters.lock();
subguard.insert(id, tx);
// drop(subguard);
dbg!("subguard.insert");
sqe = sqe.user_data(id);
dbg!("sqe = sqe.user_data(id);");

let cc = CompletionChan { rx };
let mut subguard = self.submitters.lock().unwrap();
subguard.insert(id, tx);
drop(subguard);
dbg!("subguard.insert");

dbg!("chan");
let mut sq = self.sq.lock().unwrap();
unsafe {
sq.push(&sqe).expect("nuclei: submission queue is full");
}
sq.sync();

sq.push(&sqe).expect("nuclei: queue is full");
dbg!("pushed - submit was here");

dbg!("pushed");
// drop(sbmt);
let sbmt = self.sbmt.lock().unwrap();
dbg!("submitting.......................................");
sbmt.submit()?;
dbg!("submitted.......................................");
drop(sbmt);

let sbmt = self.sbmt.lock();
sbmt.submit_and_wait(1)?;
dbg!("submitted");
// drop(sbmt);
sq.sync();
drop(sq);

dbg!("leaving");
cc
};
dbg!("leaving");

Ok(sub_comp)
Ok(CompletionChan { rx })
// sub_comp.ok_or(io::Error::from(io::ErrorKind::WouldBlock))
}

pub(crate) fn wake(&self) -> io::Result<()> {
let (mut sq, mut cq) = (self.sq.lock().unwrap(), self.cq.lock().unwrap());
sq.sync();
cq.sync();
Ok(())
}

Expand All @@ -222,24 +232,33 @@ impl SysProactor {
duration: Option<Duration>,
) -> io::Result<usize> {
// dbg!("wait");
let mut cq = self.cq.lock();

let mut cq = self.cq.lock().unwrap();
let mut acc: usize = 0;

// issue cas barrier
cq.sync();
while let Some(cqe) = cq.next() {
dbg!("cqe_completion");
dbg!(&cqe);
self.cqe_completion(&cqe)?;
acc+=1;
}
cq.sync();

Ok(acc)
}

fn cqe_completion(&self, cqe: &CQEntry) -> io::Result<()> {
let udata = cqe.user_data();
dbg!(&udata);
let res: i32 = cqe.result();

dbg!("self.submitters.lock().remov");
self.submitters.lock().remove(&udata).map(|s| s.send(res));
let mut sbmts = self.submitters.lock().unwrap();
sbmts.remove(&udata).map(|s| {
dbg!(&res);
s.send(res).unwrap()
});
dbg!("self.submitters.lock().removed");

Ok(())
Expand Down
Loading

0 comments on commit 47c51b2

Please sign in to comment.