Skip to content

Commit

Permalink
feat: connector (#316)
Browse files Browse the repository at this point in the history
* WIP: mock for connector node

* WIP: connector server received event from agent

* WIP: connector remote rpc and sql storage

* WIP: media_core feedback peer events

* fixed typos

* added stream event and added index to migration

* added simple test for connector sql_storage
  • Loading branch information
giangndm committed Jun 25, 2024
1 parent e892eb5 commit ec28ecf
Show file tree
Hide file tree
Showing 62 changed files with 4,874 additions and 188 deletions.
1,129 changes: 1,098 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
"packages/transport_webrtc",
"packages/media_secure",
"packages/media_gateway",
"packages/audio_mixer",
"packages/audio_mixer", "packages/media_connector",
]

[workspace.dependencies]
Expand Down
6 changes: 4 additions & 2 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ media-server-protocol = { path = "../packages/protocol", features = ["quinn-rpc"
media-server-secure = { path = "../packages/media_secure" }
media-server-runner = { path = "../packages/media_runner", optional = true }
media-server-gateway = { path = "../packages/media_gateway", optional = true }
media-server-connector = { path = "../packages/media_connector", optional = true }
media-server-utils = { path = "../packages/media_utils", optional = true }
local-ip-address = "0.6"
serde = { version = "1.0", features = ["derive"] }
quinn = { version = "0.11", optional = true }
Expand All @@ -33,10 +35,10 @@ sysinfo = { version = "0.30", optional = true }

[features]
default = ["console", "gateway", "media", "connector", "cert_utils"]
gateway = ["media-server-gateway", "quinn_vnet", "node_metrics", "maxminddb"]
gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb"]
media = ["media-server-runner", "quinn_vnet", "node_metrics"]
console = []
connector = ["quinn_vnet"]
connector = ["quinn_vnet", "media-server-connector", "media-server-utils"]
cert_utils = ["rcgen", "rustls"]
quinn_vnet = ["rustls", "quinn"]
node_metrics = ["sysinfo"]
9 changes: 9 additions & 0 deletions bin/connector_z0_n1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
RUST_LOG=info \
RUST_BACKTRACE=1 \
cargo run -- \
--http-port 3000 \
--node-id 4 \
--sdn-port 10004 \
--sdn-zone 0 \
--seeds 0@/ip4/127.0.0.1/udp/10000 \
connector
9 changes: 9 additions & 0 deletions bin/connector_z256_n1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
RUST_LOG=info \
RUST_BACKTRACE=1 \
cargo run -- \
--http-port 3000 \
--node-id 259 \
--sdn-port 11003 \
--sdn-zone 256 \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
connector
15 changes: 14 additions & 1 deletion bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use std::net::SocketAddr;
use std::sync::Arc;

use media_server_protocol::endpoint::ClusterConnId;
#[cfg(feature = "console")]
use media_server_protocol::protobuf::cluster_connector::MediaConnectorServiceClient;
#[cfg(feature = "console")]
use media_server_protocol::rpc::quinn::{QuinnClient, QuinnStream};
use media_server_protocol::transport::{RpcReq, RpcRes};
use media_server_secure::{MediaEdgeSecure, MediaGatewaySecure};
use poem::endpoint::StaticFilesEndpoint;
Expand Down Expand Up @@ -48,6 +52,7 @@ pub async fn run_console_http_server(
port: u16,
secure: media_server_secure::jwt::MediaConsoleSecureJwt,
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
Expand All @@ -57,7 +62,11 @@ pub async fn run_console_http_server(
let cluster_ui = cluster_service.swagger_ui();
let cluster_spec = cluster_service.spec();

let ctx = api_console::ConsoleApisCtx { secure, storage };
let connector_service: OpenApiService<_, ()> = OpenApiService::new(api_console::connector::Apis, "Console Connector APIs", env!("CARGO_PKG_VERSION")).server("/api/connector/");
let connector_ui = connector_service.swagger_ui();
let connector_spec = connector_service.spec();

let ctx = api_console::ConsoleApisCtx { secure, storage, connector };

let route = Route::new()
//TODO build UI and embed to here
Expand All @@ -70,6 +79,10 @@ pub async fn run_console_http_server(
.nest("/api/cluster/", cluster_service.data(ctx.clone()))
.nest("/api/cluster/ui", cluster_ui)
.at("/api/cluster/spec", poem::endpoint::make_sync(move |_| cluster_spec.clone()))
//connector
.nest("/api/connector/", connector_service.data(ctx.clone()))
.nest("/api/connector/ui", connector_ui)
.at("/api/connector/spec", poem::endpoint::make_sync(move |_| connector_spec.clone()))
.with(Cors::new());

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Expand Down
8 changes: 8 additions & 0 deletions bin/src/http/api_console.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::net::SocketAddr;

use media_server_protocol::{
protobuf::cluster_connector::MediaConnectorServiceClient,
rpc::quinn::{QuinnClient, QuinnStream},
};
use media_server_secure::{jwt::MediaConsoleSecureJwt, MediaConsoleSecure};
use poem::Request;
use poem_openapi::{auth::ApiKey, SecurityScheme};

use crate::server::console_storage::StorageShared;

pub mod cluster;
pub mod connector;
pub mod user;

#[derive(Clone)]
pub struct ConsoleApisCtx {
pub secure: MediaConsoleSecureJwt, //TODO make it generic
pub storage: StorageShared,
pub connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
}

/// ApiKey authorization
Expand Down
235 changes: 235 additions & 0 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use super::{super::Response, ConsoleApisCtx, ConsoleAuthorization};
use media_server_protocol::{
connector::CONNECTOR_RPC_PORT,
protobuf::cluster_connector::{GetEventParams, GetParams, GetPeerParams},
rpc::node_vnet_addr,
};
use poem::web::Data;
use poem_openapi::{
param::{Path, Query},
payload::Json,
OpenApi,
};

#[derive(poem_openapi::Object)]
pub struct RoomInfo {
pub id: i32,
pub room: String,
}

#[derive(poem_openapi::Object)]
pub struct PeerSession {
pub id: i32,
/// u64 cause wrong parse in js, so we convert it to string
pub session: String,
pub peer_id: i32,
pub peer: String,
pub created_at: u64,
pub joined_at: u64,
pub leaved_at: Option<u64>,
}

#[derive(poem_openapi::Object)]
pub struct PeerInfo {
pub id: i32,
pub room_id: i32,
pub room: String,
pub peer: String,
pub created_at: u64,
pub sessions: Vec<PeerSession>,
}

#[derive(poem_openapi::Object)]
pub struct SessionInfo {
/// u64 cause wrong parse in js, so we convert it to string
pub id: String,
pub ip: Option<String>,
pub user_agent: Option<String>,
pub sdk: Option<String>,
pub created_at: u64,
pub sessions: Vec<PeerSession>,
}

#[derive(poem_openapi::Object)]
pub struct EventInfo {
pub id: i32,
/// u64 cause wrong parse in js, so we convert it to string
pub session: String,
pub node: u32,
pub node_ts: u64,
pub created_at: u64,
pub event: String,
pub meta: Option<String>,
}

pub struct Apis;

#[OpenApi]
impl Apis {
/// get rooms
#[oai(path = "/:node/log/rooms", method = "get")]
async fn rooms(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>, Path(node): Path<u32>, Query(page): Query<u32>, Query(limit): Query<u32>) -> Json<Response<Vec<RoomInfo>>> {
match ctx.connector.rooms(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
status: true,
error: None,
data: Some(res.rooms.into_iter().map(|e| RoomInfo { id: e.id, room: e.room }).collect::<Vec<_>>()),
}),
None => Json(Response {
status: false,
error: Some("CLUSTER_ERROR".to_string()),
data: None,
}),
}
}

/// get peers
#[oai(path = "/:node/log/peers", method = "get")]
async fn peers(
&self,
_auth: ConsoleAuthorization,
Data(ctx): Data<&ConsoleApisCtx>,
Path(node): Path<u32>,
Query(room): Query<Option<i32>>,
Query(page): Query<u32>,
Query(limit): Query<u32>,
) -> Json<Response<Vec<PeerInfo>>> {
match ctx.connector.peers(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetPeerParams { room, page, limit }).await {
Some(res) => Json(Response {
status: true,
error: None,
data: Some(
res.peers
.into_iter()
.map(|p| PeerInfo {
id: p.id,
room_id: p.room_id,
room: p.room,
peer: p.peer,
created_at: p.created_at,
sessions: p
.sessions
.into_iter()
.map(|s| PeerSession {
id: s.id,
session: s.session.to_string(),
peer_id: s.peer_id,
peer: s.peer,
created_at: s.created_at,
joined_at: s.joined_at,
leaved_at: s.leaved_at,
})
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>(),
),
}),
None => Json(Response {
status: false,
error: Some("CLUSTER_ERROR".to_string()),
data: None,
}),
}
}

/// get peers
#[oai(path = "/:node/log/sessions", method = "get")]
async fn sessions(
&self,
_auth: ConsoleAuthorization,
Data(ctx): Data<&ConsoleApisCtx>,
Path(node): Path<u32>,
Query(page): Query<u32>,
Query(limit): Query<u32>,
) -> Json<Response<Vec<SessionInfo>>> {
match ctx.connector.sessions(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
status: true,
error: None,
data: Some(
res.sessions
.into_iter()
.map(|p| SessionInfo {
id: p.id.to_string(),
ip: p.ip,
user_agent: p.user_agent,
sdk: p.sdk,
created_at: p.created_at,
sessions: p
.peers
.into_iter()
.map(|s| PeerSession {
id: s.id,
session: s.session.to_string(),
peer_id: s.peer_id,
peer: s.peer,
created_at: s.created_at,
joined_at: s.joined_at,
leaved_at: s.leaved_at,
})
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>(),
),
}),
None => Json(Response {
status: false,
error: Some("CLUSTER_ERROR".to_string()),
data: None,
}),
}
}

/// get events
#[oai(path = "/:node/log/events", method = "get")]
async fn events(
&self,
_auth: ConsoleAuthorization,
Data(ctx): Data<&ConsoleApisCtx>,
Path(node): Path<u32>,
Query(session): Query<Option<u64>>,
Query(start_ts): Query<Option<u64>>,
Query(end_ts): Query<Option<u64>>,
Query(page): Query<u32>,
Query(limit): Query<u32>,
) -> Json<Response<Vec<EventInfo>>> {
match ctx
.connector
.events(
node_vnet_addr(node, CONNECTOR_RPC_PORT),
GetEventParams {
session,
start_ts,
end_ts,
page,
limit,
},
)
.await
{
Some(res) => Json(Response {
status: true,
error: None,
data: Some(
res.events
.into_iter()
.map(|e| EventInfo {
id: e.id,
session: e.session.to_string(),
node: e.node,
node_ts: e.node_ts,
created_at: e.created_at,
event: e.event,
meta: e.meta,
})
.collect::<Vec<_>>(),
),
}),
None => Json(Response {
status: false,
error: Some("CLUSTER_ERROR".to_string()),
data: None,
}),
}
}
}
Loading

0 comments on commit ec28ecf

Please sign in to comment.