Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MOQT clientのSubscribe OKメッセージ送信対応 #58

Merged
merged 16 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ init().then(async () => {
console.log(client.id, client)
console.log('URL:', client.url())

// TODO: Move track management to lib.rs
const announcedTrackNamespaces = []

const ary = new Uint8Array([1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233])
client.array_buffer_sample_method(ary)
client.array_buffer_sample_method(ary)
Expand All @@ -27,6 +30,14 @@ init().then(async () => {
client.onSubscribe(async (subscribeResponse) => {
console.log('relay will want to subscribe')
console.log({ subscribeResponse })

// TODO: Move error handling to lib.rs
if (announcedTrackNamespace.includes(subscribeResponse.track_namespace)) {
client.sendSubscribeOkMessage(subscribeResponse.track_namespace, subscribeResponse.track_name, 0n, 0n)
console.log('send subscribe ok')
} else {
// TODO: Send subscribe error message
}
})

client.onSubscribeResponse(async (subscribeResponse) => {
Expand Down Expand Up @@ -66,6 +77,8 @@ init().then(async () => {
break
case 'announce':
await client.sendAnnounceMessage(trackNamespace, 1, authInfo)
// TODO: Move track management to lib.rs
announcedTrackNamespaces.push(trackNamespace)
break
case 'unannounce':
await client.sendUnannounceMessage(trackNamespace)
Expand Down
97 changes: 87 additions & 10 deletions moqt-client-sample/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl MOQTClient {
#[wasm_bindgen(js_name = sendAnnounceMessage)]
pub async fn send_announce_message(
&self,
track_name_space: String,
track_namespace: String,
number_of_parameters: u8,
auth_info: String, // param[0]
) -> Result<JsValue, JsValue> {
Expand All @@ -154,7 +154,7 @@ impl MOQTClient {
VersionSpecificParameter::AuthorizationInfo(AuthorizationInfo::new(auth_info));

let announce_message = AnnounceMessage::new(
track_name_space,
track_namespace,
number_of_parameters,
vec![auth_info_parameter],
);
Expand All @@ -181,13 +181,13 @@ impl MOQTClient {
#[wasm_bindgen(js_name = sendUnannounceMessage)]
pub async fn send_unannounce_message(
&self,
track_name_space: String,
track_namespace: String,
) -> Result<JsValue, JsValue> {
if let Some(writer) = &*self.control_stream_writer.borrow() {
// TODO: construct UnAnnounce Message
let mut buf = Vec::new();
buf.put_u8(0x09); // unannounce
buf.extend(write_variable_bytes(&track_name_space.as_bytes().to_vec()));
buf.extend(write_variable_bytes(&track_namespace.as_bytes().to_vec()));

let buffer = js_sys::Uint8Array::new_with_length(buf.len() as u32);
buffer.copy_from(&buf);
Expand All @@ -202,7 +202,7 @@ impl MOQTClient {
#[wasm_bindgen(js_name = sendSubscribeMessage)]
pub async fn send_subscribe_message(
&self,
track_name_space: String,
track_namespace: String,
track_name: String,
// start_group: Option<String>,
// start_object: Option<String>,
Expand All @@ -217,7 +217,7 @@ impl MOQTClient {
let version_specific_parameters = vec![auth_info];
let subscribe_message =
moqt_core::messages::subscribe_request_message::SubscribeRequestMessage::new(
track_name_space,
track_namespace,
track_name,
moqt_core::messages::subscribe_request_message::Location::RelativePrevious(0),
moqt_core::messages::subscribe_request_message::Location::Absolute(0),
Expand All @@ -243,16 +243,83 @@ impl MOQTClient {
}
}

#[wasm_bindgen(js_name = sendSubscribeOkMessage)]
pub async fn send_subscribe_ok_message(
&self,
track_namespace: String,
track_name: String,
track_id: u64,
expires: u64,
) -> Result<JsValue, JsValue> {
if let Some(writer) = &*self.control_stream_writer.borrow() {
let subscribe_ok_message = moqt_core::messages::subscribe_ok_message::SubscribeOk::new(
track_namespace,
track_name,
track_id,
expires,
);
let mut subscribe_ok_message_buf = BytesMut::new();
subscribe_ok_message.packetize(&mut subscribe_ok_message_buf);

let mut buf = Vec::new();
buf.extend(write_variable_integer(
u8::from(MessageType::SubscribeOk) as u64
)); // subscribe ok
buf.extend(subscribe_ok_message_buf);

let buffer = js_sys::Uint8Array::new_with_length(buf.len() as u32);
buffer.copy_from(&buf);

JsFuture::from(writer.write_with_chunk(&buffer)).await
} else {
Err(JsValue::from_str("control_stream_writer is None"))
}
}

#[wasm_bindgen(js_name = sendSubscribeErrorMessage)]
pub async fn send_subscribe_error_message(
&self,
track_namespace: String,
track_name: String,
error_code: u64,
reason_phrase: String,
) -> Result<JsValue, JsValue> {
if let Some(writer) = &*self.control_stream_writer.borrow() {
let subscribe_error_message =
moqt_core::messages::subscribe_error_message::SubscribeError::new(
track_namespace,
track_name,
error_code,
reason_phrase,
);
let mut subscribe_error_message_buf = BytesMut::new();
subscribe_error_message.packetize(&mut subscribe_error_message_buf);

let mut buf = Vec::new();
buf.extend(write_variable_integer(
u8::from(MessageType::SubscribeError) as u64,
)); // subscribe error
buf.extend(subscribe_error_message_buf);

let buffer = js_sys::Uint8Array::new_with_length(buf.len() as u32);
buffer.copy_from(&buf);

JsFuture::from(writer.write_with_chunk(&buffer)).await
} else {
Err(JsValue::from_str("control_stream_writer is None"))
}
}

#[wasm_bindgen(js_name = sendUnsubscribeMessage)]
pub async fn send_unsubscribe_message(
&self,
track_name_space: String,
track_namespace: String,
track_name: String,
) -> Result<JsValue, JsValue> {
if let Some(writer) = &*self.control_stream_writer.borrow() {
let unsubscribe_message =
moqt_core::messages::unsubscribe_message::UnsubscribeMessage::new(
track_name_space,
track_namespace,
track_name,
);
let mut unsubscribe_message_buf = BytesMut::new();
Expand Down Expand Up @@ -569,6 +636,18 @@ async fn message_handler(
callback.call1(&JsValue::null(), &(v)).unwrap();
}
}
MessageType::Subscribe => {
let subscribe_message =
moqt_core::messages::subscribe_request_message::SubscribeRequestMessage::depacketize(
&mut buf,
)?;

let v = serde_wasm_bindgen::to_value(&subscribe_message).unwrap();

if let Some(callback) = callbacks.borrow().subscribe_callback() {
callback.call1(&JsValue::null(), &(v)).unwrap();
}
}
MessageType::SubscribeOk => {
let subscribe_ok_message =
moqt_core::messages::subscribe_ok_message::SubscribeOk::depacketize(
Expand Down Expand Up @@ -668,8 +747,6 @@ impl MOQTCallbacks {
self.announce_callback = Some(callback);
}

// 未実装のためallow dead codeをつけている
#[allow(dead_code)]
pub fn subscribe_callback(&self) -> Option<js_sys::Function> {
self.subscribe_callback.clone()
}
Expand Down
5 changes: 3 additions & 2 deletions moqt-core/src/modules/messages/subscribe_request_message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{bail, Context};
use serde::Serialize;
use std::any::Any;
use tracing;

Expand All @@ -12,7 +13,7 @@ use crate::{

use super::{moqt_payload::MOQTPayload, version_specific_parameters::VersionSpecificParameter};

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SubscribeRequestMessage {
track_namespace: String,
track_name: String,
Expand Down Expand Up @@ -121,7 +122,7 @@ impl MOQTPayload for SubscribeRequestMessage {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub enum Location {
None, // 0x00
Absolute(u64), // 0x01
Expand Down
11 changes: 6 additions & 5 deletions moqt-core/src/modules/messages/version_specific_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use crate::{
};
use anyhow::Ok;
use num_enum::TryFromPrimitive;
use serde::Serialize;
use std::any::Any;

/// This structure is a parameter that uses a version-specific namespace, unlike Setup parameters,
/// which uses a namespace that is constant across all MoQ Transport versions.
///
/// This structure is referred by messages using parameters other than Setup parameters.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub enum VersionSpecificParameter {
GroupSequence(GroupSequence),
ObjectSequence(ObjectSequence),
Expand Down Expand Up @@ -102,7 +103,7 @@ impl MOQTPayload for VersionSpecificParameter {
}
}

#[derive(Debug, Clone, Copy, TryFromPrimitive, PartialEq)]
#[derive(Debug, Serialize, Clone, Copy, TryFromPrimitive, PartialEq)]
#[repr(u8)]
pub enum VersionSpecificParameterType {
GroupSequence = 0x00,
Expand All @@ -116,7 +117,7 @@ impl From<VersionSpecificParameterType> for u64 {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct GroupSequence {
parameter_type: VersionSpecificParameterType,
length: u8,
Expand All @@ -136,7 +137,7 @@ impl GroupSequence {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ObjectSequence {
parameter_type: VersionSpecificParameterType,
length: u8,
Expand All @@ -156,7 +157,7 @@ impl ObjectSequence {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct AuthorizationInfo {
parameter_type: VersionSpecificParameterType,
length: u8,
Expand Down