Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix build warnings and clippy warnings #328

Merged
merged 6 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"bin",
"packages/protocol",
Expand All @@ -12,7 +13,7 @@ members = [
]

[workspace.dependencies]
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" }
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" , default-features = false}
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion bin/src/http/api_console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@

async fn api_checker(req: &Request, api_key: ApiKey) -> Option<()> {
let data = req.data::<ConsoleApisCtx>()?;
data.secure.validate_token(&api_key.key).then(|| ())
data.secure.validate_token(&api_key.key).then_some(())

Check warning on line 31 in bin/src/http/api_console.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console.rs#L31

Added line #L31 was not covered by tests
}
1 change: 1 addition & 0 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Apis {
}

/// get events
#[allow(clippy::too_many_arguments)]
#[oai(path = "/:node/log/events", method = "get")]
async fn events(
&self,
Expand Down
4 changes: 2 additions & 2 deletions bin/src/quinn/vnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::vsocket::VirtualUdpSocket;

#[derive(Debug)]
pub struct NetworkPkt {
pub local_port: u16,
pub _local_port: u16,
pub remote: NodeId,
pub remote_port: u16,
pub data: Buffer,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl VirtualNetwork {
let event = event?;
match event {
socket::Event::RecvFrom(local_port, remote, remote_port, data, meta) => {
let pkt = NetworkPkt { data, local_port, remote, remote_port, meta };
let pkt = NetworkPkt { data, _local_port: local_port, remote, remote_port, meta };
if let Some(socket_tx) = self.sockets.get(&local_port) {
if let Err(e) = socket_tx.try_send(pkt) {
log::error!("Send to socket {} error {:?}", local_port, e);
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let mut connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);
let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);

Check warning on line 52 in bin/src/server/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector.rs#L52

Added line #L52 was not covered by tests

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
Expand Down
8 changes: 4 additions & 4 deletions bin/src/server/console/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Storage {
ClusterNodeInfo::Console(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.consoles.insert(
node,
ConsoleContainer {
Expand All @@ -165,7 +165,7 @@ impl Storage {
ClusterNodeInfo::Gateway(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on gateway ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.lat = info.lat;
zone.lon = info.lon;
zone.gateways.insert(
Expand All @@ -181,7 +181,7 @@ impl Storage {
ClusterNodeInfo::Media(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on media ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.medias.insert(
node,
MediaContainer {
Expand All @@ -195,7 +195,7 @@ impl Storage {
ClusterNodeInfo::Connector(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on connector ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.connectors.insert(
node,
ConnectorContainer {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res),
},
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::agent_service::Event::Stats { queue, inflight, acked } => {}
media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {}

Check warning on line 224 in bin/src/server/gateway.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway.rs#L224

Added line #L224 was not covered by tests
},
SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => {
if let Err(e) = vnet_tx.try_send(event) {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
if let Some(metrics) = node_metrics_collector.pop_measure() {
controller.send_to(
0, //because sdn controller allway is run inside worker 0
ExtIn::NodeStats(metrics).into(),
ExtIn::NodeStats(metrics),

Check warning on line 148 in bin/src/server/media.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/media.rs#L148

Added line #L148 was not covered by tests
);
}
while let Ok(control) = vnet_rx.try_recv() {
Expand Down
3 changes: 1 addition & 2 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use media_server_protocol::transport::{RpcReq, RpcRes};
use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, UserData, SC, SE, TC, TW};
use media_server_secure::MediaEdgeSecure;
use rand::random;
use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput};

use crate::NodeConfig;
Expand Down Expand Up @@ -58,7 +57,7 @@
let worker = MediaServerWorker::new(
index,
cfg.node.node_id,
random(),
cfg.session,

Check warning on line 60 in bin/src/server/media/runtime_worker.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/media/runtime_worker.rs#L60

Added line #L60 was not covered by tests
dhilipsiva marked this conversation as resolved.
Show resolved Hide resolved
&cfg.node.secret,
cfg.controller,
cfg.node.udp_port,
Expand Down
4 changes: 3 additions & 1 deletion packages/audio_mixer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ impl<Src: Debug + Clone + Eq + Hash> AudioMixer<Src> {
for (i, slot) in self.outputs.iter().enumerate() {
if let Some(OutputSlotState { audio_level, source }) = slot {
if let Some((_, _, lowest_slot_audio_level)) = &mut lowest {
if *audio_level < *lowest_slot_audio_level || (*audio_level == *lowest_slot_audio_level) {
dhilipsiva marked this conversation as resolved.
Show resolved Hide resolved
// TODO: We need to process some case we have same audio_level. Just check with smaller only:
// https://github.com/8xFF/atm0s-media-server/pull/328#discussion_r1667336073
if *audio_level <= *lowest_slot_audio_level {
lowest = Some((i, source.clone(), *audio_level));
}
} else {
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/agent_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}

Check warning on line 39 in packages/media_connector/src/agent_service.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_connector/src/agent_service.rs#L37-L39

Added lines #L37 - L39 were not covered by tests
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -159,6 +165,12 @@
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}

Check warning on line 171 in packages/media_connector/src/agent_service.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_connector/src/agent_service.rs#L169-L171

Added lines #L169 - L171 were not covered by tests
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/handler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}

Check warning on line 43 in packages/media_connector/src/handler_service.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_connector/src/handler_service.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -152,6 +158,12 @@
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}

Check warning on line 164 in packages/media_connector/src/handler_service.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_connector/src/handler_service.rs#L162-L164

Added lines #L162 - L164 were not covered by tests
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
4 changes: 2 additions & 2 deletions packages/media_connector/src/msg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ impl<M: Message, const MAX_INFLIGHT: usize> MessageQueue<M, MAX_INFLIGHT> {

pub fn pop(&mut self, now_ms: u64) -> Option<&M> {
if let Some(msg_id) = self.pop_retry_msg_id(now_ms) {
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
return Some(self.inflight.get(&msg_id).expect("should exist retry_msg_id"));
}

if self.inflight.len() < MAX_INFLIGHT {
let msg = self.queue.pop_front()?;
let msg_id = msg.msg_id();
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
self.inflight.insert(msg_id, msg);
self.inflight.get(&msg_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl MigrationTrait for Migration {
enum Room {
Table,
Id,
#[allow(clippy::enum_variant_names)]
Room,
CreatedAt,
}
Expand All @@ -130,6 +131,7 @@ enum Peer {
Table,
Id,
Room,
#[allow(clippy::enum_variant_names)]
Peer,
CreatedAt,
}
Expand Down Expand Up @@ -165,6 +167,7 @@ enum Event {
NodeTs,
Session,
CreatedAt,
#[allow(clippy::enum_variant_names)]
Event,
Meta,
}
4 changes: 3 additions & 1 deletion packages/media_core/src/cluster/room/audio_mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum Output<Endpoint> {
OnResourceEmpty,
}

type AudioMixerManuals<T> = TaskSwitcherBranch<TaskGroup<manual::Input, Output<T>, ManualMixer<T>, 4>, (usize, Output<T>)>;

pub struct AudioMixer<Endpoint: Clone> {
room: ClusterRoomHash,
mix_channel_id: ChannelId,
Expand All @@ -64,7 +66,7 @@ pub struct AudioMixer<Endpoint: Clone> {
subscriber1: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 1>, Output<Endpoint>>,
subscriber2: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 2>, Output<Endpoint>>,
subscriber3: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 3>, Output<Endpoint>>,
manuals: TaskSwitcherBranch<TaskGroup<manual::Input, Output<Endpoint>, ManualMixer<Endpoint>, 4>, (usize, Output<Endpoint>)>,
manuals: AudioMixerManuals<Endpoint>,
switcher: TaskSwitcher,
last_tick: Instant,
}
Expand Down
8 changes: 4 additions & 4 deletions packages/media_core/src/cluster/room/audio_mixer/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn attach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if !self.sources.contains_key(&channel_id) {
if let std::collections::hash_map::Entry::Vacant(e) = self.sources.entry(channel_id) {
log::info!("[ClusterManualMixer] add source {:?} => sub {channel_id}", source);
self.sources.insert(channel_id, source);
e.insert(source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::SubAuto)));
}
}
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn detach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if let Some(_) = self.sources.remove(&channel_id) {
if self.sources.remove(&channel_id).is_some() {
log::info!("[ClusterManualMixer] remove source {:?} => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<Endpoint: Clone> Task<Input, Output<Endpoint>> for ManualMixer<Endpoint> {
Input::LeaveRoom => {
// We need manual release sources because it is from client request,
// we cannot ensure client will release it before it disconnect.
let sources = std::mem::replace(&mut self.sources, Default::default());
let sources = std::mem::take(&mut self.sources);
for (channel_id, source) in sources {
log::info!("[ClusterManualMixer] remove source {:?} on queue => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
Expand Down
4 changes: 3 additions & 1 deletion packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ pub enum InternalOutput {
Destroy,
}

type EndpointInternalWaitJoin = Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>;

pub struct EndpointInternal {
cfg: EndpointCfg,
state: Option<(Instant, TransportState)>,
wait_join: Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>,
wait_join: EndpointInternalWaitJoin,
joined: Option<(ClusterRoomHash, RoomId, PeerId, Option<AudioMixerMode>)>,
local_tracks_id: Small2dMap<LocalTrackId, usize>,
remote_tracks_id: Small2dMap<RemoteTrackId, usize>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{cmp::Ordering, collections::VecDeque};

use media_server_protocol::media::{MediaLayersBitrate, MediaMeta, MediaPacket};

Expand Down Expand Up @@ -53,36 +53,42 @@
if let MediaMeta::H264 { key, profile: _, sim: Some(_sim) } = &mut pkt.meta {
match (self.current, self.target) {
(Some(current), Some(target)) => {
if target < current {
//down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
match target.cmp(&current) {
Ordering::Less => {
// down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
} else if target > current {
//up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else if !*key {
self.queue.push_back(Action::RequestKeyFrame);
Ordering::Greater => {
// up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
Ordering::Equal => {
// target is equal to current, handle if needed

Check warning on line 80 in packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs#L79-L80

Added lines #L79 - L80 were not covered by tests
}
}
}
(Some(current), None) => {
//need pause
//TODO wait current frame finished for avoiding interrupt client
// need pause
// TODO: wait current frame finished for avoiding interrupt client

Check warning on line 86 in packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs#L85-L86

Added lines #L85 - L86 were not covered by tests
log::info!("[H264SimSelector] pause from {}", current);
self.current = None;
}
(None, Some(target)) => {
//need resume or start => need wait key_frame
// need resume or start => need wait key-frame
if *key {
log::info!("[H264SimSelector] resume to {} with key", target);
// with other spatial we have difference tl0xidx and pic_id offset
Expand All @@ -91,7 +97,7 @@
}
}
(None, None) => {
//reject
// reject

Check warning on line 100 in packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs#L100

Added line #L100 was not covered by tests
}
}
}
Expand Down
Loading
Loading