Skip to content

Commit

Permalink
tests/net: add a test for multishot recvmsg
Browse files Browse the repository at this point in the history
This adds a test entry which exercises `IORING_RECV_MULTISHOT` on
`IORING_OP_RECVMSG`, as well as `RecvMsgOut` parsing logic.
  • Loading branch information
lucab committed Feb 15, 2023
1 parent 9c8a36b commit 65ce2c5
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 0 deletions.
1 change: 1 addition & 0 deletions io-uring-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ fn test<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
#[cfg(not(feature = "ci"))]
tests::net::test_tcp_recv_multi(&mut ring, &test)?;
tests::net::test_socket(&mut ring, &test)?;
tests::net::test_udp_recvmsg_multishot(&mut ring, &test)?;

// queue
tests::poll::test_eventfd_poll(&mut ring, &test)?;
Expand Down
144 changes: 144 additions & 0 deletions io-uring-test/src/tests/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1235,3 +1235,147 @@ pub fn test_socket<S: squeue::EntryMarker, C: cqueue::EntryMarker>(

Ok(())
}

pub fn test_udp_recvmsg_multishot<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
ring: &mut IoUring<S, C>,
test: &Test,
) -> anyhow::Result<()> {
// Multishot recvmsg was introduced in 6.0, like `SendZc`.
// We cannot probe for the former, so we check for the latter as a proxy instead.
require!(
test;
test.probe.is_supported(opcode::RecvMsg::CODE);
test.probe.is_supported(opcode::ProvideBuffers::CODE);
test.probe.is_supported(opcode::SendZc::CODE);
);

println!("test udp_recvmsg_multishot");

let (socket_slot, socket_addr) = {
// `:0` means "pick up a random available port number", which should
// help avoiding test flakes if a static port is already in use.
let server_sock = std::net::UdpSocket::bind("127.0.0.1:0")?;
ring.submitter()
.register_files(&[server_sock.as_raw_fd()])?;
let addr = server_sock.local_addr().unwrap();
(io_uring::types::Fixed(0), addr)
};

// Provide 2 buffers in buffer group `33`, at index 0 and 1.
// Each one is 512 bytes large.
const BUF_GROUP: u16 = 33;
const SIZE: usize = 512;
let mut buffers = [[0u8; SIZE]; 2];
for (index, buf) in buffers.iter_mut().enumerate() {
let provide_bufs_e = io_uring::opcode::ProvideBuffers::new(
buf.as_mut_ptr(),
SIZE as i32,
1,
BUF_GROUP,
index as u16,
)
.build()
.user_data(11)
.into();
unsafe { ring.submission().push(&provide_bufs_e)? };
ring.submitter().submit_and_wait(1)?;
let cqes: Vec<io_uring::cqueue::Entry> = ring.completion().map(Into::into).collect();
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 11);
assert_eq!(cqes[0].result(), 0);
assert_eq!(cqes[0].flags(), 0);
}

// This structure is actually only used for input arguments to the kernel
// (and only name length and control length are actually relevant).
let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };
msghdr.msg_namelen = 32;
msghdr.msg_controllen = 0;

// TODO(lucab): make this more ergonomic to use.
const IORING_RECV_MULTISHOT: u16 = 2;

let recvmsg_e = io_uring::opcode::RecvMsg::new(socket_slot, &mut msghdr as *mut _)
.ioprio(IORING_RECV_MULTISHOT)
.buf_group(BUF_GROUP)
.build()
.flags(io_uring::squeue::Flags::BUFFER_SELECT)
.user_data(77)
.into();
unsafe { ring.submission().push(&recvmsg_e)? };
ring.submitter().submit().unwrap();

let client_socket: socket2::Socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap().into();
let client_addr = client_socket
.local_addr()
.unwrap()
.as_socket_ipv4()
.unwrap();
client_socket
.send_to("testfoo".as_bytes(), &socket_addr.into())
.unwrap();
client_socket
.send_to("testbarbar".as_bytes(), &socket_addr.into())
.unwrap();

// Check the completion events for the two UDP messages, plus a trailing
// CQE signaling that we ran out of buffers.
ring.submitter().submit_and_wait(3).unwrap();
let cqes: Vec<io_uring::cqueue::Entry> = ring.completion().map(Into::into).collect();
assert_eq!(cqes.len(), 3);
assert_eq!(cqes[0].user_data(), 77);
assert!(cqes[0].result() > 0);
assert!(io_uring::cqueue::more(cqes[0].flags()));
assert_eq!(io_uring::cqueue::buffer_select(cqes[0].flags()), Some(0));
assert!(cqes[0].flags() != 0);
assert_eq!(cqes[1].user_data(), 77);
assert!(cqes[1].result() > 0);
assert!(io_uring::cqueue::more(cqes[1].flags()));
assert_eq!(io_uring::cqueue::buffer_select(cqes[1].flags()), Some(1));
assert!(cqes[1].flags() != 0);
assert_eq!(cqes[2].user_data(), 77);
assert_eq!(cqes[2].result(), -libc::ENOBUFS);
assert!(!io_uring::cqueue::more(cqes[2].flags()));
assert_eq!(io_uring::cqueue::buffer_select(cqes[2].flags()), None);
assert_eq!(cqes[2].flags(), 0);

let msg0 = types::RecvMsgOut::parse(buffers[0].as_slice(), &msghdr).unwrap();
assert!(!msg0.is_payload_truncated());
assert_eq!(msg0.payload_data(), b"testfoo".as_slice());
assert!(!msg0.is_control_data_truncated());
assert_eq!(msg0.control_data(), &[]);
assert!(!msg0.is_name_data_truncated());
let (_, addr) = unsafe {
socket2::SockAddr::init(|storage, len| {
*len = msg0.name_data().len() as u32;
std::ptr::copy_nonoverlapping(msg0.name_data().as_ptr() as _, storage, 1);
Ok(())
})
}
.unwrap();
let addr = addr.as_socket_ipv4().unwrap();
assert_eq!(addr.ip(), client_addr.ip());
assert_eq!(addr.port(), client_addr.port());

let msg1 = types::RecvMsgOut::parse(buffers[1].as_slice(), &msghdr).unwrap();
assert!(!msg1.is_payload_truncated());
assert_eq!(msg1.payload_data(), b"testbarbar".as_slice());
assert!(!msg1.is_control_data_truncated());
assert_eq!(msg1.control_data(), &[]);
assert!(!msg1.is_name_data_truncated());
let (_, addr) = unsafe {
socket2::SockAddr::init(|storage, len| {
*len = msg1.name_data().len() as u32;
std::ptr::copy_nonoverlapping(msg1.name_data().as_ptr() as _, storage, 1);
Ok(())
})
}
.unwrap();
let addr = addr.as_socket_ipv4().unwrap();
assert_eq!(addr.ip(), client_addr.ip());
assert_eq!(addr.port(), client_addr.port());

ring.submitter().unregister_files().unwrap();

Ok(())
}

0 comments on commit 65ce2c5

Please sign in to comment.