diff --git a/Cargo.toml b/Cargo.toml index 753a4c9..e67e01f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ rand = "0.8" [dev-dependencies] env_logger = "0.10.0" +tempfile = "3.8.0" [features] python = ["pyo3", "pyo3-log"] diff --git a/assets/xsd/FLUTE-FDT-3GPP-2005-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2005-Extensions.xsd new file mode 100644 index 0000000..6373d41 --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2005-Extensions.xsd @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-2007-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2007-Extensions.xsd new file mode 100644 index 0000000..8d67301 --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2007-Extensions.xsd @@ -0,0 +1,17 @@ + + + + + + + + + + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-2008-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2008-Extensions.xsd new file mode 100644 index 0000000..66f090b --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2008-Extensions.xsd @@ -0,0 +1,8 @@ + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-2009-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2009-Extensions.xsd new file mode 100644 index 0000000..9fccb7f --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2009-Extensions.xsd @@ -0,0 +1,7 @@ + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-2012-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2012-Extensions.xsd new file mode 100644 index 0000000..801eefd --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2012-Extensions.xsd @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-2015-Extensions.xsd b/assets/xsd/FLUTE-FDT-3GPP-2015-Extensions.xsd new file mode 100644 index 0000000..206704b --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-2015-Extensions.xsd @@ -0,0 +1,11 @@ + + + + + + + diff --git a/assets/xsd/FLUTE-FDT-3GPP-Main.xsd b/assets/xsd/FLUTE-FDT-3GPP-Main.xsd new file mode 100644 index 0000000..2368b26 --- /dev/null +++ b/assets/xsd/FLUTE-FDT-3GPP-Main.xsd @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/assets/xsd/schema-version.xsd b/assets/xsd/schema-version.xsd new file mode 100644 index 0000000..3559649 --- /dev/null +++ b/assets/xsd/schema-version.xsd @@ -0,0 +1,8 @@ + + + + + diff --git a/examples/udpreceiver.rs b/examples/udpreceiver.rs index e394904..ef4edd8 100644 --- a/examples/udpreceiver.rs +++ b/examples/udpreceiver.rs @@ -1,4 +1,7 @@ -use flute::receiver::{writer, MultiReceiver, UDPEndpoint}; +use flute::{ + core::UDPEndpoint, + receiver::{writer, MultiReceiver}, +}; use std::{net::UdpSocket, rc::Rc}; fn main() { diff --git a/examples/udpsender.rs b/examples/udpsender.rs index 65100a3..75fa094 100644 --- a/examples/udpsender.rs +++ b/examples/udpsender.rs @@ -1,10 +1,14 @@ -use flute::sender::{Cenc, ObjectDesc, Sender}; +use flute::{ + core::UDPEndpoint, + sender::{Cenc, ObjectDesc, Sender}, +}; use std::{net::UdpSocket, time::SystemTime}; fn main() { std::env::set_var("RUST_LOG", "info"); env_logger::builder().try_init().ok(); let dest = "224.0.0.1:3400"; + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 3400); let args: Vec = std::env::args().collect(); if args.len() == 1 { @@ -14,11 +18,12 @@ fn main() { } log::info!("Create UDP Socket"); + let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); log::info!("Create FLUTE Sender"); let tsi = 1; - let mut sender = Sender::new(tsi, &Default::default(), &Default::default()); + let mut sender = Sender::new(endpoint, tsi, &Default::default(), &Default::default()); log::info!("Connect to {}", dest); udp_socket.connect(dest).expect("Connection failed"); @@ -39,6 +44,7 @@ fn main() { 1, None, None, + None, Cenc::Null, true, None, diff --git a/src/common/alc.rs b/src/common/alc.rs index 35e4072..1c849a3 100644 --- a/src/common/alc.rs +++ b/src/common/alc.rs @@ -235,6 +235,18 @@ pub fn parse_payload_id(pkt: &AlcPkt, oti: &oti::Oti) -> Result { codec.get_fec_payload_id(pkt, oti) } +/// Get Inline Payload ID +pub fn get_fec_inline_payload_id(pkt: &AlcPkt) -> Result { + let fec: oti::FECEncodingID = pkt + .lct + .cp + .try_into() + .map_err(|_| FluteError::new(format!("Codepoint {} not supported", pkt.lct.cp)))?; + + let codec = ::instance(fec); + codec.get_fec_inline_payload_id(pkt) +} + fn parse_ext_fdt(ext: &[u8]) -> Result> { if ext.len() != 4 { return Err(FluteError::new("Wrong size of FDT Extension")); @@ -349,6 +361,7 @@ fn parse_sct(ext: &[u8]) -> Result> { 1 => u32::from_be_bytes(ext[8..12].as_ref().try_into().unwrap()), _ => 0, }; + let ntp: u64 = ((ntp_seconds as u64) << 32) | (ntp_faction as u64); tools::ntp_to_system_time(ntp).map(|op| Some(op)) } diff --git a/src/common/alccodec/alcnocode.rs b/src/common/alccodec/alcnocode.rs index 619e68b..a18f4c5 100644 --- a/src/common/alccodec/alcnocode.rs +++ b/src/common/alccodec/alcnocode.rs @@ -63,9 +63,7 @@ impl AlcCodec for AlcNoCode { maximum_source_block_length, encoding_symbol_length, max_number_of_parity_symbols: 0, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, }; @@ -77,6 +75,10 @@ impl AlcCodec for AlcNoCode { pkt: &alc::AlcPkt, _oti: &oti::Oti, ) -> crate::error::Result { + self.get_fec_inline_payload_id(pkt) + } + + fn get_fec_inline_payload_id(&self, pkt: &alc::AlcPkt) -> crate::error::Result { let data = &pkt.data[pkt.data_alc_header_offset..pkt.data_payload_offset]; let arr: [u8; 4] = match data.try_into() { Ok(arr) => arr, diff --git a/src/common/alccodec/alcraptor.rs b/src/common/alccodec/alcraptor.rs index 552366b..1628a5e 100644 --- a/src/common/alccodec/alcraptor.rs +++ b/src/common/alccodec/alcraptor.rs @@ -1,6 +1,10 @@ use super::AlcCodec; use crate::{ - common::{alc, lct, oti, pkt}, + common::{ + alc, lct, + oti::{self, SchemeSpecific}, + pkt, + }, error::FluteError, }; @@ -31,18 +35,19 @@ impl AlcCodec for AlcRaptor { let transfer_header: u64 = (transfer_length << 24) | (oti.encoding_symbol_length as u64 & 0xFFFF); - assert!(oti.raptor_scheme_specific.is_some()); - let raptor = oti.raptor_scheme_specific.as_ref().unwrap(); - - let padding: u16 = 0; - - data.extend(ext_header.to_be_bytes()); - data.extend(transfer_header.to_be_bytes()); - data.extend(raptor.source_blocks_length.to_be_bytes()); - data.extend(raptor.sub_blocks_length.to_be_bytes()); - data.extend(raptor.symbol_alignment.to_be_bytes()); - data.extend(padding.to_be_bytes()); - lct::inc_hdr_len(data, len); + assert!(oti.scheme_specific.is_some()); + if let SchemeSpecific::Raptor(raptor) = oti.scheme_specific.as_ref().unwrap() { + let padding: u16 = 0; + data.extend(ext_header.to_be_bytes()); + data.extend(transfer_header.to_be_bytes()); + data.extend(raptor.source_blocks_length.to_be_bytes()); + data.extend(raptor.sub_blocks_length.to_be_bytes()); + data.extend(raptor.symbol_alignment.to_be_bytes()); + data.extend(padding.to_be_bytes()); + lct::inc_hdr_len(data, len); + } else { + assert!(false); + } } fn get_fti( @@ -86,13 +91,11 @@ impl AlcCodec for AlcRaptor { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: symbol_size, max_number_of_parity_symbols: 0, // Unknown for RaptorQ - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: Some(oti::RaptorSchemeSpecific { + scheme_specific: Some(SchemeSpecific::Raptor(oti::RaptorSchemeSpecific { source_blocks_length: z, sub_blocks_length: n, symbol_alignment: al, - }), + })), inband_fti: true, }; @@ -115,6 +118,10 @@ impl AlcCodec for AlcRaptor { pkt: &alc::AlcPkt, _oti: &oti::Oti, ) -> crate::error::Result { + self.get_fec_inline_payload_id(pkt) + } + + fn get_fec_inline_payload_id(&self, pkt: &alc::AlcPkt) -> crate::error::Result { let data = &pkt.data[pkt.data_alc_header_offset..pkt.data_payload_offset]; let arr: [u8; 4] = match data.try_into() { Ok(arr) => arr, diff --git a/src/common/alccodec/alcraptorq.rs b/src/common/alccodec/alcraptorq.rs index eb9d5bd..db73e5c 100644 --- a/src/common/alccodec/alcraptorq.rs +++ b/src/common/alccodec/alcraptorq.rs @@ -1,6 +1,10 @@ use super::AlcCodec; use crate::{ - common::{alc, lct, oti, pkt}, + common::{ + alc, lct, + oti::{self, SchemeSpecific}, + pkt, + }, error::FluteError, }; @@ -31,18 +35,19 @@ impl AlcCodec for AlcRaptorQ { let transfer_header: u64 = (transfer_length << 24) | (oti.encoding_symbol_length as u64 & 0xFFFF); - assert!(oti.raptorq_scheme_specific.is_some()); - let raptorq = oti.raptorq_scheme_specific.as_ref().unwrap(); - - let padding: u16 = 0; - - data.extend(ext_header.to_be_bytes()); - data.extend(transfer_header.to_be_bytes()); - data.push(raptorq.source_blocks_length); - data.extend(raptorq.sub_blocks_length.to_be_bytes()); - data.push(raptorq.symbol_alignment); - data.extend(padding.to_be_bytes()); - lct::inc_hdr_len(data, len); + assert!(oti.scheme_specific.is_some()); + if let SchemeSpecific::RaptorQ(raptorq) = oti.scheme_specific.as_ref().unwrap() { + let padding: u16 = 0; + data.extend(ext_header.to_be_bytes()); + data.extend(transfer_header.to_be_bytes()); + data.push(raptorq.source_blocks_length); + data.extend(raptorq.sub_blocks_length.to_be_bytes()); + data.push(raptorq.symbol_alignment); + data.extend(padding.to_be_bytes()); + lct::inc_hdr_len(data, len); + } else { + assert!(false); + } } fn get_fti( @@ -94,13 +99,11 @@ impl AlcCodec for AlcRaptorQ { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: symbol_size, max_number_of_parity_symbols: 0, // Unknown for RaptorQ - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: Some(oti::RaptorQSchemeSpecific { + scheme_specific: Some(SchemeSpecific::RaptorQ(oti::RaptorQSchemeSpecific { source_blocks_length: z, sub_blocks_length: n, symbol_alignment: al, - }), - raptor_scheme_specific: None, + })), inband_fti: true, }; @@ -125,6 +128,11 @@ impl AlcCodec for AlcRaptorQ { pkt: &alc::AlcPkt, _oti: &oti::Oti, ) -> crate::error::Result { + self.get_fec_inline_payload_id(pkt) + } + + fn get_fec_inline_payload_id(&self, pkt: &alc::AlcPkt) -> crate::error::Result { + let data = &pkt.data[pkt.data_alc_header_offset..pkt.data_payload_offset]; let arr: [u8; 4] = match data.try_into() { Ok(arr) => arr, diff --git a/src/common/alccodec/alcrs28.rs b/src/common/alccodec/alcrs28.rs index 542e5d1..8c464a0 100644 --- a/src/common/alccodec/alcrs28.rs +++ b/src/common/alccodec/alcrs28.rs @@ -61,9 +61,7 @@ impl AlcCodec for AlcRS28 { encoding_symbol_length, max_number_of_parity_symbols: num_encoding_symbols as u32 - maximum_source_block_length as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, }; @@ -89,6 +87,10 @@ impl AlcCodec for AlcRS28 { pkt: &alc::AlcPkt, _oti: &oti::Oti, ) -> crate::error::Result { + self.get_fec_inline_payload_id(pkt) + } + + fn get_fec_inline_payload_id(&self, pkt: &alc::AlcPkt) -> crate::error::Result { let data = &pkt.data[pkt.data_alc_header_offset..pkt.data_payload_offset]; let arr: [u8; 4] = match data.try_into() { Ok(arr) => arr, diff --git a/src/common/alccodec/alcrs28underspecified.rs b/src/common/alccodec/alcrs28underspecified.rs index b67fefc..81a8e59 100644 --- a/src/common/alccodec/alcrs28underspecified.rs +++ b/src/common/alccodec/alcrs28underspecified.rs @@ -67,9 +67,7 @@ impl AlcCodec for AlcRS28UnderSpecified { encoding_symbol_length, max_number_of_parity_symbols: num_encoding_symbols as u32 - maximum_source_block_length as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, }; @@ -99,6 +97,10 @@ impl AlcCodec for AlcRS28UnderSpecified { pkt: &alc::AlcPkt, _oti: &oti::Oti, ) -> crate::error::Result { + self.get_fec_inline_payload_id(pkt) + } + + fn get_fec_inline_payload_id(&self, pkt: &alc::AlcPkt) -> crate::error::Result { let data = &pkt.data[pkt.data_alc_header_offset..pkt.data_payload_offset]; let arr: [u8; 8] = match data.try_into() { Ok(arr) => arr, diff --git a/src/common/alccodec/alcrs2m.rs b/src/common/alccodec/alcrs2m.rs index 9994d51..bd62a2c 100644 --- a/src/common/alccodec/alcrs2m.rs +++ b/src/common/alccodec/alcrs2m.rs @@ -2,7 +2,7 @@ use super::AlcCodec; use crate::{ common::{ alc, lct, - oti::{self, ReedSolomonGF2MSchemeSpecific}, + oti::{self, ReedSolomonGF2MSchemeSpecific, SchemeSpecific}, pkt, }, error::FluteError, @@ -24,20 +24,24 @@ impl AlcCodec for AlcRS2m { | Max Source Block Length (B) | Max Nb Enc. Symbols (max_n) | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+*/ - let scheme_specific = oti.reed_solomon_scheme_specific.clone().unwrap_or_default(); - let ext_header_l: u64 = - (lct::Ext::Fti as u64) << 56 | 4u64 << 48 | transfer_length & 0xFFFFFFFFFFFF; - - let b = oti.maximum_source_block_length as u16; - let max_n = (oti.max_number_of_parity_symbols + oti.maximum_source_block_length) as u16; - - data.extend(ext_header_l.to_be_bytes()); - data.push(scheme_specific.m); - data.push(scheme_specific.g); - data.extend(oti.encoding_symbol_length.to_be_bytes()); - data.extend(b.to_be_bytes()); - data.extend(max_n.to_be_bytes()); - lct::inc_hdr_len(data, 4); + if let SchemeSpecific::ReedSolomon(scheme_specific) = oti.scheme_specific.as_ref().unwrap() + { + let ext_header_l: u64 = + (lct::Ext::Fti as u64) << 56 | 4u64 << 48 | transfer_length & 0xFFFFFFFFFFFF; + + let b = oti.maximum_source_block_length as u16; + let max_n = (oti.max_number_of_parity_symbols + oti.maximum_source_block_length) as u16; + + data.extend(ext_header_l.to_be_bytes()); + data.push(scheme_specific.m); + data.push(scheme_specific.g); + data.extend(oti.encoding_symbol_length.to_be_bytes()); + data.extend(b.to_be_bytes()); + data.extend(max_n.to_be_bytes()); + lct::inc_hdr_len(data, 4); + } else { + assert!(false); + } } fn get_fti( @@ -71,7 +75,7 @@ impl AlcCodec for AlcRS2m { maximum_source_block_length: b as u32, encoding_symbol_length, max_number_of_parity_symbols: max_n as u32 - b as u32, - reed_solomon_scheme_specific: Some(ReedSolomonGF2MSchemeSpecific { + scheme_specific: Some(SchemeSpecific::ReedSolomon(ReedSolomonGF2MSchemeSpecific { g: match g { 0 => 1, g => g, @@ -80,9 +84,7 @@ impl AlcCodec for AlcRS2m { 0 => 8, m => m, }, - }), - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + })), inband_fti: true, }; @@ -91,9 +93,12 @@ impl AlcCodec for AlcRS2m { fn add_fec_payload_id(&self, data: &mut Vec, oti: &oti::Oti, pkt: &pkt::Pkt) { let m = oti - .reed_solomon_scheme_specific + .scheme_specific .as_ref() - .map(|f| f.m) + .map(|f| match f { + SchemeSpecific::ReedSolomon(s) => s.m, + _ => 8, + }) .unwrap_or(8); let sbn = pkt.sbn; @@ -126,9 +131,12 @@ impl AlcCodec for AlcRS2m { let payload_id_header = u32::from_be_bytes(arr); let m = oti - .reed_solomon_scheme_specific + .scheme_specific .as_ref() - .map(|f| f.m) + .map(|f| match f { + SchemeSpecific::ReedSolomon(s) => s.m, + _ => 8, + }) .unwrap_or(8); let sbn = payload_id_header >> m; @@ -142,6 +150,13 @@ impl AlcCodec for AlcRS2m { }) } + fn get_fec_inline_payload_id( + &self, + _pkt: &alc::AlcPkt, + ) -> crate::error::Result { + return Err(FluteError::new("not supported")); + } + fn fec_payload_id_block_length(&self) -> usize { 4 } diff --git a/src/common/alccodec/mod.rs b/src/common/alccodec/mod.rs index 78ba9f8..2545892 100644 --- a/src/common/alccodec/mod.rs +++ b/src/common/alccodec/mod.rs @@ -16,6 +16,7 @@ pub trait AlcCodec { fn get_fti(&self, data: &[u8], lct_header: &lct::LCTHeader) -> Result>; fn add_fec_payload_id(&self, data: &mut Vec, oti: &oti::Oti, pkt: &pkt::Pkt); fn get_fec_payload_id(&self, pkt: &AlcPkt, oti: &oti::Oti) -> Result; + fn get_fec_inline_payload_id(&self, pkt: &AlcPkt) -> Result; fn fec_payload_id_block_length(&self) -> usize; } diff --git a/src/common/fdtinstance.rs b/src/common/fdtinstance.rs index 6414b6a..5d5023d 100644 --- a/src/common/fdtinstance.rs +++ b/src/common/fdtinstance.rs @@ -16,6 +16,7 @@ use opentelemetry::{ use super::oti::{ self, RaptorQSchemeSpecific, RaptorSchemeSpecific, ReedSolomonGF2MSchemeSpecific, + SchemeSpecific, }; fn xmlns_mbms_2005(os: &Option, serializer: S) -> std::result::Result @@ -215,13 +216,17 @@ pub struct FdtInstance { #[serde( rename = "sv:delimiter", + alias = "delimiter", skip_serializing_if = "Option::is_none", skip_deserializing )] #[serde(alias = "delimiter")] pub delimiter: Option, - #[serde(rename = "Group", skip_serializing_if = "Option::is_none")] + #[serde( + rename = "Group", + skip_serializing_if = "Option::is_none" + )] pub group: Option>, #[serde( @@ -383,36 +388,36 @@ pub struct File { fn reed_solomon_scheme_specific( fec_oti_scheme_specific_info: &Option, -) -> Result> { +) -> Result> { if fec_oti_scheme_specific_info.is_none() { return Ok(None); } let scheme = ReedSolomonGF2MSchemeSpecific::decode(fec_oti_scheme_specific_info.as_ref().unwrap())?; - Ok(Some(scheme)) + Ok(Some(SchemeSpecific::ReedSolomon(scheme))) } fn raptorq_scheme_specific( fec_oti_scheme_specific_info: &Option, -) -> Result> { +) -> Result> { if fec_oti_scheme_specific_info.is_none() { return Ok(None); } let scheme = RaptorQSchemeSpecific::decode(fec_oti_scheme_specific_info.as_ref().unwrap())?; - Ok(Some(scheme)) + Ok(Some(SchemeSpecific::RaptorQ(scheme))) } fn raptor_scheme_specific( fec_oti_scheme_specific_info: &Option, -) -> Result> { +) -> Result> { if fec_oti_scheme_specific_info.is_none() { return Ok(None); } let scheme = RaptorSchemeSpecific::decode(fec_oti_scheme_specific_info.as_ref().unwrap())?; - Ok(Some(scheme)) + Ok(Some(SchemeSpecific::Raptor(scheme))) } impl FdtInstance { @@ -474,21 +479,13 @@ impl FdtInstance { let fec_encoding_id: oti::FECEncodingID = self.fec_oti_fec_encoding_id.unwrap().try_into().ok()?; - let reed_solomon_scheme_specific = match fec_encoding_id { + let scheme_specific = match fec_encoding_id { oti::FECEncodingID::ReedSolomonGF2M => { reed_solomon_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } - _ => None, - }; - - let raptorq_scheme_specific = match fec_encoding_id { oti::FECEncodingID::RaptorQ => { raptorq_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } - _ => None, - }; - - let raptor_scheme_specific = match fec_encoding_id { oti::FECEncodingID::Raptor => { raptor_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } @@ -507,9 +504,7 @@ impl FdtInstance { max_number_of_parity_symbols: (fec_oti_max_number_of_encoding_symbols - self.fec_oti_maximum_source_block_length.unwrap()) as u32, - reed_solomon_scheme_specific, - raptorq_scheme_specific, - raptor_scheme_specific, + scheme_specific, inband_fti: false, }) } @@ -568,21 +563,13 @@ impl File { let fec_encoding_id: oti::FECEncodingID = self.fec_oti_fec_encoding_id.unwrap().try_into().ok()?; - let reed_solomon_scheme_specific = match fec_encoding_id { + let scheme_specific = match fec_encoding_id { oti::FECEncodingID::ReedSolomonGF2M => { reed_solomon_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } - _ => None, - }; - - let raptorq_scheme_specific = match fec_encoding_id { oti::FECEncodingID::RaptorQ => { raptorq_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } - _ => None, - }; - - let raptor_scheme_specific = match fec_encoding_id { oti::FECEncodingID::Raptor => { raptor_scheme_specific(&self.fec_oti_scheme_specific_info).unwrap_or(None) } @@ -601,9 +588,7 @@ impl File { max_number_of_parity_symbols: (fec_oti_max_number_of_encoding_symbols - self.fec_oti_maximum_source_block_length.unwrap()) as u32, - reed_solomon_scheme_specific, - raptorq_scheme_specific, - raptor_scheme_specific, + scheme_specific, inband_fti: false, }) } diff --git a/src/common/lct.rs b/src/common/lct.rs index 9cb6f50..f4d123b 100644 --- a/src/common/lct.rs +++ b/src/common/lct.rs @@ -88,7 +88,7 @@ impl Cenc { } } -fn nb_bytes_128(cci: &u128) -> u32 { +fn nb_bytes_128(cci: &u128, min: u32) -> u32 { if (cci & 0xFFFF0000000000000000000000000000) != 0x0 { return 16; } @@ -121,10 +121,10 @@ fn nb_bytes_128(cci: &u128) -> u32 { return 2; } - return 0; + return min; } -fn nb_bytes_64(n: u64) -> u32 { +fn nb_bytes_64(n: u64, min: u32) -> u32 { if (n & 0xFFFF000000000000) != 0x0 { return 8; } @@ -141,7 +141,7 @@ fn nb_bytes_64(n: u64) -> u32 { return 2; } - return 0; + return min; } /** @@ -234,9 +234,9 @@ pub fn push_lct_header( close_object: bool, close_session: bool, ) { - let cci_size = nb_bytes_128(cci); - let tsi_size = nb_bytes_64(tsi); - let toi_size = nb_bytes_128(toi); + let cci_size = nb_bytes_128(cci, 0); + let tsi_size = nb_bytes_64(tsi, 2); + let toi_size = nb_bytes_128(toi, 2); let h_tsi = (tsi_size & 2) >> 1; // Is TSI half-word ? let h_toi = (toi_size & 2) >> 1; // Is TOI half-word ? diff --git a/src/common/mod.rs b/src/common/mod.rs index 2a6dc9f..ffcf8b6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -17,3 +17,4 @@ pub mod lct; pub mod oti; pub mod partition; pub mod pkt; +pub mod udpendpoint; diff --git a/src/common/oti.rs b/src/common/oti.rs index e37abe4..c45fc2a 100644 --- a/src/common/oti.rs +++ b/src/common/oti.rs @@ -158,6 +158,19 @@ impl RaptorSchemeSpecific { } } +/// +/// Scheme Specific information +/// +#[derive(Clone, Debug)] +pub enum SchemeSpecific { + /// if `fec_encoding_id` is `FECEncodingID::ReedSolomonGF2M` + ReedSolomon(ReedSolomonGF2MSchemeSpecific), + /// if `fec_encoding_id` is `FECEncodingID::RaptorQ` + RaptorQ(RaptorQSchemeSpecific), + /// if `fec_encoding_id` is `FECEncodingID::Raptor` + Raptor(RaptorSchemeSpecific), +} + /// /// FEC Object Transmission Information /// Contains the parameters using the build the blocks and FEC for the objects transmission @@ -174,12 +187,8 @@ pub struct Oti { pub encoding_symbol_length: u16, /// Maximum number of repairing symbols (FEC) pub max_number_of_parity_symbols: u32, - /// Optional, only if `fec_encoding_id` is `FECEncodingID::ReedSolomonGF2M` - pub reed_solomon_scheme_specific: Option, - /// Optional, only if `fec_encoding_id` is `FECEncodingID::RaptorQ` - pub raptorq_scheme_specific: Option, - /// Optional, only if `fec_encoding_id` is `FECEncodingID::Raptor` - pub raptor_scheme_specific: Option, + /// Optional, FEC scheme specific + pub scheme_specific: Option, /// If `true`, FTI is added to every ALC/LCT packets /// If `false`, FTI is only available inside the FDT pub inband_fti: bool, @@ -226,9 +235,7 @@ impl Oti { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: encoding_symbol_length, max_number_of_parity_symbols: 0, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, } } @@ -279,9 +286,7 @@ impl Oti { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: encoding_symbol_length, max_number_of_parity_symbols: max_number_of_parity_symbols as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, }) } @@ -332,9 +337,7 @@ impl Oti { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: encoding_symbol_length, max_number_of_parity_symbols: max_number_of_parity_symbols as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: None, + scheme_specific: None, inband_fti: true, }) } @@ -392,13 +395,11 @@ impl Oti { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: encoding_symbol_length, max_number_of_parity_symbols: max_number_of_parity_symbols as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: Some(RaptorQSchemeSpecific { + scheme_specific: Some(SchemeSpecific::RaptorQ(RaptorQSchemeSpecific { source_blocks_length: 0, sub_blocks_length: sub_blocks_length, symbol_alignment: symbol_alignment, - }), - raptor_scheme_specific: None, + })), inband_fti: true, }) } @@ -456,13 +457,11 @@ impl Oti { maximum_source_block_length: maximum_source_block_length as u32, encoding_symbol_length: encoding_symbol_length, max_number_of_parity_symbols: max_number_of_parity_symbols as u32, - reed_solomon_scheme_specific: None, - raptorq_scheme_specific: None, - raptor_scheme_specific: Some(RaptorSchemeSpecific { + scheme_specific: Some(SchemeSpecific::Raptor(RaptorSchemeSpecific { source_blocks_length: 0, sub_blocks_length: sub_blocks_length, symbol_alignment: symbol_alignment, - }), + })), inband_fti: true, }) } @@ -523,18 +522,18 @@ impl Oti { fn scheme_specific_info(&self) -> Option { match self.fec_encoding_id { FECEncodingID::NoCode => None, - FECEncodingID::ReedSolomonGF2M => match self.reed_solomon_scheme_specific.as_ref() { - Some(scheme) => Some(scheme.scheme_specific()), - None => None, + FECEncodingID::ReedSolomonGF2M => match self.scheme_specific.as_ref() { + Some(SchemeSpecific::ReedSolomon(scheme)) => Some(scheme.scheme_specific()), + _ => None, }, FECEncodingID::ReedSolomonGF28 => None, - FECEncodingID::RaptorQ => match self.raptorq_scheme_specific.as_ref() { - Some(scheme) => Some(scheme.scheme_specific()), - None => None, + FECEncodingID::RaptorQ => match self.scheme_specific.as_ref() { + Some(SchemeSpecific::RaptorQ(scheme)) => Some(scheme.scheme_specific()), + _ => None, }, - FECEncodingID::Raptor => match self.raptor_scheme_specific.as_ref() { - Some(scheme) => Some(scheme.scheme_specific()), - None => None, + FECEncodingID::Raptor => match self.scheme_specific.as_ref() { + Some(SchemeSpecific::Raptor(scheme)) => Some(scheme.scheme_specific()), + _ => None, }, FECEncodingID::ReedSolomonGF28UnderSpecified => None, } diff --git a/src/common/udpendpoint.rs b/src/common/udpendpoint.rs new file mode 100644 index 0000000..218d36c --- /dev/null +++ b/src/common/udpendpoint.rs @@ -0,0 +1,56 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + time::SystemTime, +}; + +use chrono::Datelike; +use serde::{Deserialize, Serialize}; + +/// UDP Endpoint +#[derive(Debug, PartialEq, Deserialize, Serialize, Clone, Eq, Hash)] +pub struct UDPEndpoint { + /// Network source adress + pub source_address: Option, + /// Network destination group address (multicast ip) + pub destination_group_address: String, + /// port + pub port: u16, +} + +impl UDPEndpoint { + /// Create a new UDP Endpoint + pub fn new(src: Option, dest: String, port: u16) -> Self { + Self { + source_address: src, + destination_group_address: dest, + port: port, + } + } + + /// Generate a u128bits Trace-ID + pub fn trace_id( + &self, + tsi: u64, + toi: u128, + fdt_instance_id: Option, + now: SystemTime, + ) -> u128 { + let mut hasher_endpoint = DefaultHasher::new(); + let mut hasher_tsi_toi = DefaultHasher::new(); + self.hash(&mut hasher_endpoint); + + tsi.hash(&mut hasher_tsi_toi); + toi.hash(&mut hasher_tsi_toi); + fdt_instance_id.hash(&mut hasher_tsi_toi); + + let date: chrono::DateTime = now.into(); + let day = date.day(); + day.hash(&mut hasher_tsi_toi); + + let endpoint_hash = hasher_endpoint.finish(); + let toi_tsi_hash = hasher_tsi_toi.finish(); + + ((endpoint_hash as u128) << 64) | toi_tsi_hash as u128 + } +} diff --git a/src/lib.rs b/src/lib.rs index 7298452..9ac9af5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ //! use flute::sender::Sender; //! use flute::sender::ObjectDesc; //! use flute::sender::Cenc; +//! use flute::core::UDPEndpoint; //! use std::net::UdpSocket; //! use std::time::SystemTime; //! @@ -44,11 +45,12 @@ //! let tsi = 1; //! let oti = Default::default(); //! let config = Default::default(); -//! let mut sender = Sender::new(tsi, &oti, &config); +//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400); +//! let mut sender = Sender::new(endpoint, tsi, &oti, &config); //! //! // Add object(s) (files) to the FLUTE sender //! let obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain", -//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, Cenc::Null, true, None, true).unwrap(); +//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap(); //! sender.add_object(obj); //! //! // Always call publish after adding objects @@ -66,7 +68,8 @@ //! Receive files from a UDP/IP network //! //!``` -//! use flute::receiver::{writer, MultiReceiver, UDPEndpoint}; +//! use flute::receiver::{writer, MultiReceiver}; +//! use flute::core::UDPEndpoint; //! use std::net::UdpSocket; //! use std::time::SystemTime; //! use std::rc::Rc; @@ -109,11 +112,13 @@ //!```rust //! use flute::sender::Oti; //! use flute::sender::Sender; +//! use flute::core::UDPEndpoint; //! //! // Reed Solomon 2^8 with encoding blocks composed of //! // 60 source symbols and 4 repair symbols of 1424 bytes per symbol +//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400); //! let oti = Oti::new_reed_solomon_rs28(1424, 60, 4).unwrap(); -//! let mut sender = Sender::new(1, &oti, &Default::default()); +//! let mut sender = Sender::new(endpoint, 1, &oti, &Default::default()); //!``` //! //! # Content Encoding (CENC) @@ -141,6 +146,7 @@ //!```rust //! use flute::sender::Sender; //! use flute::sender::Config; +//! use flute::core::UDPEndpoint; //! //! let config = Config { //! // Transfer a maximum of 3 files in parallel @@ -150,7 +156,8 @@ //! ..Default::default() //! }; //! -//! let mut sender = Sender::new(1, &Default::default(), &config); +//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400); +//! let mut sender = Sender::new(endpoint, 1, &Default::default(), &config); //! //!``` @@ -168,15 +175,15 @@ pub use crate::tools::error; /// Core module with low-level function pub mod core { - pub use crate::common::alc::AlcPkt; - pub use crate::common::lct::LCTHeader; - pub use crate::common::alc::PayloadID; pub use crate::common::alc::get_sender_current_time; pub use crate::common::alc::parse_alc_pkt; pub use crate::common::alc::parse_payload_id; + pub use crate::common::alc::AlcPkt; + pub use crate::common::alc::PayloadID; + pub use crate::common::lct::LCTHeader; + pub use crate::common::udpendpoint::UDPEndpoint; } - #[cfg(feature = "python")] mod py; diff --git a/src/py/receiver/udpendpoint.rs b/src/py/receiver/udpendpoint.rs index 9c95ab1..d190438 100644 --- a/src/py/receiver/udpendpoint.rs +++ b/src/py/receiver/udpendpoint.rs @@ -3,7 +3,7 @@ use pyo3::prelude::*; #[pyclass(unsendable)] #[derive(Debug)] pub struct UDPEndpoint { - pub inner: crate::receiver::UDPEndpoint, + pub inner: crate::core::UDPEndpoint, } #[pymethods] @@ -15,7 +15,7 @@ impl UDPEndpoint { source_address: Option<&str>, ) -> PyResult { Ok(Self { - inner: crate::receiver::UDPEndpoint { + inner: crate::core::UDPEndpoint { source_address: source_address.map(|f| f.to_string()), destination_group_address: destination_group_address.to_string(), port, diff --git a/src/py/sender/senderpy.rs b/src/py/sender/senderpy.rs index a9ddf5d..9da7d63 100644 --- a/src/py/sender/senderpy.rs +++ b/src/py/sender/senderpy.rs @@ -14,7 +14,12 @@ impl Sender { #[new] pub fn new(tsi: u64, oti: &oti::Oti, config: &config::Config) -> Self { Self { - 0: crate::sender::Sender::new(tsi, &oti.0, &config.0), + 0: crate::sender::Sender::new( + crate::core::UDPEndpoint::new(None, "224.0.0.1".to_owned(), 0), // FIXME + tsi, + &oti.0, + &config.0, + ), } } @@ -36,6 +41,7 @@ impl Sender { 1, None, None, + None, crate::sender::Cenc::Null, true, oti, @@ -76,6 +82,7 @@ impl Sender { 1, None, None, + None, cenc, true, oti, diff --git a/src/receiver/blockdecoder.rs b/src/receiver/blockdecoder.rs index f4f6c88..b71ba69 100644 --- a/src/receiver/blockdecoder.rs +++ b/src/receiver/blockdecoder.rs @@ -4,7 +4,10 @@ use opentelemetry::{ KeyValue, }; -use crate::common::{alc, oti}; +use crate::common::{ + alc, + oti::{self, SchemeSpecific}, +}; use crate::error::FluteError; use crate::fec; use crate::fec::nocode; @@ -87,20 +90,20 @@ impl BlockDecoder { log::warn!("Not implemented") } oti::FECEncodingID::RaptorQ => { - if oti.raptorq_scheme_specific.is_none() { + if let Some(SchemeSpecific::RaptorQ(scheme)) = oti.scheme_specific.as_ref() { + let codec = fec::raptorq::RaptorQDecoder::new( + sbn, + nb_source_symbols as usize, + oti.encoding_symbol_length as usize, + scheme, + ); + self.decoder = Some(Box::new(codec)); + } else { return Err(FluteError::new("RaptorQ Scheme not found")); } - - let codec = fec::raptorq::RaptorQDecoder::new( - sbn, - nb_source_symbols as usize, - oti.encoding_symbol_length as usize, - oti.raptorq_scheme_specific.as_ref().unwrap(), - ); - self.decoder = Some(Box::new(codec)); } oti::FECEncodingID::Raptor => { - if oti.raptor_scheme_specific.is_none() { + if oti.scheme_specific.is_none() { return Err(FluteError::new("Raptor Scheme not found")); } diff --git a/src/receiver/blockwriter.rs b/src/receiver/blockwriter.rs index 023d104..a4b6931 100644 --- a/src/receiver/blockwriter.rs +++ b/src/receiver/blockwriter.rs @@ -162,6 +162,10 @@ impl BlockWriter { } } + pub fn left(&self) -> usize { + self.bytes_left + } + pub fn is_completed(&self) -> bool { self.bytes_left == 0 } diff --git a/src/receiver/fdtreceiver.rs b/src/receiver/fdtreceiver.rs index 389ec4f..8d79179 100644 --- a/src/receiver/fdtreceiver.rs +++ b/src/receiver/fdtreceiver.rs @@ -1,5 +1,6 @@ +use super::objectreceiver; use super::writer::ObjectWriterBuilder; -use super::{objectreceiver, UDPEndpoint}; +use crate::common::udpendpoint::UDPEndpoint; use crate::common::{alc, fdtinstance::FdtInstance, lct}; use crate::{receiver::writer::ObjectMetadata, tools}; use crate::{receiver::writer::ObjectWriter, tools::error::Result}; @@ -74,7 +75,10 @@ impl FdtReceiver { endpoint, tsi, &lct::TOI_FDT, + Some(fdt_id), fdt_builder, + true, + now, ))), inner: inner.clone(), fdt_instance: None, @@ -158,6 +162,11 @@ impl FdtReceiver { self.get_server_time(now) > expires } + + pub fn get_expiration_time(&self) -> Option { + let inner = self.inner.borrow(); + return inner.expires; + } } impl FdtWriterBuilder { @@ -173,6 +182,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder { _tsi: &u64, _toi: &u128, _meta: Option<&ObjectMetadata>, + _now: std::time::SystemTime, ) -> Box { Box::new(FdtWriter { inner: self.inner.clone(), @@ -189,7 +199,15 @@ impl ObjectWriterBuilder for FdtWriterBuilder { ) { } - fn fdt_received(&self, _endpoint: &UDPEndpoint, _tsi: &u64, _fdt_xml: &str) {} + fn fdt_received( + &self, + _endpoint: &UDPEndpoint, + _tsi: &u64, + _fdt_xml: &str, + _expires: std::time::SystemTime, + _now: std::time::SystemTime, + ) { + } } impl ObjectWriter for FdtWriter { diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 1655387..039d428 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -9,7 +9,6 @@ mod multireceiver; mod objectreceiver; mod receiver; mod tsifilter; -mod udpendpoint; mod uncompress; #[cfg(feature = "opentelemetry")] @@ -19,4 +18,3 @@ pub mod writer; pub use multireceiver::MultiReceiver; pub use receiver::Config; pub use receiver::Receiver; -pub use udpendpoint::UDPEndpoint; diff --git a/src/receiver/multireceiver.rs b/src/receiver/multireceiver.rs index bde9836..531e216 100644 --- a/src/receiver/multireceiver.rs +++ b/src/receiver/multireceiver.rs @@ -1,8 +1,8 @@ use super::receiver::{Config, Receiver}; use super::tsifilter::TSIFilter; use super::writer::ObjectWriterBuilder; -use super::UDPEndpoint; use crate::common::alc; +use crate::common::udpendpoint::UDPEndpoint; use crate::tools::error::Result; use std::collections::HashMap; use std::rc::Rc; @@ -43,7 +43,8 @@ impl MultiReceiver { /// ``` /// // Receive objects from Transport Session 1 /// use flute::receiver::writer::ObjectWriterBufferBuilder; - /// use flute::receiver::{MultiReceiver, UDPEndpoint}; + /// use flute::receiver::{MultiReceiver}; + /// use flute::core::UDPEndpoint; /// use std::rc::Rc; /// /// let tsi: u64 = 1; diff --git a/src/receiver/objectreceiver.rs b/src/receiver/objectreceiver.rs index 58c85dd..f4fa8a6 100644 --- a/src/receiver/objectreceiver.rs +++ b/src/receiver/objectreceiver.rs @@ -1,7 +1,7 @@ use super::blockdecoder::BlockDecoder; use super::blockwriter::BlockWriter; use super::writer::ObjectWriterBuilder; -use super::UDPEndpoint; +use crate::common::udpendpoint::UDPEndpoint; use crate::common::{alc, fdtinstance::FdtInstance, lct, oti, partition}; use crate::receiver::writer::{ObjectMetadata, ObjectWriter}; use crate::tools::error::{FluteError, Result}; @@ -44,9 +44,10 @@ pub struct ObjectReceiver { cache_size: usize, blocks: Vec, blocks_variable_size: bool, - transfer_length: Option, + pub transfer_length: Option, cenc: Option, - content_md5: Option, + pub content_md5: Option, + enable_md5_check: bool, a_large: u64, a_small: u64, nb_a_large: u64, @@ -58,6 +59,7 @@ pub struct ObjectReceiver { last_activity: Instant, pub cache_expiration_date: Option, pub content_location: Option, + #[cfg(feature = "opentelemetry")] logger: ObjectReceiverLogger, } @@ -67,7 +69,10 @@ impl ObjectReceiver { endpoint: &UDPEndpoint, tsi: u64, toi: &u128, + _fdt_instance_id: Option, object_writer_builder: Rc, + enable_md5_check: bool, + _now: SystemTime, ) -> ObjectReceiver { log::debug!("Create new Object Receiver with toi {}", toi); ObjectReceiver { @@ -79,6 +84,7 @@ impl ObjectReceiver { transfer_length: None, cenc: None, content_md5: None, + enable_md5_check, blocks_variable_size: false, a_large: 0, a_small: 0, @@ -95,7 +101,7 @@ impl ObjectReceiver { cache_expiration_date: None, content_location: None, #[cfg(feature = "opentelemetry")] - logger: ObjectReceiverLogger::new(tsi, toi.clone()), + logger: ObjectReceiverLogger::new(endpoint, tsi, toi.clone(), _fdt_instance_id, _now), } } @@ -122,7 +128,7 @@ impl ObjectReceiver { self.set_oti_from_pkt(pkt); self.init_blocks_partitioning(); - self.init_object_writer(); + self.init_object_writer(now); self.push_from_cache(now); if self.oti.is_none() { @@ -272,10 +278,22 @@ impl ObjectReceiver { } }; + let mut groups = match fdt.group.as_ref() { + Some(groups) => groups.clone(), + None => vec![], + }; + + match file.group.as_ref() { + Some(group) => groups.append(&mut group.iter().map(|g| g.clone()).collect()), + None => {} + }; + #[cfg(feature = "opentelemetry")] let _span = self.logger.fdt_attached(); - self.content_md5 = file.content_md5.clone(); + if self.enable_md5_check { + self.content_md5 = file.content_md5.clone(); + } self.fdt_instance_id = Some(fdt_instance_id); let cache_duration = file.get_cache_duration(fdt.get_expiration_date(), server_time); @@ -292,17 +310,28 @@ impl ObjectReceiver { content_length: file.content_length.map(|s| s as usize), content_type: file.content_type.clone(), cache_duration, + groups: match groups.is_empty() { + true => None, + false => Some(groups), + }, }); self.init_blocks_partitioning(); - self.init_object_writer(); + self.init_object_writer(now); self.push_from_cache(now); self.write_blocks(0, now) .unwrap_or_else(|_| self.error("Fail to write blocks to storage")); true } - fn init_object_writer(&mut self) { + pub fn byte_left(&self) -> usize { + if let Some(w) = self.block_writer.as_ref() { + return w.left(); + } + 0 + } + + fn init_object_writer(&mut self, now: SystemTime) { if self.object_writer.is_some() { return; } @@ -316,6 +345,7 @@ impl ObjectReceiver { &self.tsi, &self.toi, self.meta.as_ref(), + now, ); assert!(self.block_writer.is_none()); @@ -398,7 +428,7 @@ impl ObjectReceiver { ); self.error(&format!( - "MD5 does not match expects {:?} receiver {:?}", + "MD5 does not match expects {:?} received {:?}", self.content_md5, &md5 )); } diff --git a/src/receiver/objectreceiverlogger.rs b/src/receiver/objectreceiverlogger.rs index fa4e02c..30ecbf0 100644 --- a/src/receiver/objectreceiverlogger.rs +++ b/src/receiver/objectreceiverlogger.rs @@ -1,9 +1,13 @@ +use std::time::SystemTime; + use opentelemetry::{ global::{self, BoxedSpan}, - trace::{Span, Status, TraceContextExt, Tracer}, + trace::{Span, Status, TraceContextExt, TraceId, Tracer}, Context, KeyValue, }; +use crate::common::udpendpoint::UDPEndpoint; + pub struct ObjectReceiverLogger { cx: Context, } @@ -15,36 +19,60 @@ impl std::fmt::Debug for ObjectReceiverLogger { } impl ObjectReceiverLogger { - pub fn new(tsi: u64, toi: u128) -> Self { - let tracer = global::tracer("ObjectReceiverLogger"); + pub fn new( + endpoint: &UDPEndpoint, + tsi: u64, + toi: u128, + fdt_instance_id: Option, + now: SystemTime, + ) -> Self { + let tracer = global::tracer("FluteLogger"); let name = match toi { 0 => "FDT", _ => "FLUTEObject", }; - let mut span = tracer.start(name); - span.add_event( - "object", - vec![ - KeyValue::new("tsi", tsi.to_string()), - KeyValue::new("toi", toi.to_string()), - ], - ); + + let mut span = tracer + .span_builder(name) + .with_trace_id(TraceId::from(endpoint.trace_id( + tsi, + toi, + fdt_instance_id, + now, + ))) + .start(&tracer); + + span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); + span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); + span.set_attribute(KeyValue::new("flute.port", endpoint.port.to_string())); + if let Some(source_address) = endpoint.source_address.as_ref() { + span.set_attribute(KeyValue::new( + "flute.source_address", + source_address.to_string(), + )); + } + span.set_attribute(KeyValue::new( + "flute.destination_group_address", + endpoint.destination_group_address.to_string(), + )); + + span.add_event("object", vec![KeyValue::new("start", "")]); let cx = Context::current_with_span(span); Self { cx } } pub fn block_span(&mut self) -> BoxedSpan { - let tracer = global::tracer("ObjectReceiverLogger"); + let tracer = global::tracer("FluteLogger"); tracer.start_with_context("block", &self.cx) } pub fn fdt_attached(&mut self) -> BoxedSpan { - let tracer = global::tracer("ObjectReceiverLogger"); + let tracer = global::tracer("FluteLogger"); tracer.start_with_context("fdt_attached", &self.cx) } pub fn complete(&mut self) -> BoxedSpan { - let tracer = global::tracer("ObjectReceiverLogger"); + let tracer = global::tracer("FluteLogger"); let span = self.cx.span(); span.set_status(Status::Ok); @@ -53,7 +81,7 @@ impl ObjectReceiverLogger { } pub fn error(&mut self, description: &str) -> BoxedSpan { - let tracer = global::tracer("ObjectReceiverLogger"); + let tracer = global::tracer("FluteLogger"); let span = self.cx.span(); span.set_status(Status::Error { diff --git a/src/receiver/receiver.rs b/src/receiver/receiver.rs index 3ba018f..f580958 100644 --- a/src/receiver/receiver.rs +++ b/src/receiver/receiver.rs @@ -1,8 +1,9 @@ +use super::fdtreceiver; use super::fdtreceiver::FdtReceiver; use super::objectreceiver; use super::objectreceiver::ObjectReceiver; use super::writer::ObjectWriterBuilder; -use super::{fdtreceiver, UDPEndpoint}; +use crate::common::udpendpoint::UDPEndpoint; use crate::common::{alc, lct}; use crate::tools::error::FluteError; use crate::tools::error::Result; @@ -34,6 +35,10 @@ pub struct Config { /// Objects expire if no data has been received before this timeout /// `None` Objects never expires, not recommended as object that are not fully reconstructed might continue to consume memory for an finite amount of time. pub object_timeout: Option, + /// Enable MD5 check of the received objects. Default `true` + pub enable_md5_check: bool, + /// CHeck if FDT is already received + pub check_fdt_received: bool, } impl Default for Config { @@ -43,6 +48,8 @@ impl Default for Config { max_objects_error: 0, session_timeout: None, object_timeout: Some(Duration::from_secs(10)), + enable_md5_check: true, + check_fdt_received: true } } } @@ -157,14 +164,20 @@ impl Receiver { let duration = object.last_activity_duration_since(now); if duration.gt(object_timeout) { log::warn!( - "Object Expired ! tsi={} toi={} state : {:?} location: {:?} attached={:?} blocks completed={}/{}", + "Object Expired ! tsi={} toi={} state : {:?} + location: {:?} attached={:?} blocks completed={}/{} duration={:?} max_duration={:?} + transfer_length={:?} byte_left={:?}", object.tsi, object.toi, object.state, object.content_location.as_ref().map(|u| u.to_string()), object.fdt_instance_id, object.nb_block_completed(), - object.nb_block() + object.nb_block(), + duration, + object_timeout, + object.transfer_length, + object.byte_left() ); Some(key.clone()) } else { @@ -233,7 +246,7 @@ impl Receiver { .map(|f| f.fdt_instance_id) .unwrap(); - if self.is_fdt_received(fdt_instance_id) { + if self.config.check_fdt_received && self.is_fdt_received(fdt_instance_id) { return Ok(()); } @@ -268,9 +281,18 @@ impl Receiver { fdtreceiver::State::Complete => {} fdtreceiver::State::Error => return Err(FluteError::new("Fail to decode FDT")), fdtreceiver::State::Expired => { + + let expiration = fdt_receiver.get_expiration_time().unwrap_or(now); + let server_time = fdt_receiver.get_server_time(now); + + let expiration: chrono::DateTime = expiration.into(); + let server_time: chrono::DateTime = server_time.into(); + log::warn!( - "TSI={} FDT has been received but is already expired", - self.tsi + "TSI={} FDT has been received but is already expired expiration time={} server time={}", + self.tsi, + expiration.to_rfc3339(), + server_time.to_rfc3339() ); return Ok(()); } @@ -289,9 +311,15 @@ impl Receiver { } let fdt_current = self.fdt_receivers.remove(&fdt_instance_id); - if let Some(fdt_current) = fdt_current { + if let Some(mut fdt_current) = fdt_current { if let Some(xml) = fdt_current.fdt_xml_str() { - self.writer.fdt_received(&self.endpoint, &self.tsi, &xml); + let expiration_date = fdt_current + .fdt_instance() + .map(|inst| inst.get_expiration_date().unwrap_or(now)) + .unwrap_or(now); + + self.writer + .fdt_received(&self.endpoint, &self.tsi, &xml, expiration_date, now); } self.fdt_current.push_front(fdt_current); self.attach_latest_fdt_to_objects(now); @@ -336,6 +364,32 @@ impl Receiver { let files = fdt_instance.file.as_ref()?; let expiration_date = fdt_instance.get_expiration_date(); + if let Some(true) = fdt_instance.full_fdt { + let files_toi: std::collections::HashMap> = files.iter().map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref())).collect(); + let remove_candidates: std::collections::HashMap = self.objects_completed.iter().filter_map(|(toi, meta)| match files_toi.contains_key(toi) { + true => None, + false => Some((toi.clone(), meta.clone())) + }).collect(); + + if !remove_candidates.is_empty() { + let content_locations: std::collections::HashSet<&str> = files.iter().map(|f| f.content_location.as_str()).collect(); + let duration = std::time::Duration::from_secs(4); + for (toi, meta) in &remove_candidates { + let content_location = meta.content_location.to_string(); + if !content_locations.contains(content_location.as_str()) && meta.expiration_date > now + duration { + self.writer.set_cache_duration( + &self.endpoint, + &self.tsi, + &toi, + &meta.content_location, + &duration, + ); + } + } + self.objects_completed.retain(|f, _| !remove_candidates.contains_key(f)); + } + } + for file in files { let toi: u128 = file.toi.parse().unwrap_or_default(); let cache_duration = file.get_cache_duration(expiration_date, server_time); @@ -376,7 +430,14 @@ impl Receiver { } } if self.objects_error.contains(&pkt.lct.toi) { - return Ok(()); + + let payload_id = alc::get_fec_inline_payload_id(pkt)?; + if payload_id.sbn == 0 && payload_id.esi == 0 { + log::warn!("Re-download object after errors"); + self.objects_error.remove(&pkt.lct.toi); + } else { + return Ok(()); + } } let mut obj = self.objects.get_mut(&pkt.lct.toi); @@ -457,6 +518,15 @@ impl Receiver { } fn gc_object_completed(&mut self, now: SystemTime) { + + if let Some(fdt) = self.fdt_current.front_mut() { + if let Some(instance) = fdt.fdt_instance() { + if let Some(true) = instance.full_fdt { + return; + } + } + } + let before = self.objects_completed.len(); self.objects_completed .retain(|_toi, meta| meta.expiration_date > now); @@ -488,9 +558,14 @@ impl Receiver { &self.endpoint, self.tsi, toi, + None, self.writer.clone(), + self.config.enable_md5_check, + now )); + + let mut is_attached = false; let mut fdt_index = 0; for fdt in &mut self.fdt_current.iter_mut() { let fdt_id = fdt.fdt_id; @@ -500,6 +575,7 @@ impl Receiver { if let Some(fdt_instance) = fdt.fdt_instance() { let success = obj.attach_fdt(fdt_id, fdt_instance, now, server_time); if success { + is_attached = true; if fdt_index != 0 { log::warn!( "TSI={} TOI={} Attaching an object to an FDT that is not the latest (index={}) ", @@ -516,6 +592,40 @@ impl Receiver { fdt_index += 1; } + if is_attached == false { + log::warn!("Object received before the FDT"); + } + self.objects.insert(toi.clone(), obj); } } + + + +impl Drop for Receiver { + fn drop(&mut self) { + + log::info!("Drop Flute Receiver"); + + if let Some(fdt) = self.fdt_current.front_mut() { + + if let Some(instance) = fdt.fdt_instance() { + if instance.full_fdt == Some(true) { + let duration = std::time::Duration::from_secs(0); + for obj in &self.objects_completed { + log::info!("Remove from cache {}", &obj.1.content_location.to_string()); + self.writer.set_cache_duration( + &self.endpoint, + &self.tsi, + &obj.0, + &obj.1.content_location, + &duration, + ); + } + + } + } + + } + } +} diff --git a/src/receiver/tsifilter.rs b/src/receiver/tsifilter.rs index 2c5a192..7a899c9 100644 --- a/src/receiver/tsifilter.rs +++ b/src/receiver/tsifilter.rs @@ -1,4 +1,4 @@ -use super::UDPEndpoint; +use crate::common::udpendpoint::UDPEndpoint; struct TSI { endpoints: std::collections::HashMap, diff --git a/src/receiver/udpendpoint.rs b/src/receiver/udpendpoint.rs deleted file mode 100644 index 5ab12a0..0000000 --- a/src/receiver/udpendpoint.rs +++ /dev/null @@ -1,23 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// UDP Endpoint -#[derive(Debug, PartialEq, Deserialize, Serialize, Clone, Eq, Hash)] -pub struct UDPEndpoint { - /// Network source adress - pub source_address: Option, - /// Network destination group address (multicast ip) - pub destination_group_address: String, - /// port - pub port: u16, -} - -impl UDPEndpoint { - /// Create a new UDP Endpoint - pub fn new(src: Option, dest: String, port: u16) -> Self { - Self { - source_address: src, - destination_group_address: dest, - port: port, - } - } -} diff --git a/src/receiver/writer/mod.rs b/src/receiver/writer/mod.rs index c142268..3f0d7f4 100644 --- a/src/receiver/writer/mod.rs +++ b/src/receiver/writer/mod.rs @@ -12,6 +12,7 @@ use std::time::Duration; +use crate::common::udpendpoint::UDPEndpoint; use crate::tools::error::Result; /// @@ -29,6 +30,8 @@ pub struct ObjectMetadata { pub content_type: Option, /// Object cache duration hint pub cache_duration: Option, + /// List of groups + pub groups: Option>, } /// @@ -42,6 +45,7 @@ pub trait ObjectWriterBuilder { tsi: &u64, toi: &u128, meta: Option<&ObjectMetadata>, + now: std::time::SystemTime, ) -> Box; /// Update cache duration of an object fn set_cache_duration( @@ -53,7 +57,14 @@ pub trait ObjectWriterBuilder { duration: &Duration, ); /// Called when an FDT is received - fn fdt_received(&self, endpoint: &UDPEndpoint, tsi: &u64, fdt_xml: &str); + fn fdt_received( + &self, + endpoint: &UDPEndpoint, + tsi: &u64, + fdt_xml: &str, + expires: std::time::SystemTime, + now: std::time::SystemTime, + ); } /// @@ -90,5 +101,3 @@ pub use objectwriterbuffer::ObjectWriterBufferBuilder; pub use objectwriterfs::ObjectWriterFS; pub use objectwriterfs::ObjectWriterFSBuilder; - -use super::UDPEndpoint; diff --git a/src/receiver/writer/objectwriterbuffer.rs b/src/receiver/writer/objectwriterbuffer.rs index 47c796e..47f7e35 100644 --- a/src/receiver/writer/objectwriterbuffer.rs +++ b/src/receiver/writer/objectwriterbuffer.rs @@ -1,5 +1,5 @@ use super::{ObjectMetadata, ObjectWriter, ObjectWriterBuilder}; -use crate::{receiver::UDPEndpoint, tools::error::Result}; +use crate::{tools::error::Result, common::udpendpoint::UDPEndpoint}; use std::{cell::RefCell, rc::Rc}; /// @@ -48,6 +48,7 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder { _tsi: &u64, _toi: &u128, meta: Option<&ObjectMetadata>, + _now: std::time::SystemTime, ) -> Box { let obj = Rc::new(RefCell::new(ObjectWriterBuffer { complete: false, @@ -71,7 +72,15 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder { ) { } - fn fdt_received(&self, _endpoint: &UDPEndpoint, _tsi: &u64, _fdt_xml: &str) {} + fn fdt_received( + &self, + _endpoint: &UDPEndpoint, + _tsi: &u64, + _fdt_xml: &str, + _expires: std::time::SystemTime, + _now: std::time::SystemTime, + ) { + } } impl ObjectWriter for ObjectWriterBufferWrapper { diff --git a/src/receiver/writer/objectwriterfs.rs b/src/receiver/writer/objectwriterfs.rs index e7c455a..e49ae83 100644 --- a/src/receiver/writer/objectwriterfs.rs +++ b/src/receiver/writer/objectwriterfs.rs @@ -1,7 +1,7 @@ use super::{ObjectMetadata, ObjectWriter, ObjectWriterBuilder}; use crate::{ + common::udpendpoint::UDPEndpoint, error::{FluteError, Result}, - receiver::UDPEndpoint, }; use std::{cell::RefCell, io::Write}; @@ -33,6 +33,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder { _tsi: &u64, _toi: &u128, meta: Option<&ObjectMetadata>, + _now: std::time::SystemTime, ) -> Box { let obj = Box::new(ObjectWriterFS { dest: self.dest.clone(), @@ -55,7 +56,15 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder { ) { } - fn fdt_received(&self, _endpoint: &UDPEndpoint, _tsi: &u64, _fdt_xml: &str) {} + fn fdt_received( + &self, + _endpoint: &UDPEndpoint, + _tsi: &u64, + _fdt_xml: &str, + _expires: std::time::SystemTime, + _now: std::time::SystemTime, + ) { + } } /// diff --git a/src/sender/block.rs b/src/sender/block.rs index ffc4e1e..57166ea 100644 --- a/src/sender/block.rs +++ b/src/sender/block.rs @@ -1,4 +1,4 @@ -use crate::common::oti::{self, Oti}; +use crate::common::oti::{self, Oti, SchemeSpecific}; use crate::fec::{self, FecShard}; use crate::fec::{DataFecShard, FecEncoder}; use crate::tools::error::{FluteError, Result}; @@ -121,15 +121,21 @@ impl Block { ) -> Result>> { assert!(nb_source_symbols <= oti.maximum_source_block_length as usize); assert!(nb_source_symbols <= block_length as usize); - assert!(oti.raptorq_scheme_specific.is_some()); - let encoder = fec::raptorq::RaptorQEncoder::new( - nb_source_symbols, - oti.max_number_of_parity_symbols as usize, - oti.encoding_symbol_length as usize, - oti.raptorq_scheme_specific.as_ref().unwrap(), - ); - let shards = encoder.encode(&buffer)?; - Ok(shards) + assert!(oti.scheme_specific.is_some()); + + if let Some(SchemeSpecific::RaptorQ(scheme)) = oti.scheme_specific.as_ref() { + let encoder = fec::raptorq::RaptorQEncoder::new( + nb_source_symbols, + oti.max_number_of_parity_symbols as usize, + oti.encoding_symbol_length as usize, + scheme, + ); + + let shards = encoder.encode(&buffer)?; + Ok(shards) + } else { + return Err(FluteError::new("Scheme specific for Raptorq not defined")); + } } fn create_shards_raptor( @@ -140,7 +146,8 @@ impl Block { ) -> Result>> { assert!(nb_source_symbols <= oti.maximum_source_block_length as usize); assert!(nb_source_symbols <= block_length as usize); - assert!(oti.raptor_scheme_specific.is_some()); + assert!(oti.scheme_specific.is_some()); + let encoder = fec::raptor::RaptorEncoder::new( nb_source_symbols, oti.max_number_of_parity_symbols as usize, diff --git a/src/sender/fdt.rs b/src/sender/fdt.rs index be07e9a..4720993 100644 --- a/src/sender/fdt.rs +++ b/src/sender/fdt.rs @@ -1,6 +1,7 @@ use super::filedesc::FileDesc; use super::objectdesc; use super::observer::ObserverList; +use super::toiallocator::{Toi, ToiAllocator}; use crate::common::{fdtinstance::FdtInstance, lct, oti}; use crate::sender::observer; use crate::sender::TOIMaxLength; @@ -30,6 +31,8 @@ pub struct Fdt { inband_sct: bool, last_publish: Option, observers: ObserverList, + groups: Option>, + toi_allocator: Arc, } impl Fdt { @@ -44,6 +47,7 @@ impl Fdt { observers: ObserverList, toi_max_length: TOIMaxLength, toi_initial_value: Option, + groups: Option>, ) -> Fdt { let toi = match toi_initial_value { Some(0) => 1, @@ -54,7 +58,9 @@ impl Fdt { match toi_max_length { TOIMaxLength::ToiMax16 => toi &= 0xFFFFu128, TOIMaxLength::ToiMax32 => toi &= 0xFFFFFFFFu128, + TOIMaxLength::ToiMax48 => toi &= 0xFFFFFFFFFFFFu128, TOIMaxLength::ToiMax64 => toi &= 0xFFFFFFFFFFFFFFFFu128, + TOIMaxLength::ToiMax80 => toi &= 0xFFFFFFFFFFFFFFFFFFFFu128, TOIMaxLength::ToiMax112 => {} }; if toi == 0 { @@ -82,6 +88,8 @@ impl Fdt { last_publish: None, observers, toi_max_length, + groups, + toi_allocator: ToiAllocator::new(), } } @@ -142,13 +150,19 @@ impl Fdt { full_fdt: None, base_url_1: None, base_url_2: None, - group: None, + group: self.groups.clone(), mbms_session_identity_expiry: None, schema_version: Some(4), delimiter: Some(0), } } + pub fn allocate_toi(&mut self) -> Arc { + let ret = ToiAllocator::allocate(&self.toi_allocator, self.toi); + self.inc_toi(); + ret + } + pub fn add_object(&mut self, obj: Box) -> Result { if self.complete == Some(true) { return Err(FluteError::new( @@ -156,8 +170,15 @@ impl Fdt { )); } - let toi = self.toi; - self.inc_toi(); + let toi = match obj.toi.as_ref() { + Some(toi) => toi.get(), + None => { + let ret = self.toi; + self.inc_toi(); + ret + } + }; + let filedesc = Arc::new(FileDesc::new(obj, &self.oti, &toi, None, false)?); assert!(self.files.contains_key(&filedesc.toi) == false); @@ -172,7 +193,9 @@ impl Fdt { match self.toi_max_length { TOIMaxLength::ToiMax16 => self.toi &= 0xFFFFu128, TOIMaxLength::ToiMax32 => self.toi &= 0xFFFFFFFFu128, + TOIMaxLength::ToiMax48 => self.toi &= 0xFFFFFFFFFFFFu128, TOIMaxLength::ToiMax64 => self.toi &= 0xFFFFFFFFFFFFFFFFu128, + TOIMaxLength::ToiMax80 => self.toi &= 0xFFFFFFFFFFFFFFFFFFFFu128, TOIMaxLength::ToiMax112 => {} } @@ -180,11 +203,11 @@ impl Fdt { self.toi = 1; } - if !self.files.contains_key(&self.toi) { + if !self.files.contains_key(&self.toi) && !self.toi_allocator.contains(&self.toi) { break; } - log::warn!("TOI {} is already used by a file", self.toi) + log::warn!("TOI {} is already used by a file or reserved", self.toi) } } @@ -224,6 +247,7 @@ impl Fdt { 1, Some(self.carousel), None, + self.groups.clone(), self.cenc, true, None, @@ -408,10 +432,7 @@ mod tests { use super::objectdesc; use super::oti; - #[test] - pub fn test_fdt() { - crate::tests::init(); - + fn create_fdt() -> super::Fdt { let oti: oti::Oti = Default::default(); let mut fdt = super::Fdt::new( 10, @@ -424,14 +445,16 @@ mod tests { ObserverList::new(), crate::sender::TOIMaxLength::ToiMax112, Some(1), + Some(vec!["Group1".to_owned()]), ); - let obj = objectdesc::ObjectDesc::create_from_buffer( + let obj1 = objectdesc::ObjectDesc::create_from_buffer( &Vec::new(), - "txt", - &url::Url::parse("file:///").unwrap(), + "plain/txt", + &url::Url::parse("file:///object1").unwrap(), 2, None, None, + Some(vec!["Test1".to_owned()]), lct::Cenc::Null, true, None, @@ -439,10 +462,63 @@ mod tests { ) .unwrap(); - fdt.add_object(obj).unwrap(); + let obj2 = objectdesc::ObjectDesc::create_from_buffer( + &Vec::new(), + "plain/txt", + &url::Url::parse("file:///object2").unwrap(), + 2, + None, + None, + None, + lct::Cenc::Gzip, + true, + None, + true, + ) + .unwrap(); + fdt.add_object(obj1).unwrap(); + fdt.add_object(obj2).unwrap(); + fdt + } + + #[test] + pub fn test_fdt() { + use std::{io::Write, process::Command}; + + crate::tests::init(); + let fdt = create_fdt(); let buffer = fdt.to_xml(SystemTime::now()).unwrap(); let content = String::from_utf8(buffer.clone()).unwrap(); - log::info!("content={}", content); + log::info!("{}", content); + + let check_fdt_folder = "./assets/xsd/"; + let xsd_filename = "FLUTE-FDT-3GPP-Main.xsd"; + let xsd_path = std::path::Path::new(check_fdt_folder).join(xsd_filename); + + let xml_generated_data = String::from_utf8(buffer).unwrap(); + let tmp_fdt_file = tempfile::Builder::new() + .prefix("TempFile") + .suffix(".xml") + .tempfile() + .unwrap(); + write!(&tmp_fdt_file, "{}", &xml_generated_data).unwrap(); + + let output = Command::new("xmllint") + .arg("--schema") + .arg(xsd_path) + .arg(&tmp_fdt_file.path()) + .arg("--noout") + .output() + .expect("failed to execute process"); + + let output_print = std::str::from_utf8(&output.stderr).expect("ascii to text went wrong "); + + assert!( + output.status.success(), + "\n\nValidation failed\n\n{}\n\n", + output_print + ); + // log::info!("content={}", content); } } diff --git a/src/sender/filedesc.rs b/src/sender/filedesc.rs index 23dff25..80a665e 100644 --- a/src/sender/filedesc.rs +++ b/src/sender/filedesc.rs @@ -1,4 +1,5 @@ use super::objectdesc::{create_fdt_cache_control, ObjectDesc}; +use crate::common::oti::SchemeSpecific; use crate::common::{fdtinstance, oti, partition}; use crate::error::{FluteError, Result}; use std::sync::RwLock; @@ -8,6 +9,7 @@ use std::time::SystemTime; struct TransferInfo { transferring: bool, transfer_count: u32, + total_nb_transfer: u64, last_transfer: Option, } @@ -54,7 +56,7 @@ impl FileDesc { ); if oti.fec_encoding_id == oti::FECEncodingID::RaptorQ { - if oti.raptorq_scheme_specific.is_none() { + if oti.scheme_specific.is_none() { return Err(FluteError::new( "FEC RaptorQ is selected, however scheme parameters are not defined", )); @@ -68,10 +70,11 @@ impl FileDesc { )) })?; - let scheme = oti.raptorq_scheme_specific.as_mut().unwrap(); - scheme.source_blocks_length = nb_blocks; + if let SchemeSpecific::RaptorQ(scheme) = oti.scheme_specific.as_mut().unwrap() { + scheme.source_blocks_length = nb_blocks; + } } else if oti.fec_encoding_id == oti::FECEncodingID::Raptor { - if oti.raptor_scheme_specific.is_none() { + if oti.scheme_specific.is_none() { return Err(FluteError::new( "FEC Raptor is selected, however scheme parameters are not defined", )); @@ -85,8 +88,9 @@ impl FileDesc { )) })?; - let scheme = oti.raptor_scheme_specific.as_mut().unwrap(); - scheme.source_blocks_length = nb_blocks; + if let SchemeSpecific::Raptor(scheme) = oti.scheme_specific.as_mut().unwrap() { + scheme.source_blocks_length = nb_blocks; + } } } @@ -100,10 +104,16 @@ impl FileDesc { transferring: false, transfer_count: 0, last_transfer: None, + total_nb_transfer: 0, }), }) } + pub fn _total_nb_transfer(&self) -> u64 { + let info = self.transfer_info.read().unwrap(); + info.total_nb_transfer + } + pub fn transfer_started(&self) { let mut info = self.transfer_info.write().unwrap(); info.transferring = true; @@ -120,6 +130,7 @@ impl FileDesc { assert!(info.transferring == true); info.transferring = false; info.transfer_count += 1; + info.total_nb_transfer += 1; info.last_transfer = Some(now); } diff --git a/src/sender/mod.rs b/src/sender/mod.rs index db9be9a..943b0a3 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -11,6 +11,10 @@ mod objectdesc; mod observer; mod sender; mod sendersession; +mod toiallocator; + +#[cfg(feature = "opentelemetry")] +mod objectsenderlogger; pub use crate::common::lct::Cenc; pub use crate::common::oti::FECEncodingID; @@ -24,3 +28,4 @@ pub use observer::Subscriber; pub use sender::Config; pub use sender::Sender; pub use sender::TOIMaxLength; +pub use toiallocator::Toi; diff --git a/src/sender/objectdesc.rs b/src/sender/objectdesc.rs index a151e43..4176270 100644 --- a/src/sender/objectdesc.rs +++ b/src/sender/objectdesc.rs @@ -1,10 +1,12 @@ use base64::Engine; use super::compress; +use super::toiallocator::Toi; use crate::common::{fdtinstance, lct, oti}; use crate::tools; use crate::tools::error::Result; use std::ffi::OsStr; +use std::sync::Arc; use std::time::SystemTime; /// Cache Control @@ -82,6 +84,10 @@ pub struct ObjectDesc { pub carousel_delay: Option, /// Define object cache control pub cache_control: Option, + /// Add file to a list of groups + pub groups: Option>, + /// Assign an optional TOI to this object + pub toi: Option>, } impl ObjectDesc { @@ -93,6 +99,7 @@ impl ObjectDesc { max_transfer_count: u32, carousel_delay: Option, cache_control: Option, + groups: Option>, cenc: lct::Cenc, inband_cenc: bool, oti: Option, @@ -119,6 +126,7 @@ impl ObjectDesc { max_transfer_count, carousel_delay, cache_control, + groups, cenc, inband_cenc, oti, @@ -134,6 +142,7 @@ impl ObjectDesc { max_transfer_count: u32, carousel_delay: Option, cache_control: Option, + groups: Option>, cenc: lct::Cenc, inband_cenc: bool, oti: Option, @@ -147,6 +156,7 @@ impl ObjectDesc { max_transfer_count, carousel_delay, cache_control, + groups, cenc, inband_cenc, oti, @@ -162,6 +172,7 @@ impl ObjectDesc { max_transfer_count: u32, carousel_delay: Option, cache_control: Option, + groups: Option>, cenc: lct::Cenc, inband_cenc: bool, oti: Option, @@ -203,6 +214,8 @@ impl ObjectDesc { max_transfer_count, carousel_delay, cache_control, + groups, + toi: None, })) } } diff --git a/src/sender/objectsenderlogger.rs b/src/sender/objectsenderlogger.rs new file mode 100644 index 0000000..79da4d1 --- /dev/null +++ b/src/sender/objectsenderlogger.rs @@ -0,0 +1,68 @@ +use std::time::SystemTime; + +use opentelemetry::{ + global::{self}, + trace::{Span, TraceContextExt, TraceId, Tracer}, + Context, KeyValue, +}; + +use crate::common::udpendpoint::UDPEndpoint; + +pub struct ObjectSenderLogger { + _cx: Context, +} + +impl std::fmt::Debug for ObjectSenderLogger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ObjectSenderLogger").finish() + } +} + +impl ObjectSenderLogger { + pub fn new( + endpoint: &UDPEndpoint, + tsi: u64, + toi: u128, + fdt_instance_id: Option, + now: SystemTime, + ) -> Self { + let tracer = global::tracer("FluteLogger"); + let name = match toi { + 0 => "FDT Transfer", + _ => "Object Transfer", + }; + + let trace_id = endpoint.trace_id(tsi, toi, fdt_instance_id, now); + + log::info!( + "Create {:?} {} {} {:?} trace_id={:?}", + endpoint, + tsi, + toi, + fdt_instance_id, + trace_id + ); + let mut span = tracer + .span_builder(name) + .with_trace_id(TraceId::from(trace_id)) + .start(&tracer); + + span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); + span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); + span.set_attribute(KeyValue::new("flute.port", endpoint.port.to_string())); + if let Some(source_address) = endpoint.source_address.as_ref() { + span.set_attribute(KeyValue::new( + "flute.source_address", + source_address.to_string(), + )); + } + span.set_attribute(KeyValue::new( + "flute.destination_group_address", + endpoint.destination_group_address.to_string(), + )); + + span.add_event("object", vec![KeyValue::new("start", "")]); + let cx = Context::current_with_span(span); + Self { _cx: cx } + } +} diff --git a/src/sender/sender.rs b/src/sender/sender.rs index 59204ca..5b17c0b 100644 --- a/src/sender/sender.rs +++ b/src/sender/sender.rs @@ -1,8 +1,9 @@ use super::fdt::Fdt; use super::observer::ObserverList; use super::sendersession::SenderSession; -use super::{objectdesc, Subscriber}; +use super::{objectdesc, Subscriber, Toi}; use crate::common::{alc, lct, oti, Profile}; +use crate::core::UDPEndpoint; use crate::tools::error::Result; use std::sync::Arc; use std::time::SystemTime; @@ -14,8 +15,12 @@ pub enum TOIMaxLength { ToiMax16, /// 32 bits ToiMax32, + /// 48 bits + ToiMax48, /// 64 bits ToiMax64, + /// 80 bits + ToiMax80, /// 112 bits ToiMax112, } @@ -52,6 +57,8 @@ pub struct Config { /// TOI value must be > 0 /// None : Initialize the TOI to a random value pub toi_initial_value: Option, + /// List of groups added to the FDT-Instance + pub groups: Option>, } impl Default for Config { @@ -67,6 +74,7 @@ impl Default for Config { profile: Profile::RFC6726, toi_max_length: TOIMaxLength::ToiMax112, toi_initial_value: Some(1), + groups: None, } } } @@ -78,17 +86,19 @@ impl Default for Config { #[derive(Debug)] pub struct Sender { fdt: Fdt, + fdt_session: SenderSession, sessions: Vec, session_index: usize, observers: ObserverList, tsi: u64, + endpoint: UDPEndpoint, } impl Sender { /// /// Creation of a FLUTE Sender /// - pub fn new(tsi: u64, oti: &oti::Oti, config: &Config) -> Sender { + pub fn new(endpoint: UDPEndpoint, tsi: u64, oti: &oti::Oti, config: &Config) -> Sender { let observers = ObserverList::new(); let fdt = Fdt::new( @@ -102,6 +112,7 @@ impl Sender { observers.clone(), config.toi_max_length, config.toi_initial_value, + config.groups.clone(), ); let multiplex_files = match config.multiplex_files { @@ -109,23 +120,34 @@ impl Sender { n => n + 1, }; - let sessions = (0..multiplex_files) - .map(|index| { + let fdt_session = SenderSession::new( + tsi, + config.interleave_blocks as usize, + true, + config.profile, + endpoint.clone(), + ); + + let sessions = (0..multiplex_files - 1) + .map(|_| { SenderSession::new( tsi, config.interleave_blocks as usize, - index == 0, + false, config.profile, + endpoint.clone(), ) }) .collect(); Sender { fdt, + fdt_session, sessions, session_index: 0, observers, tsi, + endpoint, } } @@ -139,10 +161,22 @@ impl Sender { self.observers.unsubscribe(s); } + /// Get UDP endpoint + pub fn get_udp_endpoint(&self) -> &UDPEndpoint { + &self.endpoint + } + + /// Get TSI + pub fn get_tsi(&self) -> u64 { + self.tsi + } + /// Add an object to the FDT /// /// After calling this function, a call to `publish()` to publish your modifications /// + /// If a TOI as been set to the ObjectDesc, there is no need to release it + /// /// # Returns /// /// A `Result` containing an `u128` representing the unique identifier of the added object (TOI), if the operation was successful. @@ -192,10 +226,20 @@ impl Sender { alc::new_alc_pkt_close_session(&0u128, self.tsi) } + /// Allocate a TOI + /// TOI must be either release or assigned to an object and call add_object()` + pub fn allocate_toi(&mut self) -> Arc { + self.fdt.allocate_toi() + } + /// Read the next ALC/LCT packet /// return None if there is no new packet to be transferred /// ALC/LCT packet should be encapsulated into a UDP/IP payload and transferred via UDP/multicast pub fn read(&mut self, now: SystemTime) -> Option> { + if let Some(fdt_data) = self.fdt_session.run(&mut self.fdt, now) { + return Some(fdt_data); + } + let session_index_orig = self.session_index; loop { let session = self.sessions.get_mut(self.session_index).unwrap(); @@ -222,6 +266,7 @@ impl Sender { mod tests { use crate::common::lct; + use crate::core::UDPEndpoint; use super::objectdesc; use super::oti; @@ -235,6 +280,7 @@ mod tests { 1, None, None, + None, lct::Cenc::Null, true, None, @@ -248,7 +294,8 @@ mod tests { crate::tests::init(); let oti: oti::Oti = Default::default(); - let mut sender = super::Sender::new(1, &oti, &Default::default()); + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 1234); + let mut sender = super::Sender::new(endpoint, 1, &oti, &Default::default()); let nb_pkt = oti.encoding_symbol_length as usize * 3; @@ -266,9 +313,10 @@ mod tests { pub fn test_sender_file_too_large() { crate::tests::init(); let oti = oti::Oti::new_no_code(4, 2); + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 1234); // Create a buffer larger that the max transfer length let object = create_obj(oti.max_transfer_length() + 1); - let mut sender = super::Sender::new(1, &oti, &Default::default()); + let mut sender = super::Sender::new(endpoint, 1, &oti, &Default::default()); let res = sender.add_object(object); assert!(res.is_err()); } @@ -279,8 +327,8 @@ mod tests { let oti = Default::default(); let object = create_obj(1024); - - let mut sender = super::Sender::new(1, &oti, &Default::default()); + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 1234); + let mut sender = super::Sender::new(endpoint, 1, &oti, &Default::default()); assert!(sender.nb_objects() == 0); let toi = sender.add_object(object).unwrap(); @@ -296,7 +344,8 @@ mod tests { crate::tests::init(); let oti = Default::default(); - let mut sender = super::Sender::new(1, &oti, &Default::default()); + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 1234); + let mut sender = super::Sender::new(endpoint, 1, &oti, &Default::default()); let object1 = create_obj(1024); let object2 = create_obj(1024); diff --git a/src/sender/sendersession.rs b/src/sender/sendersession.rs index 93c3dae..9a9dffd 100644 --- a/src/sender/sendersession.rs +++ b/src/sender/sendersession.rs @@ -1,19 +1,26 @@ use super::blockencoder::BlockEncoder; use super::fdt::Fdt; use super::filedesc::FileDesc; +#[cfg(feature = "opentelemetry")] +use super::objectsenderlogger::ObjectSenderLogger; use super::Profile; use crate::common::alc; +use crate::core::UDPEndpoint; use std::sync::Arc; use std::time::SystemTime; +#[allow(dead_code)] #[derive(Debug)] pub struct SenderSession { + endpoint: UDPEndpoint, tsi: u64, file: Option>, encoder: Option, interleave_blocks: usize, transfer_fdt_only: bool, profile: Profile, + #[cfg(feature = "opentelemetry")] + logger: Option, } impl SenderSession { @@ -22,14 +29,18 @@ impl SenderSession { interleave_blocks: usize, transfer_fdt_only: bool, profile: Profile, + endpoint: UDPEndpoint, ) -> SenderSession { SenderSession { + endpoint, tsi, file: None, encoder: None, interleave_blocks, transfer_fdt_only, profile, + #[cfg(feature = "opentelemetry")] + logger: None, } } @@ -74,6 +85,21 @@ impl SenderSession { if self.file.is_none() { return; } + + #[cfg(feature = "opentelemetry")] + if !self.transfer_fdt_only { + let file = self.file.as_ref().unwrap(); + if file._total_nb_transfer() == 0 { + self.logger = Some(ObjectSenderLogger::new( + &self.endpoint, + self.tsi, + file.toi, + file.fdt_id, + now + )); + } + } + self.encoder = Some(BlockEncoder::new( self.file.as_ref().unwrap().clone(), self.interleave_blocks, @@ -87,5 +113,11 @@ impl SenderSession { }; self.file = None; self.encoder = None; + + #[cfg(feature = "opentelemetry")] + { + self.logger = None; + } + } } diff --git a/src/sender/toiallocator.rs b/src/sender/toiallocator.rs new file mode 100644 index 0000000..2c72c26 --- /dev/null +++ b/src/sender/toiallocator.rs @@ -0,0 +1,58 @@ +use std::sync::{Arc, Mutex}; + +#[derive(Debug)] +pub struct ToiAllocator { + toi_reserved: Mutex>, +} + +/// Struct containing a TOI +#[derive(Debug)] +pub struct Toi { + allocator: Arc, + value: u128, +} + +impl Drop for Toi { + fn drop(&mut self) { + self.allocator.release(self.value); + } +} + +impl Toi { + /// Get Value of TOI + pub fn get(&self) -> u128 { + self.value + } +} + +impl ToiAllocator { + pub fn new() -> Arc { + Arc::new(Self { + toi_reserved: Mutex::new(std::collections::HashSet::new()), + }) + } + + pub fn allocate(allocator: &Arc, toi: u128) -> Arc { + { + let mut db = allocator.toi_reserved.lock().unwrap(); + let success = db.insert(toi); + assert!(success); + } + + Arc::new(Toi { + allocator: allocator.clone(), + value: toi, + }) + } + + pub fn release(&self, toi: u128) { + let mut db = self.toi_reserved.lock().unwrap(); + let success = db.remove(&toi); + assert!(success); + } + + pub fn contains(&self, toi: &u128) -> bool { + let db = self.toi_reserved.lock().unwrap(); + db.contains(toi) + } +} diff --git a/tests/flute.rs b/tests/flute.rs index cbee929..aa96020 100644 --- a/tests/flute.rs +++ b/tests/flute.rs @@ -1,5 +1,5 @@ mod tests { - use flute::receiver::UDPEndpoint; + use flute::core::UDPEndpoint; use rand::RngCore; use std::rc::Rc; @@ -25,7 +25,8 @@ mod tests { fdt_cenc: cenc, ..Default::default() }); - let mut sender = Box::new(sender::Sender::new(1, &oti, &config)); + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 5000); + let mut sender = Box::new(sender::Sender::new(endpoint, 1, &oti, &config)); sender .add_object( sender::ObjectDesc::create_from_buffer( @@ -35,6 +36,7 @@ mod tests { 1, None, None, + None, cenc, inband_cenc, object_oti.map(|e| e.clone()), @@ -72,7 +74,7 @@ mod tests { break; } - if (i & 3) == 0 { + if (i & 7) == 0 { log::info!("ALC pkt {} is lost", i) } else { receiver