Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net):完善socket_pair实现,网络重构初步进入系统 #921

Merged
merged 8 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kernel/src/arch/x86_64/syscall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub extern "sysv64" fn syscall_handler(frame: &mut TrapFrame) {
show &= true;
}
}
show =false;
if show {
debug!("[SYS] [Pid: {:?}] [Call: {:?}]", pid, to_print);
}
Expand Down
23 changes: 23 additions & 0 deletions kernel/src/net/socket/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,28 @@ impl IndexNode for Inode {
drop(private_data);
Ok(self.inner.poll())
}

fn open(
&self,
_data: crate::libs::spinlock::SpinLockGuard<crate::filesystem::vfs::FilePrivateData>,
_mode: &crate::filesystem::vfs::file::FileMode,
) -> Result<(), SystemError> {
Ok(())
}

fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
let meta = crate::filesystem::vfs::Metadata {
mode: crate::filesystem::vfs::syscall::ModeType::from_bits_truncate(0o755),
file_type: crate::filesystem::vfs::FileType::Socket,
..Default::default()
};

return Ok(meta);
}

fn close(&self, _data: crate::libs::spinlock::SpinLockGuard<crate::filesystem::vfs::FilePrivateData>) -> Result<(), SystemError> {
self.inner.close()
}
}

use super::common::poll_unit::WaitQueue;
Expand Down Expand Up @@ -125,6 +147,7 @@ impl Inode {
flags: MessageFlag,
address: Option<Endpoint>,
) -> Result<(usize, Endpoint), SystemError> {

self.inner.recv_from(buffer, flags, address)
}

Expand Down
11 changes: 10 additions & 1 deletion kernel/src/net/socket/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod stream;
mod seqpacket;
pub(crate) mod seqpacket;
use crate::net::socket::*;
use system_error::SystemError::{self, *};
use alloc::sync::Arc;
Expand All @@ -26,4 +26,13 @@ impl family::Family for Unix {
let socket = create_unix_socket(stype)?;
Ok(Inode::new(socket))
}
}

impl Unix {
pub fn new_pairs(socket_type:Type) ->Result<(Arc<Inode>,Arc<Inode>),SystemError>{
match socket_type {
Type::SeqPacket=>seqpacket::SeqpacketSocket::new_pairs(),
_=>todo!()
}
}
}
27 changes: 15 additions & 12 deletions kernel/src/net/socket/unix/seqpacket/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Init{
pub(super) struct Listener{
inode: Endpoint,
backlog: AtomicUsize,
incoming_conns: Mutex<VecDeque<Connected>>,
incoming_conns: Mutex<VecDeque<Arc<Inode>>>,
}

impl Listener {
Expand All @@ -58,9 +58,11 @@ impl Listener {
pub(super) fn try_accept(&self) ->Result<(Arc<Inode>, Endpoint),SystemError>{
let mut incoming_conns =self.incoming_conns.lock();
let conn=incoming_conns.pop_front().ok_or_else(|| SystemError::EAGAIN_OR_EWOULDBLOCK)?;
let peer = conn.peer_endpoint().cloned().unwrap();
// *** 返回Arc<Inode>额不是Arc<dyn IndexNode>
let socket = SeqpacketSocket::new_connected(conn, false);
let socket =Arc::downcast::<SeqpacketSocket>(conn).map_err(|_| SystemError::EINVAL)?;
let peer = match &*socket.inner.read(){
Inner::Connected(connected)=>connected.peer_endpoint().unwrap().clone(),
_=>return Err(SystemError::ENOTCONN),
};

return Ok((Inode::new(socket),peer));
}
Expand All @@ -77,8 +79,12 @@ impl Listener {
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}

let (server_conn, client_conn) = Connected::new_pair(Some(self.inode.clone()), client_epoint);
incoming_conns.push_back(server_conn);
let new_server=SeqpacketSocket::new(false);
let new_inode=Inode::new(new_server.clone());

let (server_conn, client_conn) = Connected::new_pair(Some(Endpoint::Inode(new_inode.clone())), client_epoint);
*new_server.inner.write()=Inner::Connected(server_conn);
incoming_conns.push_back(new_inode);

// TODO: epollin

Expand All @@ -98,8 +104,6 @@ impl Connected{
pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;

pub fn new_pair(inode:Option<Endpoint>,peer_inode:Option<Endpoint>) ->(Connected,Connected){
// let rebuffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));
// let sebuffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));

let this = Connected{
inode:inode.clone(),
Expand All @@ -115,7 +119,6 @@ impl Connected{
(this,peer)
}


pub fn set_peer_inode(&mut self,peer_epoint:Option<Endpoint>){
self.peer_inode=peer_epoint;
}
Expand Down Expand Up @@ -163,18 +166,18 @@ impl Connected{
let peer_socket=Arc::downcast::<SeqpacketSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
let is_full=match &*peer_socket.inner.read(){
Inner::Connected(connected) => {
connected.buffer.is_write_buf_empty()
connected.buffer.is_read_buf_full()
},
_=>return Err(SystemError::EINVAL),
};
Ok(is_full)
}

fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError>{
pub fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError>{
return self.buffer.read_read_buffer(buf);
}

fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
pub fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
//找到peer_inode,并将write_buffer的内容写入对端的read_buffer
let peer_inode=match self.peer_inode.as_ref().unwrap(){
Endpoint::Inode(inode) =>inode,
Expand Down
57 changes: 47 additions & 10 deletions kernel/src/net/socket/unix/seqpacket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ impl SeqpacketSocket {
})
}

pub fn new_pair(is_nonblocking: bool) -> (Arc<Self>, Arc<Self>) {
let (conn_a, conn_b) = Connected::new_pair(None, None);
(
Self::new_connected(conn_a, is_nonblocking),
Self::new_connected(conn_b, is_nonblocking),
)
pub fn new_pairs() ->Result<(Arc<Inode>,Arc<Inode>),SystemError> {
let socket0=SeqpacketSocket::new(false);
let socket1=SeqpacketSocket::new(false);
let inode0=Inode::new(socket0.clone());
let inode1=Inode::new(socket1.clone());

let (conn_0, conn_1)=Connected::new_pair(Some(Endpoint::Inode(inode0.clone())), Some(Endpoint::Inode(inode1.clone())));
*socket0.inner.write()=Inner::Connected(conn_0);
*socket1.inner.write()=Inner::Connected(conn_1);

return Ok((inode0, inode1))
}

fn try_accept(&self) -> Result<(Arc<Inode>, Endpoint),SystemError> {
Expand Down Expand Up @@ -176,14 +181,14 @@ impl Socket for SeqpacketSocket{
}

fn close(&self) -> Result<(), SystemError> {
Err(SystemError::ENOSYS)
Ok(())
}

fn get_peer_name(&self) -> Result<Endpoint, SystemError> {
// 获取对端地址
let endpoint = match &*self.inner.read() {
Inner::Connected(connected) => connected.peer_endpoint().cloned(),
_ =>panic!("the socket is not connected")
_ =>return Err(SystemError::ENOTCONN)
};

if let Some(endpoint) = endpoint{
Expand Down Expand Up @@ -229,7 +234,7 @@ impl Socket for SeqpacketSocket{
if flags.contains(MessageFlag::OOB){
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}
if flags.contains(MessageFlag::DONTWAIT){
if !flags.contains(MessageFlag::DONTWAIT){
loop{
match connected.try_read(buffer){
Ok(usize)=>return Ok(usize),
Expand Down Expand Up @@ -261,7 +266,7 @@ impl Socket for SeqpacketSocket{
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}

if flags.contains(MessageFlag::DONTWAIT){
if !flags.contains(MessageFlag::DONTWAIT){
loop{
match connected.try_write(buffer){
Ok(usize)=>return Ok(usize),
Expand All @@ -288,6 +293,38 @@ impl Socket for SeqpacketSocket{
fn write(&self, buffer: &[u8]) -> Result<usize, SystemError> {
self.send(buffer, crate::net::socket::MessageFlag::empty())
}

fn recv_from(
&self,
buffer: &mut [u8],
flags: MessageFlag,
_address: Option<Endpoint>,
) -> Result<(usize, Endpoint), SystemError> {
match &*self.inner.write(){
Inner::Connected(connected)=>{
if flags.contains(MessageFlag::OOB){
return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
}
if !flags.contains(MessageFlag::DONTWAIT){
loop{
match connected.recv_slice(buffer){
Ok(usize)=>return Ok((usize,connected.endpoint().unwrap().clone())),
Err(_)=>continue,
}
}
}
else {
unimplemented!("unimplemented non_block")
}
},
_=>{
log::error!("the socket is not connected");
return Err(SystemError::ENOTCONN)
}
}
//Err(SystemError::ENOSYS)
}


// fn update_io_events(&self) -> Result<crate::net::socket::EPollEventType, SystemError> {
// // 参考linux的unix_poll https://code.dragonos.org.cn/xref/linux-6.1.9/net/unix/af_unix.c#3152
Expand Down
53 changes: 31 additions & 22 deletions kernel/src/net/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
syscall::Syscall,
};

use super::socket::AddressFamily as AF;
use super::socket::{netlink::endpoint, unix::Unix, AddressFamily as AF};
use super::socket::{self, Endpoint, Socket};

pub use super::syscall_util::*;
Expand Down Expand Up @@ -96,26 +96,29 @@ impl Syscall {
}

// 创建一对socket
let inode0 = socket::create_socket(
address_family,
stype,
protocol as u32,
socket_type.is_nonblock(),
socket_type.is_cloexec(),
)?;
let inode1 = socket::create_socket(
address_family,
stype,
protocol as u32,
socket_type.is_nonblock(),
socket_type.is_cloexec(),
)?;
// let inode0 = socket::create_socket(
// address_family,
// stype,
// protocol as u32,
// socket_type.is_nonblock(),
// socket_type.is_cloexec(),
// )?;
// let inode1 = socket::create_socket(
// address_family,
// stype,
// protocol as u32,
// socket_type.is_nonblock(),
// socket_type.is_cloexec(),
// )?;

// // 进行pair
// unsafe {
// inode0.connect(socket::Endpoint::Inode(inode1.clone()))?;
// inode1.connect(socket::Endpoint::Inode(inode0.clone()))?;
// }

// 进行pair
unsafe {
inode0.connect(socket::Endpoint::Inode(inode1.clone()))?;
inode1.connect(socket::Endpoint::Inode(inode0.clone()))?;
}
// 创建一对新的unix socket pair
let (inode0,inode1)=Unix::new_pairs(stype)?;

fds[0] = fd_table_guard.alloc_fd(File::new(inode0, FileMode::O_RDWR)?, None)?;
fds[1] = fd_table_guard.alloc_fd(File::new(inode1, FileMode::O_RDWR)?, None)?;
Expand Down Expand Up @@ -328,11 +331,17 @@ impl Syscall {
)?)
};

let (n, endpoint) = socket.recv_from(buf, flags, address)?;
let (n, endpoint) = match socket.recv_from(buf, flags, address){
Ok((n,endpoint))=>(n,endpoint),
Err(err)=>{
//log::debug!("recvfrom not impl");
return Err(err)
}
};
drop(socket);

// 如果有地址信息,将地址信息写入用户空间
if addr.is_null() {
if !addr.is_null() {
let sockaddr_in = SockAddr::from(endpoint);
unsafe {
sockaddr_in.write_to_user(addr, addrlen)?;
Expand Down
Loading