Skip to content

Commit

Permalink
feat(net):建立Unix InodeID映射表,并完善seqpacket相关功能 (#929)
Browse files Browse the repository at this point in the history
* 初版Seqpacket socket

* 实现unix seq的socket pair

* 优化socketpair

* 加上叹号进入系统

* 将listener的backlog改为inode

* feat: impl for buffer size meta

* 添加seqpacket测试

* 实现seq_socket的accpet阻塞

* 暂时将部分红线改为todo

* 删除部分注释代码

---------

Co-authored-by: Samuka007 <[email protected]>
  • Loading branch information
Saga1718 and Samuka007 authored Sep 19, 2024
1 parent 11dad02 commit 3ab8d05
Show file tree
Hide file tree
Showing 17 changed files with 716 additions and 110 deletions.
71 changes: 24 additions & 47 deletions kernel/src/arch/x86_64/syscall/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
arch::{
ipc::signal::X86_64SignalArch,
syscall::nr::{SysCall, SYS_ARCH_PRCTL, SYS_RT_SIGRETURN},
syscall::nr::{SYS_ARCH_PRCTL, SYS_RT_SIGRETURN},
CurrentIrqArch,
},
exception::InterruptArch,
Expand All @@ -12,6 +12,7 @@ use crate::{
syscall::{Syscall, SYS_SCHED},
};
use log::debug;
use nr::SysCall;
use system_error::SystemError;

use super::{
Expand Down Expand Up @@ -53,7 +54,7 @@ macro_rules! syscall_return {

if $show {
let pid = ProcessManager::current_pcb().pid();
debug!("[SYS] [Pid: {:?}] [Retn: {:?}]", pid, ret as i64);
debug!("syscall return:pid={:?},ret= {:?}\n", pid, ret as isize);
}

unsafe {
Expand All @@ -63,24 +64,6 @@ macro_rules! syscall_return {
}};
}

macro_rules! normal_syscall_return {
($val:expr, $regs:expr, $show:expr) => {{
let ret = $val;

if $show {
let pid = ProcessManager::current_pcb().pid();
debug!("[SYS] [Pid: {:?}] [Retn: {:?}]", pid, ret);
}

$regs.rax = ret.unwrap_or_else(|e| e.to_posix_errno() as usize) as u64;

unsafe {
CurrentIrqArch::interrupt_disable();
}
return;
}};
}

#[no_mangle]
pub extern "sysv64" fn syscall_handler(frame: &mut TrapFrame) {
let syscall_num = frame.rax as usize;
Expand All @@ -105,31 +88,15 @@ pub extern "sysv64" fn syscall_handler(frame: &mut TrapFrame) {
];
mfence();
let pid = ProcessManager::current_pcb().pid();
let mut show = (syscall_num != SYS_SCHED) && (pid.data() >= 7);
// let mut show = true;

let to_print = SysCall::try_from(syscall_num);
if let Ok(to_print) = to_print {
use SysCall::*;
match to_print {
SYS_ACCEPT | SYS_ACCEPT4 | SYS_BIND | SYS_CONNECT | SYS_SHUTDOWN | SYS_LISTEN => {
show &= true;
}
SYS_RECVFROM | SYS_SENDTO | SYS_SENDMSG | SYS_RECVMSG => {
show &= true;
}
SYS_SOCKET | SYS_GETSOCKNAME | SYS_GETPEERNAME | SYS_SOCKETPAIR | SYS_SETSOCKOPT
| SYS_GETSOCKOPT => {
show &= true;
}
_ => {
show &= true;
}
}

if show {
debug!("[SYS] [Pid: {:?}] [Call: {:?}]", pid, to_print);
}
let show = false;
// let show = if syscall_num != SYS_SCHED && pid.data() >= 7 {
// true
// } else {
// false
// };

if show {
debug!("syscall: pid: {:?}, num={:?}\n", pid, syscall_num);
}

// Arch specific syscall
Expand All @@ -142,11 +109,21 @@ pub extern "sysv64" fn syscall_handler(frame: &mut TrapFrame) {
);
}
SYS_ARCH_PRCTL => {
normal_syscall_return!(Syscall::arch_prctl(args[0], args[1]), frame, show);
syscall_return!(
Syscall::arch_prctl(args[0], args[1])
.unwrap_or_else(|e| e.to_posix_errno() as usize),
frame,
show
);
}
_ => {}
}
normal_syscall_return!(Syscall::handle(syscall_num, &args, frame), frame, show);
syscall_return!(
Syscall::handle(syscall_num, &args, frame).unwrap_or_else(|e| e.to_posix_errno() as usize)
as u64,
frame,
show
);
}

/// 系统调用初始化
Expand Down
8 changes: 5 additions & 3 deletions kernel/src/net/socket/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::vec::Vec;

use alloc::sync::Arc;
use alloc::{sync::Arc,string::String};
use system_error::SystemError;

use crate::libs::spinlock::SpinLock;
Expand Down Expand Up @@ -37,13 +37,15 @@ impl Buffer {
let mut read_buffer = self.read_buffer.lock_irqsave();
let len = core::cmp::min(buf.len(), read_buffer.len());
buf[..len].copy_from_slice(&read_buffer[..len]);
read_buffer.split_off(len);
let _ = read_buffer.split_off(len);
log::debug!("recv buf {}",String::from_utf8_lossy(buf));

return Ok(len);
}

pub fn write_read_buffer(&self, buf: &[u8]) -> Result<usize, SystemError> {
let mut buffer = self.read_buffer.lock_irqsave();

log::debug!("send buf {}",String::from_utf8_lossy(buf));
let len = buf.len();
if self.metadata.buf_size - buffer.len() < len {
return Err(SystemError::ENOBUFS);
Expand Down
3 changes: 2 additions & 1 deletion kernel/src/net/socket/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::net::socket;
use crate::{filesystem::vfs::InodeId, net::socket};
use alloc::sync::Arc;

pub use smoltcp::wire::IpEndpoint;
Expand All @@ -13,6 +13,7 @@ pub enum Endpoint {
/// inode端点
Inode(Arc<socket::Inode>),
// todo: 增加NetLink机制后,增加NetLink端点
InodeId(InodeId),
/// NetLink端点
Netlink(NetlinkEndpoint),
}
Expand Down
11 changes: 6 additions & 5 deletions kernel/src/net/socket/inet/datagram/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ impl UdpSocket {
if self.is_nonblock() {
return self.try_recv(buf).map(|(size, _)| size);
} else {
return self
.wait_queue
.busy_wait(EP::EPOLLIN, || self.try_recv(buf).map(|(size, _)| size));
// return self
// .wait_queue
// .busy_wait(EP::EPOLLIN, || self.try_recv(buf).map(|(size, _)| size));
todo!()
}
}

Expand Down Expand Up @@ -151,8 +152,8 @@ impl UdpSocket {
}

impl Socket for UdpSocket {
fn wait_queue(&self) -> WaitQueue {
self.wait_queue.clone()
fn wait_queue(&self) -> &WaitQueue {
&self.wait_queue
}

fn poll(&self) -> usize {
Expand Down
26 changes: 17 additions & 9 deletions kernel/src/net/socket/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
mod stream;
pub(crate) mod seqpacket;
use crate::net::socket::*;
use crate::{filesystem::vfs::InodeId, libs::rwlock::RwLock, net::socket::*};
use hashbrown::HashMap;
use system_error::SystemError::{self, *};
use alloc::sync::Arc;
pub struct Unix;

lazy_static!{
pub static ref INODE_MAP: RwLock<HashMap<InodeId, Endpoint>> = RwLock::new(HashMap::new());
}

fn create_unix_socket(
sock_type: Type,
) -> Result<Arc<dyn Socket>, SystemError> {
) -> Result<Arc<Inode>, SystemError> {
match sock_type {
Type::Stream => {
Ok(stream::StreamSocket::new())
// Type::Stream => {
// Ok(stream::StreamSocket::new())
// },
Type::SeqPacket |Type::Datagram=>{
// Ok(seqpacket::SeqpacketSocket::new(false))
seqpacket::SeqpacketSocket::new_inode(false)
},
Type::SeqPacket =>{
Ok(seqpacket::SeqpacketSocket::new(false))
}
_ => {
Err(EPROTONOSUPPORT)
}
Expand All @@ -24,14 +30,16 @@ fn create_unix_socket(
impl family::Family for Unix {
fn socket(stype: Type, _protocol: u32) -> Result<Arc<Inode>, SystemError> {
let socket = create_unix_socket(stype)?;
Ok(Inode::new(socket))
// Ok(Inode::new(socket))
Ok(socket)
}
}

impl Unix {
pub fn new_pairs(socket_type:Type) ->Result<(Arc<Inode>,Arc<Inode>),SystemError>{
log::debug!("socket_type {:?}",socket_type);
match socket_type {
Type::SeqPacket=>seqpacket::SeqpacketSocket::new_pairs(),
Type::SeqPacket |Type::Datagram=>seqpacket::SeqpacketSocket::new_pairs(),
_=>todo!()
}
}
Expand Down
26 changes: 20 additions & 6 deletions kernel/src/net/socket/unix/seqpacket/inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::sync::atomic::{AtomicUsize, Ordering};
use alloc::string::String;
use alloc::{collections::VecDeque,sync::Arc};

use crate::{libs::mutex::Mutex, net::socket::{buffer::Buffer, endpoint::Endpoint, Inode, ShutdownTemp}};
Expand Down Expand Up @@ -45,10 +46,17 @@ pub(super) struct Listener{

impl Listener {
pub(super) fn new(inode:Endpoint,backlog:usize)->Self{
log::debug!("backlog {}",backlog);
let back = if backlog>1024{
1024 as usize
}
else{
backlog
};
return Self{
inode,
backlog:AtomicUsize::new(backlog),
incoming_conns:Mutex::new(VecDeque::with_capacity(backlog)),
backlog:AtomicUsize::new(back),
incoming_conns:Mutex::new(VecDeque::with_capacity(back)),
}
}
pub(super) fn endpoint(&self) ->&Endpoint{
Expand All @@ -57,8 +65,9 @@ impl Listener {

pub(super) fn try_accept(&self) ->Result<(Arc<Inode>, Endpoint),SystemError>{
let mut incoming_conns =self.incoming_conns.lock();
log::debug!(" incom len {}",incoming_conns.len());
let conn=incoming_conns.pop_front().ok_or_else(|| SystemError::EAGAIN_OR_EWOULDBLOCK)?;
let socket =Arc::downcast::<SeqpacketSocket>(conn).map_err(|_| SystemError::EINVAL)?;
let socket =Arc::downcast::<SeqpacketSocket>(conn.inner()).map_err(|_| SystemError::EINVAL)?;
let peer = match &*socket.inner.read(){
Inner::Connected(connected)=>connected.peer_endpoint().unwrap().clone(),
_=>return Err(SystemError::ENOTCONN),
Expand All @@ -81,7 +90,7 @@ impl Listener {

let new_server=SeqpacketSocket::new(false);
let new_inode=Inode::new(new_server.clone());

// log::debug!("new inode {:?},client_epoint {:?}",new_inode,client_epoint);
let (server_conn, client_conn) = Connected::new_pair(Some(Endpoint::Inode(new_inode.clone())), client_epoint);
*new_server.inner.write()=Inner::Connected(server_conn);
incoming_conns.push_back(new_inode);
Expand All @@ -90,6 +99,10 @@ impl Listener {

Ok(client_conn)
}

pub(super) fn is_acceptable(&self)->bool {
return self.incoming_conns.lock().len()!=0
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -147,6 +160,7 @@ impl Connected{
if self.can_send()? {
return self.send_slice(buf);
} else {
log::debug!("can not send {:?}",String::from_utf8_lossy(&buf[..]));
return Err(SystemError::ENOBUFS);
}
}
Expand All @@ -155,7 +169,7 @@ impl Connected{
return !self.buffer.is_read_buf_empty();
}

// 检查发送缓冲区是否为空
// 检查发送缓冲区是否满了
pub fn can_send(&self) -> Result<bool, SystemError> {
// let sebuffer = self.sebuffer.lock(); // 获取锁
// sebuffer.capacity()-sebuffer.len() ==0;
Expand All @@ -170,7 +184,7 @@ impl Connected{
},
_=>return Err(SystemError::EINVAL),
};
Ok(is_full)
Ok(!is_full)
}

pub fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError>{
Expand Down
Loading

0 comments on commit 3ab8d05

Please sign in to comment.