Skip to content

Commit 4bfa953

Browse files
committed
Implement Reader for individual blobs.
1 parent 719cdb4 commit 4bfa953

File tree

2 files changed

+187
-0
lines changed

2 files changed

+187
-0
lines changed

src/api/blobs.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
2929
use quinn::SendStream;
3030
use range_collections::{range_set::RangeSetRange, RangeSet2};
3131
use ref_cast::RefCast;
32+
use serde::{Deserialize, Serialize};
3233
use tokio::io::AsyncWriteExt;
3334
use tracing::trace;
35+
mod reader;
36+
pub use reader::Reader;
3437

3538
// Public reexports from the proto module.
3639
//
@@ -102,6 +105,14 @@ impl Blobs {
102105
})
103106
}
104107

108+
pub fn reader(&self, hash: impl Into<Hash>) -> Reader {
109+
self.reader_with_opts(ReaderOptions { hash: hash.into() })
110+
}
111+
112+
pub fn reader_with_opts(&self, options: ReaderOptions) -> Reader {
113+
Reader::new(self.clone(), options)
114+
}
115+
105116
/// Delete a blob.
106117
///
107118
/// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +658,12 @@ impl<'a> AddProgress<'a> {
647658
}
648659
}
649660

661+
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
662+
#[derive(Debug, Clone, Serialize, Deserialize)]
663+
pub struct ReaderOptions {
664+
pub hash: Hash,
665+
}
666+
650667
/// An observe result. Awaiting this will return the current state.
651668
///
652669
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where

src/api/blobs/reader.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use std::{
2+
io::{self, ErrorKind, SeekFrom},
3+
ops::DerefMut,
4+
sync::{Arc, Mutex},
5+
task::Poll,
6+
};
7+
8+
use n0_future::FutureExt;
9+
10+
use crate::api::{
11+
blobs::{Blobs, ReaderOptions},
12+
RequestResult,
13+
};
14+
15+
pub struct Reader {
16+
blobs: Blobs,
17+
options: ReaderOptions,
18+
state: Arc<Mutex<ReaderState>>,
19+
}
20+
21+
#[derive(Default)]
22+
enum ReaderState {
23+
Idle {
24+
position: u64,
25+
},
26+
Seeking {
27+
position: u64,
28+
},
29+
Reading {
30+
position: u64,
31+
op: n0_future::boxed::BoxFuture<RequestResult<Vec<u8>>>,
32+
},
33+
#[default]
34+
Poisoned,
35+
}
36+
37+
impl Reader {
38+
pub fn new(blobs: Blobs, options: ReaderOptions) -> Self {
39+
Self {
40+
blobs,
41+
options,
42+
state: Arc::new(Mutex::new(ReaderState::Idle { position: 0 })),
43+
}
44+
}
45+
}
46+
47+
impl tokio::io::AsyncRead for Reader {
48+
fn poll_read(
49+
self: std::pin::Pin<&mut Self>,
50+
cx: &mut std::task::Context<'_>,
51+
buf: &mut tokio::io::ReadBuf<'_>,
52+
) -> std::task::Poll<std::io::Result<()>> {
53+
let this = self.get_mut();
54+
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
55+
ReaderState::Idle { position } => {
56+
// todo: read until next page boundary instead of fixed size
57+
let len = buf.remaining().min(1024 * 16);
58+
let end = position.checked_add(len as u64).ok_or_else(|| {
59+
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
60+
})?;
61+
let hash = this.options.hash;
62+
let blobs = this.blobs.clone();
63+
let ranges = position..end;
64+
let op = async move { blobs.export_ranges(hash, ranges).concatenate().await };
65+
*this.state.lock().unwrap() = ReaderState::Reading {
66+
position,
67+
op: Box::pin(op),
68+
};
69+
}
70+
ReaderState::Reading { position, mut op } => {
71+
match op.poll(cx) {
72+
Poll::Ready(Ok(data)) => {
73+
let len = data.len();
74+
if len > buf.remaining() {
75+
return Poll::Ready(Err(io::Error::new(
76+
ErrorKind::UnexpectedEof,
77+
"Read more data than buffer can hold",
78+
)));
79+
}
80+
buf.put_slice(&data);
81+
let position = position + len as u64;
82+
*this.state.lock().unwrap() = ReaderState::Idle { position };
83+
return Poll::Ready(Ok(()));
84+
}
85+
Poll::Ready(Err(e)) => {
86+
*this.state.lock().unwrap() = ReaderState::Idle { position };
87+
let e = io::Error::new(ErrorKind::Other, e.to_string());
88+
return Poll::Ready(Err(e));
89+
}
90+
Poll::Pending => {
91+
// Put back the state
92+
*this.state.lock().unwrap() = ReaderState::Reading {
93+
position,
94+
op: Box::pin(op),
95+
};
96+
return Poll::Pending;
97+
}
98+
}
99+
}
100+
state @ ReaderState::Seeking { .. } => {
101+
*this.state.lock().unwrap() = state;
102+
return Poll::Ready(Err(io::Error::new(
103+
ErrorKind::Other,
104+
"Can't read while seeking",
105+
)));
106+
}
107+
ReaderState::Poisoned => {
108+
return Poll::Ready(Err(io::Error::other("Reader is poisoned")));
109+
}
110+
};
111+
todo!()
112+
}
113+
}
114+
115+
impl tokio::io::AsyncSeek for Reader {
116+
fn start_seek(
117+
self: std::pin::Pin<&mut Self>,
118+
seek_from: tokio::io::SeekFrom,
119+
) -> std::io::Result<()> {
120+
let this = self.get_mut();
121+
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
122+
ReaderState::Idle { position } => {
123+
let position1 = match seek_from {
124+
SeekFrom::Start(pos) => pos,
125+
SeekFrom::Current(offset) => {
126+
position.checked_add_signed(offset).ok_or_else(|| {
127+
io::Error::new(
128+
ErrorKind::InvalidInput,
129+
"Position overflow when seeking",
130+
)
131+
})?
132+
}
133+
SeekFrom::End(_offset) => {
134+
// todo: support seeking from end if we know the size
135+
return Err(io::Error::new(
136+
ErrorKind::InvalidInput,
137+
"Seeking from end is not supported yet",
138+
))?;
139+
}
140+
};
141+
*this.state.lock().unwrap() = ReaderState::Seeking {
142+
position: position1,
143+
};
144+
Ok(())
145+
}
146+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
147+
ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
148+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
149+
}
150+
}
151+
152+
fn poll_complete(
153+
self: std::pin::Pin<&mut Self>,
154+
_cx: &mut std::task::Context<'_>,
155+
) -> std::task::Poll<std::io::Result<u64>> {
156+
let this = self.get_mut();
157+
Poll::Ready(
158+
match std::mem::take(this.state.lock().unwrap().deref_mut()) {
159+
ReaderState::Seeking { position } => {
160+
// we only put the state back if we are in the right state
161+
*this.state.lock().unwrap() = ReaderState::Idle { position };
162+
Ok(position)
163+
}
164+
ReaderState::Idle { .. } => Err(io::Error::other("No seek operation in progress")),
165+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
166+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
167+
},
168+
)
169+
}
170+
}

0 commit comments

Comments
 (0)