Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid sending incomplete ogg packets. #4

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sphn"
version = "0.1.3"
version = "0.1.4"
edition = "2021"
license = "MIT/Apache-2.0"
description = "pyo3 wrappers to read/write audio files"
Expand Down
33 changes: 25 additions & 8 deletions src/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,30 @@ pub fn write_ogg_stereo<W: std::io::Write>(
}
}

struct BufferStreamW(std::sync::mpsc::Sender<Vec<u8>>);
struct BufferStreamW {
sender: std::sync::mpsc::Sender<Vec<u8>>,
page_buffer: Vec<u8>,
}

impl BufferStreamW {
fn new(sender: std::sync::mpsc::Sender<Vec<u8>>) -> Self {
let page_buffer = Vec::with_capacity(32768);
Self { sender, page_buffer }
}

fn on_end_of_packet(&mut self) -> Result<()> {
if !self.page_buffer.is_empty() {
let mut to_send = Vec::with_capacity(32768);
std::mem::swap(&mut self.page_buffer, &mut to_send);
self.sender.send(to_send)?
}
Ok(())
}
}

impl std::io::Write for BufferStreamW {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.0.send(buf.to_vec()).is_err() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"opus stream writer error".to_string(),
));
};
self.page_buffer.extend_from_slice(buf);
Ok(buf.len())
}

Expand All @@ -234,14 +248,16 @@ impl StreamWriter {
let encoder =
opus::Encoder::new(sample_rate, opus::Channels::Mono, opus::Application::Voip)?;
let (tx, rx) = std::sync::mpsc::channel();
let mut pw = ogg::PacketWriter::new(BufferStreamW(tx));
let mut pw = ogg::PacketWriter::new(BufferStreamW::new(tx));
let out_encoded = vec![0u8; 50_000];
let mut head = Vec::new();
write_opus_header(&mut head, 1u8, sample_rate)?;
pw.write_packet(head, 42, ogg::PacketWriteEndInfo::EndPage, 0)?;
pw.inner_mut().on_end_of_packet()?;
let mut tags = Vec::new();
write_opus_tags(&mut tags)?;
pw.write_packet(tags, 42, ogg::PacketWriteEndInfo::EndPage, 0)?;
pw.inner_mut().on_end_of_packet()?;
Ok(Self { pw, encoder, out_encoded, total_data: 0, rx })
}

Expand All @@ -256,6 +272,7 @@ impl StreamWriter {
let msg = self.out_encoded[..size].to_vec();
self.total_data += pcm.len() as u64;
self.pw.write_packet(msg, 42, ogg::PacketWriteEndInfo::EndPage, self.total_data)?;
self.pw.inner_mut().on_end_of_packet()?;
Ok(())
}

Expand Down
Loading