Skip to content

Commit

Permalink
server: Increase channel capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
XuShaohua committed Jun 8, 2024
1 parent a2f3f79 commit 94372e4
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 31 deletions.
11 changes: 7 additions & 4 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Listener {
dispatcher_receiver: Option<Receiver<DispatcherToListenerCmd>>,
}

const CHANNEL_CAPACITY: usize = 16;
const CHANNEL_CAPACITY: usize = 1024 * 8;

impl Listener {
#[must_use]
Expand All @@ -57,7 +57,10 @@ impl Listener {
fn new_connection(&mut self, stream: Stream) {
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
let session_id = self.next_session_id();
log::info!("Got new connection in listener {}, session id: {session_id}", self.id);
log::info!(
"Got new connection in listener {}, session id: {session_id}",
self.id
);
self.session_senders.insert(session_id, sender);
let session_config = SessionConfig::new(self.config.keepalive());
let session = Session::new(
Expand Down Expand Up @@ -127,8 +130,8 @@ impl Listener {
pub(super) async fn accept(&mut self) -> Result<Stream, Error> {
match &mut self.socket_listener {
SocketListener::Tcp(tcp_listener) => {
let (tcp_stream, address) = tcp_listener.accept().await?;
Ok(Stream::Tcp(tcp_stream, address))
let (tcp_stream, _address) = tcp_listener.accept().await?;
Ok(Stream::Tcp(tcp_stream))
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/listener/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use stdext::function_name;

use crate::commands::{ListenerToDispatcherCmd, SessionToListenerCmd};
use crate::error::Error;
use crate::listener::Listener;
Expand All @@ -15,12 +17,13 @@ impl Listener {
match cmd {
SessionToListenerCmd::Cmd(session_id, command) => {
// Pass cmd to dispatcher
let cmd =
ListenerToDispatcherCmd::Cmd(SessionGroup::new(self.id, session_id), command);
let session_group = SessionGroup::new(self.id, session_id);
let cmd = ListenerToDispatcherCmd::Cmd(session_group, command);
self.dispatcher_sender.send(cmd).await?;
Ok(())
}
SessionToListenerCmd::Disconnect(session_id) => {
log::info!("{} remove session: {session_id}", function_name!());
self.session_senders.remove_entry(&session_id);
Ok(())
}
Expand Down
10 changes: 4 additions & 6 deletions src/listener/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use std::net::SocketAddr;

use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
Expand All @@ -12,7 +10,7 @@ use crate::error::Error;

#[derive(Debug)]
pub enum Stream {
Tcp(TcpStream, SocketAddr),
Tcp(TcpStream),
}

impl Stream {
Expand All @@ -23,7 +21,7 @@ impl Stream {
/// Returns error if stream/socket gets error.
pub async fn read_buf(&mut self, buf: &mut BytesMut) -> Result<usize, Error> {
match self {
Self::Tcp(ref mut tcp_stream, _address) => Ok(tcp_stream.read_buf(buf).await?),
Self::Tcp(ref mut tcp_stream) => Ok(tcp_stream.read_buf(buf).await?),
}
}

Expand All @@ -34,13 +32,13 @@ impl Stream {
/// Returns error if socket/stream gets error.
pub async fn write(&mut self, buf: &Bytes) -> Result<usize, Error> {
match self {
Self::Tcp(tcp_stream, _address) => Ok(tcp_stream.write(buf).await?),
Self::Tcp(tcp_stream) => Ok(tcp_stream.write(buf).await?),
}
}

pub async fn flush(&mut self) -> Result<(), Error> {
match self {
Self::Tcp(tcp_stream, _address) => Ok(tcp_stream.flush().await?),
Self::Tcp(tcp_stream) => Ok(tcp_stream.flush().await?),
}
}
}
3 changes: 3 additions & 0 deletions src/mem/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use stdext::function_name;

use crate::commands::{DispatcherToMemCmd, MemToDispatcherCmd};
use crate::error::Error;
use crate::mem::Mem;
Expand All @@ -15,6 +17,7 @@ impl Mem {
session_group,
command,
} = cmd;
log::info!("{}, session: {session_group:?} cmd: {command:?}", function_name!());
let reply_frame = self.handle_db_command(command)?;
let reply_cmd = MemToDispatcherCmd {
session_group,
Expand Down
9 changes: 4 additions & 5 deletions src/server/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use tokio::sync::mpsc;

use crate::dispatcher::Dispatcher;
use crate::error::Error;
use crate::listener::Listener;
use crate::listener::types::ListenerId;
use crate::listener::Listener;
use crate::mem::Mem;
use crate::server::Server;
use crate::storage::Storage;

const CHANNEL_CAPACITY: usize = 16;
const CHANNEL_CAPACITY: usize = 1024 * 16;

impl Server {
pub(crate) async fn init_modules(&mut self, runtime: &Runtime) -> Result<(), Error> {
Expand All @@ -41,8 +41,8 @@ impl Server {
listeners_to_dispatcher_sender.clone(),
dispatcher_to_listener_receiver,
)
.await
.unwrap_or_else(|_| panic!("Failed to listen at {:?}", &listeners_info.last()));
.await
.unwrap_or_else(|_| panic!("Failed to listen at {:?}", &listeners_info.last()));
listener_objs.push(listener);
}

Expand Down Expand Up @@ -89,7 +89,6 @@ impl Server {
dispatcher.run_loop().await;
});


Ok(())
}
}
1 change: 1 addition & 0 deletions src/session/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl Session {
#[allow(clippy::unused_async)]
pub(crate) async fn send_disconnect(&mut self) -> Result<(), Error> {
self.status = Status::Disconnecting;
self.listener_sender.send(SessionToListenerCmd::Disconnect(self.id)).await?;
Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions src/session/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Use of this source is governed by GNU Affero General Public License
// that can be found in the LICENSE file.

use stdext::function_name;

use crate::commands::ListenerToSessionCmd;
use crate::error::Error;
use crate::session::Session;
Expand All @@ -14,6 +16,7 @@ impl Session {
match cmd {
ListenerToSessionCmd::Reply(session_id, frame) => {
assert_eq!(session_id, self.id);
log::info!("{} id: {}, send {frame:?} to client", function_name!(), self.id);
Ok(self.send_frame_to_client(frame).await?)
}
}
Expand Down
18 changes: 4 additions & 14 deletions src/session/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

use std::time::Instant;

use crate::commands::SessionToListenerCmd;
use crate::session::status::Status;
use crate::session::Session;
use crate::session::status::Status;

impl Session {
pub async fn run_loop(mut self) {
Expand Down Expand Up @@ -40,19 +39,10 @@ impl Session {
}
}

if let Err(err) = self
.listener_sender
.send(SessionToListenerCmd::Disconnect(self.id))
.await
{
log::error!(
"Failed to send disconnect cmd to listener, id: {}, err: {:?}",
self.id,
err
);
}

log::info!("Session {} exit main loop", self.id);
// Now session object goes out of scope and stream is dropped.
if let Err(err) = self.stream.flush().await {
log::warn!("Failed to flush stream, err: {err:?}");
}
}
}

0 comments on commit 94372e4

Please sign in to comment.