From 3ceb0b3ed1d9b57f57bb4032f4aaf055fd8b1786 Mon Sep 17 00:00:00 2001 From: laurent Date: Thu, 12 Sep 2024 19:04:45 +0200 Subject: [PATCH] Avoid sending incomplete ogg packets. --- Cargo.toml | 2 +- src/opus.rs | 33 +++++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f4f0993..04e838f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sphn" -version = "0.1.3" +version = "0.1.4" edition = "2021" license = "MIT/Apache-2.0" description = "pyo3 wrappers to read/write audio files" diff --git a/src/opus.rs b/src/opus.rs index d071ff2..ee79318 100644 --- a/src/opus.rs +++ b/src/opus.rs @@ -200,16 +200,30 @@ pub fn write_ogg_stereo( } } -struct BufferStreamW(std::sync::mpsc::Sender>); +struct BufferStreamW { + sender: std::sync::mpsc::Sender>, + page_buffer: Vec, +} + +impl BufferStreamW { + fn new(sender: std::sync::mpsc::Sender>) -> Self { + let page_buffer = Vec::with_capacity(32768); + Self { sender, page_buffer } + } + + fn on_end_of_packet(&mut self) -> Result<()> { + if !self.page_buffer.is_empty() { + let mut to_send = Vec::with_capacity(32768); + std::mem::swap(&mut self.page_buffer, &mut to_send); + self.sender.send(to_send)? + } + Ok(()) + } +} impl std::io::Write for BufferStreamW { fn write(&mut self, buf: &[u8]) -> std::io::Result { - if self.0.send(buf.to_vec()).is_err() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotConnected, - "opus stream writer error".to_string(), - )); - }; + self.page_buffer.extend_from_slice(buf); Ok(buf.len()) } @@ -234,14 +248,16 @@ impl StreamWriter { let encoder = opus::Encoder::new(sample_rate, opus::Channels::Mono, opus::Application::Voip)?; let (tx, rx) = std::sync::mpsc::channel(); - let mut pw = ogg::PacketWriter::new(BufferStreamW(tx)); + let mut pw = ogg::PacketWriter::new(BufferStreamW::new(tx)); let out_encoded = vec![0u8; 50_000]; let mut head = Vec::new(); write_opus_header(&mut head, 1u8, sample_rate)?; pw.write_packet(head, 42, ogg::PacketWriteEndInfo::EndPage, 0)?; + pw.inner_mut().on_end_of_packet()?; let mut tags = Vec::new(); write_opus_tags(&mut tags)?; pw.write_packet(tags, 42, ogg::PacketWriteEndInfo::EndPage, 0)?; + pw.inner_mut().on_end_of_packet()?; Ok(Self { pw, encoder, out_encoded, total_data: 0, rx }) } @@ -256,6 +272,7 @@ impl StreamWriter { let msg = self.out_encoded[..size].to_vec(); self.total_data += pcm.len() as u64; self.pw.write_packet(msg, 42, ogg::PacketWriteEndInfo::EndPage, self.total_data)?; + self.pw.inner_mut().on_end_of_packet()?; Ok(()) }