diff --git a/python/src/reader.rs b/python/src/reader.rs index 73580d4..4535605 100644 --- a/python/src/reader.rs +++ b/python/src/reader.rs @@ -1,5 +1,4 @@ use std::ops::Range; -use std::sync::Arc; use async_tiff::error::{AsyncTiffError, AsyncTiffResult}; use async_tiff::reader::{AsyncFileReader, ObjectReader}; @@ -21,12 +20,12 @@ pub(crate) enum StoreInput { } impl StoreInput { - pub(crate) fn into_async_file_reader(self, path: String) -> Arc { + pub(crate) fn into_async_file_reader(self, path: String) -> Box { match self { Self::ObjectStore(store) => { - Arc::new(ObjectReader::new(store.into_inner(), path.into())) + Box::new(ObjectReader::new(store.into_inner(), path.into())) } - Self::ObspecBackend(backend) => Arc::new(ObspecReader { backend, path }), + Self::ObspecBackend(backend) => Box::new(ObspecReader { backend, path }), } } } @@ -115,12 +114,12 @@ struct ObspecReader { } impl AsyncFileReader for ObspecReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.backend.get_range_wrapper(&self.path, range).boxed() } fn get_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { self.backend.get_ranges_wrapper(&self.path, ranges).boxed() diff --git a/python/src/tiff.rs b/python/src/tiff.rs index 82f1351..f89fd74 100644 --- a/python/src/tiff.rs +++ b/python/src/tiff.rs @@ -15,7 +15,7 @@ use crate::PyImageFileDirectory; #[pyclass(name = "TIFF", frozen)] pub(crate) struct PyTIFF { tiff: TIFF, - reader: Arc, + reader: Box, } #[pymethods] @@ -29,13 +29,15 @@ impl PyTIFF { store: StoreInput, prefetch: u64, ) -> PyResult> { - let reader = store.into_async_file_reader(path); + let mut reader = store.into_async_file_reader(path); let cog_reader = future_into_py(py, async move { - let metadata_fetch = PrefetchBuffer::new(reader.clone(), prefetch).await.unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await.unwrap(); + let mut metadata_fetch = PrefetchBuffer::new(&mut reader, prefetch).await.unwrap(); + let mut metadata_reader = TiffMetadataReader::try_open(&mut metadata_fetch) + .await + .unwrap(); let ifds = metadata_reader - .read_all_ifds(&metadata_fetch) + .read_all_ifds(&mut metadata_fetch) .await .unwrap(); let tiff = TIFF::new(ifds); @@ -57,7 +59,7 @@ impl PyTIFF { y: usize, z: usize, ) -> PyResult> { - let reader = self.reader.clone(); + let reader = &self.reader; let ifd = self .tiff .ifds() @@ -67,7 +69,7 @@ impl PyTIFF { // TODO: avoid this clone; add Arc to underlying rust code? .clone(); future_into_py(py, async move { - let tile = ifd.fetch_tile(x, y, reader.as_ref()).await.unwrap(); + let tile = ifd.fetch_tile(x, y, reader.as_mut()).await.unwrap(); Ok(PyTile::new(tile)) }) } diff --git a/src/cog.rs b/src/cog.rs index aee53a0..e955322 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -25,7 +25,7 @@ mod test { use crate::decoder::DecoderRegistry; use crate::metadata::{PrefetchBuffer, TiffMetadataReader}; - use crate::reader::{AsyncFileReader, ObjectReader}; + use crate::reader::ObjectReader; use super::*; use object_store::local::LocalFileSystem; @@ -37,22 +37,21 @@ mod test { let folder = "/Users/kyle/github/developmentseed/async-tiff/"; let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); - let reader = Arc::new(ObjectReader::new(store, path)) as Arc; - let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024) - .await - .unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) + let mut reader = ObjectReader::new(store, path); + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 32 * 1024).await.unwrap(); + + let mut metadata_reader = TiffMetadataReader::try_open(&mut prefetch_reader) .await .unwrap(); let ifds = metadata_reader - .read_all_ifds(&prefetch_reader) + .read_all_ifds(&mut prefetch_reader) .await .unwrap(); let tiff = TIFF::new(ifds); let ifd = &tiff.ifds[1]; let decoder_registry = DecoderRegistry::default(); - let tile = ifd.fetch_tile(0, 0, reader.as_ref()).await.unwrap(); + let tile = ifd.fetch_tile(0, 0, &mut reader).await.unwrap(); let tile = tile.decode(&decoder_registry).unwrap(); std::fs::write("img.buf", tile).unwrap(); } diff --git a/src/ifd.rs b/src/ifd.rs index ef91ed5..cb57c56 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -680,7 +680,7 @@ impl ImageFileDirectory { &self, x: usize, y: usize, - reader: &dyn AsyncFileReader, + reader: &mut dyn AsyncFileReader, ) -> AsyncTiffResult { let range = self .get_tile_byte_range(x, y) @@ -701,7 +701,7 @@ impl ImageFileDirectory { &self, x: &[usize], y: &[usize], - reader: &dyn AsyncFileReader, + reader: &mut dyn AsyncFileReader, ) -> AsyncTiffResult> { assert_eq!(x.len(), y.len(), "x and y should have same len"); diff --git a/src/metadata/fetch.rs b/src/metadata/fetch.rs index 126b3ae..546f0a3 100644 --- a/src/metadata/fetch.rs +++ b/src/metadata/fetch.rs @@ -1,6 +1,6 @@ use std::ops::Range; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; use futures::FutureExt; @@ -16,11 +16,11 @@ pub trait MetadataFetch { /// /// Note the returned type is a boxed future, often created by /// [futures::FutureExt::boxed]. See the trait documentation for an example. - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; } impl MetadataFetch for T { - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.get_bytes(range) } } @@ -30,44 +30,117 @@ impl MetadataFetch for T { /// This is designed so that the async requests made by the underlying tag reader get intercepted /// here and served from the existing buffer when possible. #[derive(Debug)] -pub struct PrefetchBuffer { - fetch: F, - buffer: Bytes, +pub struct PrefetchBuffer<'a, F: MetadataFetch + Send + Sync> { + fetch: &'a mut F, + /// Invariant: buffers are monotonically increasing buffers starting at the beginning of the + /// file + buffers: Vec, + /// The exponent used for deciding how much more data to fetch on overflow of the existing + /// buffer. + /// + /// buffer_length ^ fetch_exponent + overflow_fetch_exponent: f64, } -impl PrefetchBuffer { +impl<'a, F: MetadataFetch + Send + Sync> PrefetchBuffer<'a, F> { /// Construct a new PrefetchBuffer, catching the first `prefetch` bytes of the file. - pub async fn new(fetch: F, prefetch: u64) -> AsyncTiffResult { + pub async fn new(fetch: &'a mut F, prefetch: u64) -> AsyncTiffResult { let buffer = fetch.fetch(0..prefetch).await?; - Ok(Self { fetch, buffer }) + Ok(Self { + fetch, + buffers: vec![buffer], + overflow_fetch_exponent: 1.5, + }) + } + + /// Set the exponent used for deciding how much more data to fetch on overflow of the existing + /// buffer. + pub fn with_overflow_fetch_exponent(self, overflow_fetch_exponent: f64) -> Self { + Self { + overflow_fetch_exponent, + ..self + } + } + + /// Expand the length of buffers that have been pre-fetched + /// + /// Returns the desired range and adds it to the cached buffers. + async fn expand_prefetch(&mut self, range: Range) -> AsyncTiffResult { + let existing_buffer_length = self.buffer_length() as u64; + let additional_fetch = + (existing_buffer_length as f64).powf(self.overflow_fetch_exponent) as u64; + + // Make sure that we fetch at least the entire desired range + let new_range = + existing_buffer_length..range.end.max(existing_buffer_length + additional_fetch); + let buffer = self.fetch.fetch(new_range).await?; + self.buffers.push(buffer); + + // Now extract the desired slice range + Ok(self.buffer_slice(range)) } -} -impl MetadataFetch for PrefetchBuffer { - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.start < self.buffer.len() as _ { - if range.end < self.buffer.len() as _ { - let usize_range = range.start as usize..range.end as usize; - let result = self.buffer.slice(usize_range); - async { Ok(result) }.boxed() - } else { - // TODO: reuse partial internal buffer - self.fetch.fetch(range) + /// The length of all cached buffers + fn buffer_length(&self) -> usize { + self.buffers.iter().fold(0, |acc, x| acc + x.len()) + } + + /// Access the buffer range out of the cached buffers + /// + /// ## Panics + /// + /// If the range does not fall completely within the pre-cached buffers. + fn buffer_slice(&self, range: Range) -> Bytes { + // Slices of the underlying cached buffers + let mut output_buffers: Vec = vec![]; + + // A counter that describes the global start of the currently-iterated `buf` + let mut global_byte_offset: u64 = 0; + + for buf in self.buffers.iter() { + // Subtract off the global_byte_offset and then see if it overlaps the current buffer + let local_range = + range.start.saturating_sub(global_byte_offset)..range.end - global_byte_offset; + + if ranges_overlap(&local_range, &(0..buf.len() as u64)) { + let start = local_range.start as usize; + let end = (local_range.end as usize).min(buf.len()); + output_buffers.push(buf.slice(start..end)); + } + + global_byte_offset += buf.len() as u64; + } + + if output_buffers.len() == 1 { + output_buffers.into_iter().next().unwrap() + } else { + let mut result = BytesMut::with_capacity(range.end as usize - range.start as usize); + for output_buf in output_buffers.into_iter() { + result.extend_from_slice(&output_buf); } + result.freeze() + } + } +} + +impl MetadataFetch for PrefetchBuffer<'_, F> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + if range.end <= self.buffer_length() as _ { + async { Ok(self.buffer_slice(range)) }.boxed() } else { - self.fetch.fetch(range) + self.expand_prefetch(range).boxed() } } } pub(crate) struct MetadataCursor<'a, F: MetadataFetch> { - fetch: &'a F, + fetch: &'a mut F, offset: u64, endianness: Endianness, } impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { - pub fn new(fetch: &'a F, endianness: Endianness) -> Self { + pub fn new(fetch: &'a mut F, endianness: Endianness) -> Self { Self { fetch, offset: 0, @@ -75,7 +148,7 @@ impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { } } - pub fn new_with_offset(fetch: &'a F, endianness: Endianness, offset: u64) -> Self { + pub fn new_with_offset(fetch: &'a mut F, endianness: Endianness, offset: u64) -> Self { Self { fetch, offset, @@ -153,3 +226,76 @@ impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { self.read(8).await?.read_f64() } } + +// https://stackoverflow.com/a/12888920 +fn ranges_overlap(r1: &Range, r2: &Range) -> bool { + r1.start.max(r2.start) <= r1.end.min(r2.end) +} + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Debug)] + struct TestAsyncFileReader { + buffer: Bytes, + } + + impl TestAsyncFileReader { + async fn get_range(&self, range: Range) -> AsyncTiffResult { + assert!(range.start < self.buffer.len() as _); + let end = range.end.min(self.buffer.len() as _); + Ok(self.buffer.slice(range.start as usize..end as usize)) + } + } + + impl MetadataFetch for TestAsyncFileReader { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.get_range(range).boxed() + } + } + + #[tokio::test] + async fn test_prefetch_overflow() { + let underlying_buffer = b"abcdefghijklmno"; + let mut reader = TestAsyncFileReader { + buffer: Bytes::from_static(underlying_buffer), + }; + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 5) + .await + .unwrap() + .with_overflow_fetch_exponent(1.); + + // Cached + assert_eq!(prefetch_reader.fetch(0..3).await.unwrap().as_ref(), b"abc"); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..5).await.unwrap().as_ref(), + b"abcde" + ); + + // Expand fetch + assert_eq!( + prefetch_reader.fetch(0..10).await.unwrap().as_ref(), + b"abcdefghij" + ); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..10).await.unwrap().as_ref(), + b"abcdefghij" + ); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..15).await.unwrap().as_ref(), + underlying_buffer + ); + + // Assert underlying buffers were cached + assert_eq!(prefetch_reader.buffers[0].as_ref(), b"abcde"); + assert_eq!(prefetch_reader.buffers[1].as_ref(), b"fghij"); + assert_eq!(prefetch_reader.buffers[2].as_ref(), b"klmno"); + } +} diff --git a/src/metadata/reader.rs b/src/metadata/reader.rs index cc02322..9414004 100644 --- a/src/metadata/reader.rs +++ b/src/metadata/reader.rs @@ -31,7 +31,7 @@ impl TiffMetadataReader { /// the bigtiff flag. /// /// This does not read any IFD metadata. - pub async fn try_open(fetch: &F) -> AsyncTiffResult { + pub async fn try_open(fetch: &mut F) -> AsyncTiffResult { let magic_bytes = fetch.fetch(0..2).await?; // Should be b"II" for little endian or b"MM" for big endian @@ -109,7 +109,7 @@ impl TiffMetadataReader { /// If there are no more IFDs, returns `None`. pub async fn read_next_ifd( &mut self, - fetch: &F, + fetch: &mut F, ) -> AsyncTiffResult> { if let Some(ifd_start) = self.next_ifd_offset { let ifd_reader = @@ -127,7 +127,7 @@ impl TiffMetadataReader { /// Read all IFDs from the file. pub async fn read_all_ifds( &mut self, - fetch: &F, + fetch: &mut F, ) -> AsyncTiffResult> { let mut ifds = vec![]; while let Some(ifd) = self.read_next_ifd(fetch).await? { @@ -162,7 +162,7 @@ pub struct ImageFileDirectoryReader { impl ImageFileDirectoryReader { /// Read and parse the IFD starting at the given file offset pub async fn open( - fetch: &F, + fetch: &mut F, ifd_start_offset: u64, bigtiff: bool, endianness: Endianness, @@ -205,7 +205,7 @@ impl ImageFileDirectoryReader { /// [`ImageFileDirectory::from_tags`] on the resulting collection of tags. pub async fn read_tag( &self, - fetch: &F, + fetch: &mut F, tag_idx: u64, ) -> AsyncTiffResult<(Tag, Value)> { assert!(tag_idx < self.tag_count); @@ -220,7 +220,10 @@ impl ImageFileDirectoryReader { /// /// Keep in mind that you'll still need to call [`finish`][Self::finish] to get the byte offset /// of the next IFD. - pub async fn read(&self, fetch: &F) -> AsyncTiffResult { + pub async fn read( + &self, + fetch: &mut F, + ) -> AsyncTiffResult { let mut tags = HashMap::with_capacity(self.tag_count as usize); for tag_idx in 0..self.tag_count { let (tag, value) = self.read_tag(fetch, tag_idx).await?; @@ -230,7 +233,7 @@ impl ImageFileDirectoryReader { } /// Finish this reader, reading the byte offset of the next IFD - pub async fn finish(self, fetch: &F) -> AsyncTiffResult> { + pub async fn finish(self, fetch: &mut F) -> AsyncTiffResult> { // The byte offset for reading the next ifd let next_ifd_byte_offset = self.ifd_start_offset + self.tag_count_byte_size @@ -255,7 +258,7 @@ impl ImageFileDirectoryReader { /// Read a single tag from the cursor async fn read_tag( - fetch: &F, + fetch: &mut F, tag_offset: u64, endianness: Endianness, bigtiff: bool, diff --git a/src/reader.rs b/src/reader.rs index 06520ec..5b9f86e 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -33,12 +33,12 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` as part of a request for image data, not header metadata. /// /// This is also used as the default implementation of [`MetadataFetch`] if not overridden. - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; /// Retrieve multiple byte ranges as part of a request for image data, not header metadata. The /// default implementation will call `get_bytes` sequentially fn get_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { async move { @@ -57,83 +57,49 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_bytes(range) + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.as_mut().get_bytes(range) } fn get_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { - self.as_ref().get_byte_ranges(ranges) + self.as_mut().get_byte_ranges(ranges) } } -/// This allows Arc to be used as an AsyncFileReader, -impl AsyncFileReader for Arc { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_bytes(range) - } - - fn get_byte_ranges( - &self, - ranges: Vec>, - ) -> BoxFuture<'_, AsyncTiffResult>> { - self.as_ref().get_byte_ranges(ranges) - } -} - -/// A wrapper for things that implement [AsyncRead] and [AsyncSeek] to also implement -/// [AsyncFileReader]. -/// -/// This wrapper is needed because `AsyncRead` and `AsyncSeek` require mutable access to seek and -/// read data, while the `AsyncFileReader` trait requires immutable access to read data. -/// -/// This wrapper stores the inner reader in a `Mutex`. -/// -/// [AsyncRead]: tokio::io::AsyncRead -/// [AsyncSeek]: tokio::io::AsyncSeek #[cfg(feature = "tokio")] -#[derive(Debug)] -pub struct TokioReader( - tokio::sync::Mutex, -); - -#[cfg(feature = "tokio")] -impl TokioReader { - /// Create a new TokioReader from a reader. - pub fn new(inner: T) -> Self { - Self(tokio::sync::Mutex::new(inner)) - } - - async fn make_range_request(&self, range: Range) -> AsyncTiffResult { - use std::io::SeekFrom; - use tokio::io::{AsyncReadExt, AsyncSeekExt}; - - use crate::error::AsyncTiffError; - - let mut file = self.0.lock().await; - - file.seek(SeekFrom::Start(range.start)).await?; - - let to_read = range.end - range.start; - let mut buffer = Vec::with_capacity(to_read as usize); - let read = file.read(&mut buffer).await? as u64; - if read != to_read { - return Err(AsyncTiffError::EndOfFile(to_read, read)); - } - - Ok(buffer.into()) +impl AsyncFileReader + for T +{ + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + make_tokio_range_request(self, range).boxed() } } #[cfg(feature = "tokio")] -impl AsyncFileReader - for TokioReader -{ - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.make_range_request(range).boxed() +async fn make_tokio_range_request< + T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Sync + Debug, +>( + file: &mut T, + range: Range, +) -> AsyncTiffResult { + use std::io::SeekFrom; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + use crate::error::AsyncTiffError; + + file.seek(SeekFrom::Start(range.start)).await?; + + let to_read = range.end - range.start; + let mut buffer = Vec::with_capacity(to_read as usize); + let read = file.read(&mut buffer).await? as u64; + if read != to_read { + return Err(AsyncTiffError::EndOfFile(to_read, read)); } + + Ok(buffer.into()) } /// An AsyncFileReader that reads from an [`ObjectStore`] instance. @@ -164,11 +130,14 @@ impl ObjectReader { #[cfg(feature = "object_store")] impl AsyncFileReader for ObjectReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range).boxed() } - fn get_byte_ranges(&self, ranges: Vec>) -> BoxFuture<'_, AsyncTiffResult>> + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, AsyncTiffResult>> where Self: Send, { @@ -222,7 +191,7 @@ impl ReqwestReader { #[cfg(feature = "reqwest")] impl AsyncFileReader for ReqwestReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range) } } diff --git a/tests/image_tiff/util.rs b/tests/image_tiff/util.rs index 0b7e0c3..e4f9ace 100644 --- a/tests/image_tiff/util.rs +++ b/tests/image_tiff/util.rs @@ -2,7 +2,7 @@ use std::env::current_dir; use std::sync::Arc; use async_tiff::metadata::TiffMetadataReader; -use async_tiff::reader::{AsyncFileReader, ObjectReader}; +use async_tiff::reader::ObjectReader; use async_tiff::TIFF; use object_store::local::LocalFileSystem; @@ -11,9 +11,8 @@ const TEST_IMAGE_DIR: &str = "tests/image_tiff/images/"; pub(crate) async fn open_tiff(filename: &str) -> TIFF { let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); let path = format!("{TEST_IMAGE_DIR}/{filename}"); - let reader = Arc::new(ObjectReader::new(store.clone(), path.as_str().into())) - as Arc; - let mut metadata_reader = TiffMetadataReader::try_open(&reader).await.unwrap(); - let ifds = metadata_reader.read_all_ifds(&reader).await.unwrap(); + let mut reader = ObjectReader::new(store.clone(), path.as_str().into()); + let mut metadata_reader = TiffMetadataReader::try_open(&mut reader).await.unwrap(); + let ifds = metadata_reader.read_all_ifds(&mut reader).await.unwrap(); TIFF::new(ifds) }