Skip to content

Commit

Permalink
(feat): TcpServer/Connection + basic GateServer<->Client connection s…
Browse files Browse the repository at this point in the history
…etup done
  • Loading branch information
Perseus committed Feb 4, 2024
1 parent ad77e3f commit 431cbdb
Show file tree
Hide file tree
Showing 34 changed files with 1,264 additions and 1,241 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions .idea/top-server-rewrite.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug executable 'gate_server'",
"cargo": {
"args": [
"build",
"--bin=gate_server",
"--package=gate_server"
],
"filter": {
"name": "gate_server",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in executable 'gate_server'",
"cargo": {
"args": [
"test",
"--no-run",
"--bin=gate_server",
"--package=gate_server"
],
"filter": {
"name": "gate_server",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'common_utils'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=common_utils"
],
"filter": {
"name": "common_utils",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"editor.formatOnSave": true
}
9 changes: 6 additions & 3 deletions common_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ edition = "2021"
[dependencies]
anyhow = "1.0.65"
serde_yaml = "0.9.13"
serde = { version = "1.0", features = [ "derive" ] }
serde = { version = "1.0", features = ["derive"] }
bytes = { version = "1.0.1" }
num_enum = { version = "0.5.7" }
byteorder = { version = "1.4.3" }
bincode = { version = "1.3.3" }
bincode = { version = "1.3.3" }
hex = { version = "0.4.3" }
num = { version = "0.4.0" }
derive_more = { version = "0.99.17" }
derive_more = { version = "0.99.17" }
tokio = { version = "1.9.0" }
chrono = { version = "0.4.33" }
nanoid = { version = "0.4.0" }
3 changes: 2 additions & 1 deletion common_utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod network;
pub mod packet;
pub mod parser;
pub mod packet;
3 changes: 3 additions & 0 deletions common_utils/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod rpc;
pub mod tcp_connection;
pub mod tcp_server;
1 change: 1 addition & 0 deletions common_utils/src/network/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions common_utils/src/network/tcp_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
struct TcpClient {}
137 changes: 137 additions & 0 deletions common_utils/src/network/tcp_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use bytes::{Buf, BytesMut};
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tokio::{io::AsyncReadExt, time::Instant};

use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};

use crate::packet::heartbeat::{self, HeartbeatPacket};
use crate::packet::{BasePacket, PacketWriter, TypedPacket};

pub struct TcpConnection<T> {
id: String,
stream: Option<TcpStream>,
socket_addr: SocketAddr,
application_context: Option<T>,
recv_buffer: BytesMut,
last_received_data_at: Arc<Mutex<Instant>>,
}

const MAX_PACKET_SIZE: usize = 4096;

impl<T> TcpConnection<T> {
pub fn new(id: String, stream: TcpStream, socket_addr: SocketAddr) -> Self {
let connection = TcpConnection {
id,
socket_addr,
stream: Some(stream),
application_context: None,
recv_buffer: BytesMut::with_capacity(MAX_PACKET_SIZE),
last_received_data_at: Arc::new(Mutex::new(Instant::now())),
};

connection
}

fn heartbeat_handler(&self, data_send_channel_producer: mpsc::Sender<BasePacket>) {
let last_received_data_at = self.last_received_data_at.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(100));
loop {
interval.tick().await;
let last_received_data_at = last_received_data_at.lock().await;
if last_received_data_at.elapsed().as_secs() >= 2 {
let heartbeat_packet = HeartbeatPacket::default();
let base_packet: BasePacket = heartbeat_packet.to_base_packet().unwrap();
println!("Sending heartbeat ping");
data_send_channel_producer.send(base_packet).await;
}
}
});
}

pub async fn start_processing(
&mut self,
_data_recv_channel_producer: mpsc::Sender<BasePacket>,
mut data_send_channel_consumer: mpsc::Receiver<BasePacket>,
mut data_send_channel_producer: mpsc::Sender<BasePacket>,
) {
let stream = self.stream.take();
let (mut reader, writer) = stream.unwrap().into_split();
tokio::spawn(async move {
let stream_writer = writer;
loop {
match data_send_channel_consumer.recv().await {
Some(packet) => {
let packet_as_vec = packet.get_as_bytes().to_vec();
let packet_as_slice = packet_as_vec.as_slice();

println!("Packet as slice {:?}", packet_as_slice);
let bytes_written = stream_writer.try_write(packet_as_slice);
println!("Bytes written - {:?}", bytes_written);
}
None => {}
}
}
});

self.heartbeat_handler(data_send_channel_producer);

let mut buffer = BytesMut::with_capacity(MAX_PACKET_SIZE);
loop {
match reader.read_buf(&mut buffer).await {
Ok(0) => {
println!("Connection closed by client.");
break;
}

Ok(n) => {
println!("Received {} bytes", n);
println!("Buffer: {:?}", buffer);
let mut last_received_data_at = self.last_received_data_at.lock().await;
*last_received_data_at = Instant::now();

match BasePacket::check_frame(&buffer) {
Err(crate::packet::FrameError::Incomplete) => {
println!("Incomplete frame");
continue;
}

Err(crate::packet::FrameError::Invalid) => {
println!("Invalid frame");
break;
}

Ok(n) => {
println!("Frame length - {}", n);
let packet = BasePacket::parse_frame(&mut buffer, n).unwrap();
println!("Packet - {:?}", packet);
if packet.get_len() == 2 {
println!("Received heartbeat ping");
buffer.advance(n);

continue;
}
_data_recv_channel_producer.send(packet).await.unwrap();
buffer.advance(n);
}
}
}

Err(err) => panic!("{:?}", err),
}
}
}

pub fn set_application_context(&mut self, context: T) {
self.application_context = Some(context);
}

pub fn get_application_context(&self) -> Option<&T> {
self.application_context.as_ref()
}
}
76 changes: 76 additions & 0 deletions common_utils/src/network/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::net::SocketAddr;
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
task::AbortHandle,
};

use crate::packet::BasePacket;

use super::tcp_connection::TcpConnection;

pub struct TcpServer {
server: TcpListener,
}
pub trait ServerHandler<ApplicationContextType> {
fn on_connect(
id: String,
stream: TcpStream,
socket: SocketAddr,
) -> TcpConnection<ApplicationContextType>;
fn on_connected(
connection: TcpConnection<ApplicationContextType>,
) -> anyhow::Result<mpsc::Sender<BasePacket>>;
// fn on_disconnect(self);
fn on_data(packet: BasePacket);
// fn on_error(&mut self, stream: TcpListener, error: &str);
}

impl TcpServer {
pub async fn start<HandlerType, ApplicationContextType>(
connect_addr: String,
client_comm_channel_tracker_tx: mpsc::Sender<(String, mpsc::Sender<BasePacket>)>,
) -> anyhow::Result<()>
where
HandlerType: ServerHandler<ApplicationContextType>,
{
let server = match TcpListener::bind(&connect_addr).await {
Ok(listener) => listener,
Err(err) => panic!(
"Unable to start the server at {}, error - {:?}",
connect_addr, err
),
};

loop {
let (socket, socket_addr) = server.accept().await?;
TcpServer::handle_new_connection::<HandlerType, ApplicationContextType>(
socket,
socket_addr,
client_comm_channel_tracker_tx.clone(),
)
.await?;
}
}

async fn handle_new_connection<HandlerType, ApplicationContextType>(
socket: TcpStream,
socket_addr: SocketAddr,
client_comm_channel_tracker_tx: mpsc::Sender<(String, mpsc::Sender<BasePacket>)>,
) -> anyhow::Result<()>
where
HandlerType: ServerHandler<ApplicationContextType>,
{
let id = nanoid::nanoid!();
let handler = HandlerType::on_connect(id.clone(), socket, socket_addr);
if let Ok(client_comm_channel) = HandlerType::on_connected(handler) {
client_comm_channel_tracker_tx
.send((id, client_comm_channel))
.await?;
} else {
// HandlerType::on_disconnect();
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions common_utils/src/packet/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@


13 changes: 13 additions & 0 deletions common_utils/src/packet/commands.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use num_enum::{IntoPrimitive, TryFromPrimitive};

#[derive(PartialEq, Debug, TryFromPrimitive, Clone, IntoPrimitive)]
#[repr(u16)]
pub enum Command {
None,

// Client to GateServer
CTGTLogin = 431,

// GateServer to Client
GTTCEstablishConnection = 940,
}
Loading

0 comments on commit 431cbdb

Please sign in to comment.