Skip to content

Commit

Permalink
初版Seqpacket socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Saga1718 committed Sep 9, 2024
1 parent 703b8e1 commit cc2b4aa
Show file tree
Hide file tree
Showing 6 changed files with 553 additions and 1 deletion.
3 changes: 2 additions & 1 deletion kernel/src/net/socket/base.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(unused_variables)]

use core::any::Any;
use core::fmt::Debug;
use alloc::sync::Arc;
use system_error::SystemError::{self, *};
Expand All @@ -9,7 +10,7 @@ use crate::net::syscall_util::MsgHdr;
/// # `Socket` methods
/// ## Reference
/// - [Posix standard](https://pubs.opengroup.org/onlinepubs/9699919799/)
pub trait Socket: Sync + Send + Debug {
pub trait Socket: Sync + Send + Debug + Any{
/// # `wait_queue`
/// 获取socket的wait queue
fn wait_queue(&self) -> WaitQueue;
Expand Down
4 changes: 4 additions & 0 deletions kernel/src/net/socket/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ impl Buffer {
return self.read_buffer.lock().is_empty();
}

pub fn is_read_buf_full(&self) ->bool {
return self.metadata.buf_size-self.read_buffer.lock().len()==0
}

pub fn is_write_buf_empty(&self) -> bool {
return self.write_buffer.lock().is_empty();
}
Expand Down
4 changes: 4 additions & 0 deletions kernel/src/net/socket/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ impl Inode {
pub fn set_close_on_exec(&self, close_on_exec: bool) {
todo!()
}

pub fn inner(&self)->Arc<dyn Socket>{
return self.inner.clone();
}
}
1 change: 1 addition & 0 deletions kernel/src/net/socket/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
mod stream;
mod seqpacket;
205 changes: 205 additions & 0 deletions kernel/src/net/socket/unix/seqpacket/inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use core::sync::atomic::{AtomicUsize, Ordering};
use alloc::{collections::VecDeque,sync::Arc};

use crate::{libs::mutex::Mutex, net::socket::{buffer::Buffer, endpoint::Endpoint, Inode, ShutdownTemp}};
use system_error::SystemError::{self, *};
use super::SeqpacketSocket;

#[derive(Debug)]
pub(super) struct Init{
inode:Option<Endpoint>,
}

impl Init{
pub(super) fn new() -> Self{
Self {
inode:None,
}
}

pub(super) fn bind(&mut self, epoint_to_bind: Endpoint) ->Result<(), SystemError> {
if self.inode.is_some() {
log::error!("the socket is already bound");
return Err(EINVAL);
}
match epoint_to_bind {
Endpoint::Inode(_)=>self.inode=Some(epoint_to_bind),
_=>return Err(EINVAL),

}

return Ok(())
}

pub fn endpoint(&self) ->Option<&Endpoint>{
return self.inode.as_ref()
}
}

#[derive(Debug)]
pub(super) struct Listener{
inode: Endpoint,
backlog: AtomicUsize,
incoming_conns: Mutex<VecDeque<Connected>>,
}

impl Listener {
pub(super) fn new(inode:Endpoint,backlog:usize)->Self{
return Self{
inode,
backlog:AtomicUsize::new(backlog),
incoming_conns:Mutex::new(VecDeque::with_capacity(backlog)),
}
}
pub(super) fn endpoint(&self) ->&Endpoint{
return &self.inode;
}

pub(super) fn try_accept(&self) ->Result<(Arc<Inode>, Endpoint),SystemError>{
let mut incoming_conns =self.incoming_conns.lock();
let conn=incoming_conns.pop_front().ok_or_else(|| SystemError::EAGAIN_OR_EWOULDBLOCK)?;
let peer = conn.peer_endpoint().cloned().unwrap();
// *** 返回Arc<Inode>额不是Arc<dyn IndexNode>
let socket = SeqpacketSocket::new_connected(conn, false);

return Ok((Inode::new(socket),peer));
}

pub(super) fn listen(&self, backlog:usize) ->Result<(),SystemError>{
self.backlog.store(backlog, Ordering::Relaxed);
Ok(())
}

pub(super) fn push_incoming(&self, client_epoint:Option<Endpoint>) ->Result<Connected,SystemError>{
let mut incoming_conns=self.incoming_conns.lock();
if incoming_conns.len() >= self.backlog.load(Ordering::Relaxed) {
log::error!("the pending connection queue on the listening socket is full");
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}

let (server_conn, client_conn) = Connected::new_pair(Some(self.inode.clone()), client_epoint);
incoming_conns.push_back(server_conn);

// TODO: epollin

Ok(client_conn)
}
}

#[derive(Debug)]
pub struct Connected{
inode:Option<Endpoint>,
peer_inode:Option<Endpoint>,
buffer: Arc<Buffer>,
}

impl Connected{
/// 默认的缓冲区大小
pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;

pub fn new_pair(inode:Option<Endpoint>,peer_inode:Option<Endpoint>) ->(Connected,Connected){
// let rebuffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));
// let sebuffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));

let this = Connected{
inode:inode.clone(),
peer_inode:peer_inode.clone(),
buffer:Buffer::new(),
};
let peer = Connected{
inode:peer_inode,
peer_inode:inode,
buffer:Buffer::new(),
};

(this,peer)
}

pub fn endpoint(&self) ->Option<&Endpoint> {
self.inode.as_ref()
}

pub fn peer_endpoint(&self) ->Option<&Endpoint> {
self.peer_inode.as_ref()
}

pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
if self.can_recv() {
return self.recv_slice(buf);
} else {
return Err(SystemError::EINVAL);
}
}

pub fn try_write(&self, buf: &[u8]) -> Result<usize, SystemError> {
if self.can_send()? {
return self.send_slice(buf);
} else {
return Err(SystemError::ENOBUFS);
}
}

pub fn can_recv(&self) -> bool {
return !self.buffer.is_read_buf_empty();
}

// 检查发送缓冲区是否为空
pub fn can_send(&self) -> Result<bool, SystemError> {
// let sebuffer = self.sebuffer.lock(); // 获取锁
// sebuffer.capacity()-sebuffer.len() ==0;
let peer_inode=match self.peer_inode.as_ref().unwrap(){
Endpoint::Inode(inode) =>inode,
_=>return Err(SystemError::EINVAL),
};
let peer_socket=Arc::downcast::<SeqpacketSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
let is_full=match &*peer_socket.inner.read(){
Inner::Connected(connected) => {
connected.buffer.is_write_buf_empty()
},
_=>return Err(SystemError::EINVAL),
};
Ok(is_full)
}

fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError>{
return self.buffer.read_read_buffer(buf);
}

fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
//找到peer_inode,并将write_buffer的内容写入对端的read_buffer
let peer_inode=match self.peer_inode.as_ref().unwrap(){
Endpoint::Inode(inode) =>inode,
_=>return Err(SystemError::EINVAL),
};
let peer_socket=Arc::downcast::<SeqpacketSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
let usize = match &*peer_socket.inner.write(){
Inner::Connected(connected) => {
let usize =connected.buffer.write_read_buffer(buf)?;
usize
},
_ =>return Err(SystemError::EINVAL),
};
Ok(usize)
}

pub fn shutdown(&self, how: ShutdownTemp) -> Result<(), SystemError> {

if how.is_empty() {
return Err(SystemError::EINVAL);
} else if how.is_send_shutdown() {
unimplemented!("unimplemented!");
} else if how.is_recv_shutdown() {
unimplemented!("unimplemented!");
}

Ok(())
}

}

#[derive(Debug)]
pub(super) enum Inner {
Init(Init),
Listen(Listener),
Connected(Connected),
}
Loading

0 comments on commit cc2b4aa

Please sign in to comment.