Skip to content

Commit

Permalink
(refactor): Moved groupserver connection logic to the handler, slight…
Browse files Browse the repository at this point in the history
…ly improved used of locks
  • Loading branch information
Perseus committed Feb 19, 2024
1 parent 852e7c7 commit d9aa9d4
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 178 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"editor.formatOnSave": true
"editor.formatOnSave": true,
"discord.enabled": true
}
33 changes: 2 additions & 31 deletions common_utils/src/network/tcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,16 @@ impl TcpClient {
}
}

async fn handle_connection<HandlerType, ApplicationContextType, CommandChannelType>(
id: u32,
server_type: String,
stream: TcpStream,
socket_addr: SocketAddr,
) -> anyhow::Result<Arc<RwLock<HandlerType>>>
where
HandlerType: ConnectionHandler<ApplicationContextType, CommandChannelType>,
{
Ok(HandlerType::on_connect(
id,
server_type,
stream,
socket_addr,
))
}

pub async fn connect<HandlerType, ApplicationContextType, CommandChannelType>(
&mut self,
retry_interval_in_secs: u64,
) -> anyhow::Result<Arc<RwLock<HandlerType>>>
) -> anyhow::Result<TcpStream>
where
HandlerType: ConnectionHandler<ApplicationContextType, CommandChannelType>,
{
loop {
match TcpStream::connect(format!("{}:{}", self.target_ip, self.target_port)).await {
Ok(stream) => {
let socket_addr = stream.peer_addr();
println!(
"{}",
format!(
Expand All @@ -62,19 +44,8 @@ impl TcpClient {
)
.green()
);
let connection = TcpClient::handle_connection::<
HandlerType,
ApplicationContextType,
CommandChannelType,
>(
self.id,
self.server_type.clone(),
stream,
socket_addr.unwrap(),
)
.await?;

return Ok(connection);
return Ok(stream);
}
Err(err) => {
println!(
Expand Down
10 changes: 5 additions & 5 deletions common_utils/src/network/tcp_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct TcpConnection<T> {

rpc_mgr: Option<Arc<Mutex<RPCManager>>>,
cancellation_token: CancellationToken,
is_connected: bool,
is_ready: bool,
pub logger: Logger,
}

Expand Down Expand Up @@ -67,7 +67,7 @@ impl<T> TcpConnection<T> {
rpc_reply_recv_rx: rpc_reply_recv_channel.1,
rpc_mgr: None,
cancellation_token: CancellationToken::new(),
is_connected: false,
is_ready: false,
logger: Logger::new(logger_color, logger_prefix, false),
}
}
Expand Down Expand Up @@ -253,11 +253,11 @@ impl<T> TcpConnection<T> {

pub fn mark_connected(&mut self) {
self.logger.info("Marking connection as connected");
self.is_connected = true;
self.is_ready = true;
}

pub fn is_connected(&self) -> bool {
self.is_connected
pub fn is_ready(&self) -> bool {
self.is_ready
}

pub async fn sync_rpc(&mut self, packet: BasePacket) -> anyhow::Result<BasePacket> {
Expand Down
6 changes: 4 additions & 2 deletions gate_server/src/client_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ impl ClientHandler {
let connection = self.connection.clone();
let gpserver_handler = self.groupserver_handler.clone().unwrap();

// let gpserver_handler = self.groupserver_handler.unwrap().as_ref();

tokio::spawn(async move {
let mut conn = connection.lock().await;
conn.logger.debug("Handling user login");
Expand Down Expand Up @@ -261,8 +263,8 @@ impl ClientHandler {
gpserver_packet.write_long(conn.get_id()).unwrap();
gpserver_packet.write_short(916).unwrap();

let gpserver_read_handler = gpserver_handler.read().await;
if let Ok(mut result) = gpserver_read_handler.sync_rpc(gpserver_packet).await {
let gpserver_handler = gpserver_handler.read().await;
if let Ok(mut result) = gpserver_handler.sync_rpc(gpserver_packet).await {
if !result.has_data() {
let err_pkt = ClientLoginErrorPacket::new(ErrorCode::ErrNetworkException)
.to_base_packet()
Expand Down
66 changes: 22 additions & 44 deletions gate_server/src/gate_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct GateServer {
// other services store U32 as the player's "gate address" which is required to communicate to
// specific players directly
player_num_counter: Arc<AtomicU32>,
group_handler: Option<Arc<RwLock<GroupServerHandler>>>,
group_handler: Arc<RwLock<GroupServerHandler>>,
}

impl GateServer {
Expand All @@ -70,7 +70,7 @@ impl GateServer {
config,
player_num_counter: Arc::new(AtomicU32::new(1)),
client_connections: Arc::new(RwLock::new(HashMap::new())),
group_handler: None,
group_handler: Arc::new(RwLock::new(GroupServerHandler::new())),
}
}

Expand Down Expand Up @@ -173,7 +173,7 @@ impl GateServer {
);

let player_counter = self.player_num_counter.clone();
let groupserver_handler = self.group_handler.clone().unwrap();
let groupserver_handler = self.group_handler.clone();
let (client_to_gate_tx, mut client_to_gate_rx) = mpsc::channel::<ClientGateCommands>(100);
let client_tcp_connections_for_client_commands_handler = client_tcp_connections.clone();

Expand Down Expand Up @@ -218,51 +218,29 @@ impl GateServer {
*/
async fn start_group_server_connection_handler(&mut self) {
let (ip, port) = self.config.get_server_ip_and_port_for_group_server();
let connect_addr = format!("{}:{}", ip, port);

let mut group_server_client = TcpClient::new(1, "GroupServer".to_string(), ip, port);
let (group_to_gate_tx, group_to_gate_rx) = mpsc::channel::<GateGroupCommands>(100);
if let Ok(group_handler) = group_server_client
.connect::<GroupServerHandler, GroupServer, GateGroupCommands>(5)
.await
{
self.group_handler = Some(group_handler);
let player_list = self.client_connections.read().await;
if let Err(err) = GroupServerHandler::on_connected(
self.group_handler.clone().unwrap(),
let (group_to_gate_tx, mut group_to_gate_rx) = mpsc::channel::<GateGroupCommands>(100);

let player_map = self.client_connections.clone();
let gate_server_name = "GateServer".to_string();
let gp_handler = self.group_handler.clone();
tokio::spawn(async move {
GroupServerHandler::start_connection_loop(
gp_handler,
player_map,
gate_server_name,
ip,
port,
group_to_gate_tx,
)
.await
{
println!(
"{} {}",
"Failed to start group server connection handler".red(),
err.to_string().red().underline(),
);
panic!()
} else {
println!("{}", "Connected to group server".green());
}

self.group_handler
.clone()
.unwrap()
.read()
.await
.sync_player_list(player_list, "GateServer".to_string())
.await;

// TODO: handle data coming in from groupserver on `group_to_gate_rx`
} else {
println!(
"{} {}",
"Unable to connect to group server at".red(),
connect_addr.red(),
);
.await;
});

sleep(tokio::time::Duration::from_secs(5)).await;
panic!()
}
tokio::spawn(async move {
while let Some(command) = group_to_gate_rx.recv().await {
match command {}
}
});
}

pub async fn start(&mut self) -> anyhow::Result<()> {
Expand Down
Loading

0 comments on commit d9aa9d4

Please sign in to comment.