From dba1a77e22c10c61b92ad7238cc7388e14ab6650 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 27 Mar 2025 17:23:29 -0400 Subject: [PATCH 1/8] Exponential overflow header buffer --- src/cog.rs | 10 +- src/ifd.rs | 12 +-- src/reader.rs | 225 +++++++++++++++++++++++---------------- tests/image_tiff/util.rs | 4 +- 4 files changed, 145 insertions(+), 106 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index e9e3c5b..3abf59e 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::error::AsyncTiffResult; use crate::ifd::ImageFileDirectories; use crate::reader::{AsyncCursor, AsyncFileReader}; @@ -15,7 +13,7 @@ impl TIFF { /// Open a new TIFF file. /// /// This will read all the Image File Directories (IFDs) in the file. - pub async fn try_open(reader: Arc) -> AsyncTiffResult { + pub async fn try_open(reader: &mut dyn AsyncFileReader) -> AsyncTiffResult { let mut cursor = AsyncCursor::try_open_tiff(reader).await?; let version = cursor.read_u16().await?; @@ -74,13 +72,13 @@ mod test { let folder = "/Users/kyle/github/developmentseed/async-tiff/"; let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); - let reader = Arc::new(ObjectReader::new(store, path)); + let mut reader = Box::new(ObjectReader::new(store, path)); - let cog_reader = TIFF::try_open(reader.clone()).await.unwrap(); + let cog_reader = TIFF::try_open(reader.as_mut()).await.unwrap(); let ifd = &cog_reader.ifds.as_ref()[1]; let decoder_registry = DecoderRegistry::default(); - let tile = ifd.fetch_tile(0, 0, reader.as_ref()).await.unwrap(); + let tile = ifd.fetch_tile(0, 0, reader.as_mut()).await.unwrap(); let tile = tile.decode(&decoder_registry).unwrap(); std::fs::write("img.buf", tile).unwrap(); } diff --git a/src/ifd.rs b/src/ifd.rs index 90f399f..89e2ad8 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -37,7 +37,7 @@ impl AsRef<[ImageFileDirectory]> for ImageFileDirectories { impl ImageFileDirectories { pub(crate) async fn open( - cursor: &mut AsyncCursor, + cursor: &mut AsyncCursor<'_>, ifd_offset: u64, bigtiff: bool, ) -> AsyncTiffResult { @@ -184,7 +184,7 @@ pub struct ImageFileDirectory { impl ImageFileDirectory { /// Read and parse the IFD starting at the given file offset async fn read( - cursor: &mut AsyncCursor, + cursor: &mut AsyncCursor<'_>, ifd_start: u64, bigtiff: bool, ) -> AsyncTiffResult { @@ -774,7 +774,7 @@ impl ImageFileDirectory { &self, x: usize, y: usize, - reader: &dyn AsyncFileReader, + reader: &mut dyn AsyncFileReader, ) -> AsyncTiffResult { let range = self .get_tile_byte_range(x, y) @@ -795,7 +795,7 @@ impl ImageFileDirectory { &self, x: &[usize], y: &[usize], - reader: &dyn AsyncFileReader, + reader: &mut dyn AsyncFileReader, ) -> AsyncTiffResult> { assert_eq!(x.len(), y.len(), "x and y should have same len"); @@ -838,7 +838,7 @@ impl ImageFileDirectory { } /// Read a single tag from the cursor -async fn read_tag(cursor: &mut AsyncCursor, bigtiff: bool) -> AsyncTiffResult<(Tag, Value)> { +async fn read_tag(cursor: &mut AsyncCursor<'_>, bigtiff: bool) -> AsyncTiffResult<(Tag, Value)> { let start_cursor_position = cursor.position(); let tag_name = Tag::from_u16_exhaustive(cursor.read_u16().await?); @@ -868,7 +868,7 @@ async fn read_tag(cursor: &mut AsyncCursor, bigtiff: bool) -> AsyncTiffResult<(T // This is derived from the upstream tiff crate: // https://github.com/image-rs/image-tiff/blob/6dc7a266d30291db1e706c8133357931f9e2a053/src/decoder/ifd.rs#L369-L639 async fn read_tag_value( - cursor: &mut AsyncCursor, + cursor: &mut AsyncCursor<'_>, tag_type: Type, count: u64, bigtiff: bool, diff --git a/src/reader.rs b/src/reader.rs index 6c3dcb5..e525c1c 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use futures::future::{BoxFuture, FutureExt}; use futures::TryFutureExt; @@ -36,15 +36,15 @@ use crate::error::{AsyncTiffError, AsyncTiffResult}; /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` as part of a request for header metadata. - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; /// Retrieve the bytes in `range` as part of a request for image data, not header metadata. - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; /// Retrieve multiple byte ranges as part of a request for image data, not header metadata. The /// default implementation will call `get_image_bytes` sequentially fn get_image_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { async move { @@ -63,75 +63,55 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_metadata_bytes(range) + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.as_mut().get_metadata_bytes(range) } - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_image_bytes(range) + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.as_mut().get_image_bytes(range) } fn get_image_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { - self.as_ref().get_image_byte_ranges(ranges) + self.as_mut().get_image_byte_ranges(ranges) } } -/// A wrapper for things that implement [AsyncRead] and [AsyncSeek] to also implement -/// [AsyncFileReader]. -/// -/// This wrapper is needed because `AsyncRead` and `AsyncSeek` require mutable access to seek and -/// read data, while the `AsyncFileReader` trait requires immutable access to read data. -/// -/// This wrapper stores the inner reader in a `Mutex`. -/// -/// [AsyncRead]: tokio::io::AsyncRead -/// [AsyncSeek]: tokio::io::AsyncSeek #[cfg(feature = "tokio")] -#[derive(Debug)] -pub struct TokioReader( - tokio::sync::Mutex, -); - -#[cfg(feature = "tokio")] -impl TokioReader { - /// Create a new TokioReader from a reader. - pub fn new(inner: T) -> Self { - Self(tokio::sync::Mutex::new(inner)) +impl AsyncFileReader + for T +{ + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + make_tokio_range_request(self, range).boxed() } - async fn make_range_request(&self, range: Range) -> AsyncTiffResult { - use std::io::SeekFrom; - use tokio::io::{AsyncReadExt, AsyncSeekExt}; - - let mut file = self.0.lock().await; - - file.seek(SeekFrom::Start(range.start)).await?; - - let to_read = range.end - range.start; - let mut buffer = Vec::with_capacity(to_read as usize); - let read = file.read(&mut buffer).await? as u64; - if read != to_read { - return Err(AsyncTiffError::EndOfFile(to_read, read)); - } - - Ok(buffer.into()) + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + make_tokio_range_request(self, range).boxed() } } #[cfg(feature = "tokio")] -impl AsyncFileReader - for TokioReader -{ - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.make_range_request(range).boxed() - } +async fn make_tokio_range_request< + T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Sync + Debug, +>( + file: &mut T, + range: Range, +) -> AsyncTiffResult { + use std::io::SeekFrom; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.make_range_request(range).boxed() + file.seek(SeekFrom::Start(range.start)).await?; + + let to_read = range.end - range.start; + let mut buffer = Vec::with_capacity(to_read as usize); + let read = file.read(&mut buffer).await? as u64; + if read != to_read { + return Err(AsyncTiffError::EndOfFile(to_read, read)); } + + Ok(buffer.into()) } /// An AsyncFileReader that reads from an [`ObjectStore`] instance. @@ -162,16 +142,16 @@ impl ObjectReader { #[cfg(feature = "object_store")] impl AsyncFileReader for ObjectReader { - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range).boxed() } - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range).boxed() } fn get_image_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> where @@ -227,11 +207,11 @@ impl ReqwestReader { #[cfg(feature = "reqwest")] impl AsyncFileReader for ReqwestReader { - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range) } - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.make_range_request(range) } } @@ -239,42 +219,107 @@ impl AsyncFileReader for ReqwestReader { /// An AsyncFileReader that caches the first `prefetch` bytes of a file. #[derive(Debug)] pub struct PrefetchReader { - reader: Arc, - buffer: Bytes, + reader: Box, + /// Invariant: buffers are monotonically increasing buffers starting at the beginning of the + /// file + buffers: Vec, + /// The exponent used for deciding how much more data to fetch on overflow of the existing buffer. + /// + /// buffer_length ^ fetch_exponent + overflow_fetch_exponent: f64, } impl PrefetchReader { /// Construct a new PrefetchReader, catching the first `prefetch` bytes of the file. - pub async fn new(reader: Arc, prefetch: u64) -> AsyncTiffResult { + pub async fn new( + mut reader: Box, + prefetch: u64, + overflow_fetch_exponent: f64, + ) -> AsyncTiffResult { let buffer = reader.get_metadata_bytes(0..prefetch).await?; - Ok(Self { reader, buffer }) + Ok(Self { + reader, + buffers: vec![buffer], + overflow_fetch_exponent, + }) + } + + /// Expand the length of buffers that have been pre-fetched + /// + /// Returns the desired range and adds it to the cached buffers. + async fn expand_prefetch(&mut self, range: Range) -> AsyncTiffResult { + let existing_buffer_length = self.buffer_length() as u64; + let additional_fetch = + (existing_buffer_length as f64).powf(self.overflow_fetch_exponent) as u64; + + // Make sure that we fetch at least the entire desired range + let new_range = + existing_buffer_length..range.end.max(existing_buffer_length + additional_fetch); + let buffer = self.reader.get_metadata_bytes(new_range).await?; + self.buffers.push(buffer); + + // Now extract the desired slice range + Ok(self.buffer_slice(range)) + } + + /// The length of all cached buffers + fn buffer_length(&self) -> usize { + self.buffers.iter().fold(0, |acc, x| acc + x.len()) + } + + /// Access the buffer range out of the cached buffers + /// + /// ## Panics + /// + /// If the range does not fall completely within the pre-cached buffers. + fn buffer_slice(&self, range: Range) -> Bytes { + // Slices of the underlying cached buffers + let mut output_buffers: Vec = vec![]; + + // A counter that describes the global start of the currently-iterated `buf` + let mut global_byte_offset: u64 = 0; + + for buf in self.buffers.iter() { + // Does this buffer piece overlap with the overall range + let global_range = global_byte_offset..global_byte_offset + buf.len() as u64; + if ranges_overlap(&global_range, &range) { + let local_range = (range.start - global_byte_offset) as usize + ..((range.end - global_byte_offset) as usize).min(buf.len()); + output_buffers.push(buf.slice(local_range)); + } + + global_byte_offset += buf.len() as u64; + } + + if output_buffers.len() == 1 { + output_buffers.into_iter().next().unwrap() + } else { + let mut result = BytesMut::with_capacity(range.end as usize - range.start as usize); + for output_buf in output_buffers.into_iter() { + result.extend_from_slice(&output_buf); + } + result.freeze() + } } } impl AsyncFileReader for PrefetchReader { - fn get_metadata_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.start < self.buffer.len() as _ { - if range.end < self.buffer.len() as _ { - let usize_range = range.start as usize..range.end as usize; - let result = self.buffer.slice(usize_range); - async { Ok(result) }.boxed() - } else { - // TODO: reuse partial internal buffer - self.reader.get_metadata_bytes(range) - } + fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + if range.end < self.buffer_length() as _ { + async { Ok(self.buffer_slice(range)) }.boxed() } else { - self.reader.get_metadata_bytes(range) + self.expand_prefetch(range).boxed() } } - fn get_image_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { // In practice, get_image_bytes is only used for fetching tiles, which are unlikely // to overlap a metadata prefetch. self.reader.get_image_bytes(range) } fn get_image_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> where @@ -295,15 +340,15 @@ pub(crate) enum Endianness { /// A wrapper around an [ObjectStore] that provides a seek-oriented interface // TODO: in the future add buffering to this #[derive(Debug)] -pub(crate) struct AsyncCursor { - reader: Arc, +pub(crate) struct AsyncCursor<'a> { + reader: &'a mut dyn AsyncFileReader, offset: u64, endianness: Endianness, } -impl AsyncCursor { +impl<'a> AsyncCursor<'a> { /// Create a new AsyncCursor from a reader and endianness. - pub(crate) fn new(reader: Arc, endianness: Endianness) -> Self { + pub(crate) fn new(reader: &'a mut dyn AsyncFileReader, endianness: Endianness) -> Self { Self { reader, offset: 0, @@ -313,7 +358,9 @@ impl AsyncCursor { /// Create a new AsyncCursor for a TIFF file, automatically inferring endianness from the first /// two bytes. - pub(crate) async fn try_open_tiff(reader: Arc) -> AsyncTiffResult { + pub(crate) async fn try_open_tiff( + reader: &'a mut dyn AsyncFileReader, + ) -> AsyncTiffResult { // Initialize with little endianness and then set later let mut cursor = Self::new(reader, Endianness::LittleEndian); let magic_bytes = cursor.read(2).await?; @@ -333,12 +380,6 @@ impl AsyncCursor { Ok(cursor) } - /// Consume self and return the underlying [`AsyncFileReader`]. - #[allow(dead_code)] - pub(crate) fn into_inner(self) -> Arc { - self.reader - } - /// Read the given number of bytes, advancing the internal cursor state by the same amount. pub(crate) async fn read(&mut self, length: u64) -> AsyncTiffResult { let range = self.offset as _..(self.offset + length) as _; @@ -398,11 +439,6 @@ impl AsyncCursor { self.read(8).await?.read_f64() } - #[allow(dead_code)] - pub(crate) fn reader(&self) -> &Arc { - &self.reader - } - #[allow(dead_code)] pub(crate) fn endianness(&self) -> Endianness { self.endianness @@ -511,3 +547,8 @@ impl Read for EndianAwareReader { self.reader.read(buf) } } + +// https://stackoverflow.com/a/12888920 +fn ranges_overlap(r1: &Range, r2: &Range) -> bool { + r1.start.max(r2.start) <= r1.end.min(r2.end) +} diff --git a/tests/image_tiff/util.rs b/tests/image_tiff/util.rs index 834ccc7..bf7891f 100644 --- a/tests/image_tiff/util.rs +++ b/tests/image_tiff/util.rs @@ -10,6 +10,6 @@ const TEST_IMAGE_DIR: &str = "tests/image_tiff/images/"; pub(crate) async fn open_tiff(filename: &str) -> TIFF { let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); let path = format!("{TEST_IMAGE_DIR}/{filename}"); - let reader = Arc::new(ObjectReader::new(store.clone(), path.as_str().into())); - TIFF::try_open(reader).await.unwrap() + let mut reader = Box::new(ObjectReader::new(store.clone(), path.as_str().into())); + TIFF::try_open(reader.as_mut()).await.unwrap() } From de63504dea79f2b71cc8b0d47746dc80d9842602 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 27 Mar 2025 17:52:08 -0400 Subject: [PATCH 2/8] Add test for prefetch overflow --- src/reader.rs | 96 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 89 insertions(+), 7 deletions(-) diff --git a/src/reader.rs b/src/reader.rs index e525c1c..e1967b3 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -280,12 +280,14 @@ impl PrefetchReader { let mut global_byte_offset: u64 = 0; for buf in self.buffers.iter() { - // Does this buffer piece overlap with the overall range - let global_range = global_byte_offset..global_byte_offset + buf.len() as u64; - if ranges_overlap(&global_range, &range) { - let local_range = (range.start - global_byte_offset) as usize - ..((range.end - global_byte_offset) as usize).min(buf.len()); - output_buffers.push(buf.slice(local_range)); + // Subtract off the global_byte_offset and then see if it overlaps the current buffer + let local_range = + range.start.saturating_sub(global_byte_offset)..range.end - global_byte_offset; + + if ranges_overlap(&local_range, &(0..buf.len() as u64)) { + let start = local_range.start as usize; + let end = (local_range.end as usize).min(buf.len()); + output_buffers.push(buf.slice(start..end)); } global_byte_offset += buf.len() as u64; @@ -305,7 +307,7 @@ impl PrefetchReader { impl AsyncFileReader for PrefetchReader { fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.end < self.buffer_length() as _ { + if range.end <= self.buffer_length() as _ { async { Ok(self.buffer_slice(range)) }.boxed() } else { self.expand_prefetch(range).boxed() @@ -552,3 +554,83 @@ impl Read for EndianAwareReader { fn ranges_overlap(r1: &Range, r2: &Range) -> bool { r1.start.max(r2.start) <= r1.end.min(r2.end) } + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Debug)] + struct TestAsyncFileReader { + buffer: Bytes, + } + + impl TestAsyncFileReader { + async fn get_range(&self, range: Range) -> AsyncTiffResult { + assert!(range.start < self.buffer.len() as _); + let end = range.end.min(self.buffer.len() as _); + Ok(self.buffer.slice(range.start as usize..end as usize)) + } + } + + impl AsyncFileReader for TestAsyncFileReader { + fn get_metadata_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, AsyncTiffResult> { + self.get_range(range).boxed() + } + + fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.get_range(range).boxed() + } + } + + #[tokio::test] + async fn test_prefetch_overflow() { + let underlying_buffer = b"abcdefghijklmno"; + let reader = TestAsyncFileReader { + buffer: Bytes::from_static(underlying_buffer), + }; + let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.5).await.unwrap(); + + // Cached + assert_eq!( + prefetch_reader + .get_metadata_bytes(0..3) + .await + .unwrap() + .as_ref(), + b"abc" + ); + + // Cached + assert_eq!( + prefetch_reader + .get_metadata_bytes(0..5) + .await + .unwrap() + .as_ref(), + b"abcde" + ); + + // Expand fetch + assert_eq!( + prefetch_reader + .get_metadata_bytes(0..10) + .await + .unwrap() + .as_ref(), + b"abcdefghij" + ); + + // Cached + assert_eq!( + prefetch_reader + .get_metadata_bytes(0..10) + .await + .unwrap() + .as_ref(), + b"abcdefghij" + ); + } +} From 71ab6a06a1f52231ebaf0742dc8957225b2393f1 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 27 Mar 2025 17:59:40 -0400 Subject: [PATCH 3/8] update test --- src/reader.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/reader.rs b/src/reader.rs index e1967b3..e9ae34a 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -591,7 +591,7 @@ mod test { let reader = TestAsyncFileReader { buffer: Bytes::from_static(underlying_buffer), }; - let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.5).await.unwrap(); + let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.1).await.unwrap(); // Cached assert_eq!( @@ -632,5 +632,15 @@ mod test { .as_ref(), b"abcdefghij" ); + + // Cached + assert_eq!( + prefetch_reader + .get_metadata_bytes(0..15) + .await + .unwrap() + .as_ref(), + underlying_buffer + ); } } From 860155f04c6ba2e1c481b13ebb65940d3de4da10 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 27 Mar 2025 18:01:04 -0400 Subject: [PATCH 4/8] assert cached --- src/reader.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/reader.rs b/src/reader.rs index e9ae34a..483173d 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -591,7 +591,7 @@ mod test { let reader = TestAsyncFileReader { buffer: Bytes::from_static(underlying_buffer), }; - let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.1).await.unwrap(); + let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.).await.unwrap(); // Cached assert_eq!( @@ -642,5 +642,10 @@ mod test { .as_ref(), underlying_buffer ); + + // Assert underlying buffers were cached + assert_eq!(prefetch_reader.buffers[0].as_ref(), b"abcde"); + assert_eq!(prefetch_reader.buffers[1].as_ref(), b"fghij"); + assert_eq!(prefetch_reader.buffers[2].as_ref(), b"klmno"); } } From 351b12e874effe0c2f705be35d03f0ba68d5af5a Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 28 Mar 2025 17:21:27 -0400 Subject: [PATCH 5/8] cleanup --- src/cog.rs | 13 ++- src/ifd.rs | 2 +- src/metadata/fetch.rs | 185 +++++++++++++++++++++++++++----- src/metadata/reader.rs | 2 +- src/reader.rs | 237 +---------------------------------------- 5 files changed, 173 insertions(+), 266 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index 0973f05..61f4890 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -1,7 +1,4 @@ -use crate::error::AsyncTiffResult; use crate::ifd::ImageFileDirectory; -use crate::reader::{AsyncCursor, AsyncFileReader}; -use crate::tiff::{TiffError, TiffFormatError}; /// A TIFF file. #[derive(Debug, Clone)] @@ -28,7 +25,7 @@ mod test { use crate::decoder::DecoderRegistry; use crate::metadata::{PrefetchBuffer, TiffMetadataReader}; - use crate::reader::{AsyncFileReader, ObjectReader}; + use crate::reader::ObjectReader; use super::*; use object_store::local::LocalFileSystem; @@ -41,9 +38,11 @@ mod test { let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); let mut reader = ObjectReader::new(store, path); - let prefetch_reader = PrefetchBuffer::new(reader, 32 * 1024).await.unwrap(); + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 32 * 1024, 1.5) + .await + .unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) + let mut metadata_reader = TiffMetadataReader::try_open(&mut prefetch_reader) .await .unwrap(); let ifds = metadata_reader @@ -54,7 +53,7 @@ mod test { let ifd = &tiff.ifds[1]; let decoder_registry = DecoderRegistry::default(); - let tile = ifd.fetch_tile(0, 0, reader.as_mut()).await.unwrap(); + let tile = ifd.fetch_tile(0, 0, &mut reader).await.unwrap(); let tile = tile.decode(&decoder_registry).unwrap(); std::fs::write("img.buf", tile).unwrap(); } diff --git a/src/ifd.rs b/src/ifd.rs index 83d91bb..e5d8512 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -9,7 +9,7 @@ use crate::geo::{GeoKeyDirectory, GeoKeyTag}; use crate::reader::AsyncFileReader; use crate::tiff::tags::{ CompressionMethod, PhotometricInterpretation, PlanarConfiguration, Predictor, ResolutionUnit, - SampleFormat, Tag, + SampleFormat, Tag, Type, }; use crate::tiff::{TiffError, Value}; use crate::tile::Tile; diff --git a/src/metadata/fetch.rs b/src/metadata/fetch.rs index 126b3ae..8a41845 100644 --- a/src/metadata/fetch.rs +++ b/src/metadata/fetch.rs @@ -1,6 +1,6 @@ use std::ops::Range; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; use futures::FutureExt; @@ -16,11 +16,11 @@ pub trait MetadataFetch { /// /// Note the returned type is a boxed future, often created by /// [futures::FutureExt::boxed]. See the trait documentation for an example. - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; } impl MetadataFetch for T { - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.get_bytes(range) } } @@ -30,44 +30,111 @@ impl MetadataFetch for T { /// This is designed so that the async requests made by the underlying tag reader get intercepted /// here and served from the existing buffer when possible. #[derive(Debug)] -pub struct PrefetchBuffer { - fetch: F, - buffer: Bytes, +pub struct PrefetchBuffer<'a, F: MetadataFetch + Send + Sync> { + fetch: &'a mut F, + /// Invariant: buffers are monotonically increasing buffers starting at the beginning of the + /// file + buffers: Vec, + /// The exponent used for deciding how much more data to fetch on overflow of the existing buffer. + /// + /// buffer_length ^ fetch_exponent + overflow_fetch_exponent: f64, } -impl PrefetchBuffer { +impl<'a, F: MetadataFetch + Send + Sync> PrefetchBuffer<'a, F> { /// Construct a new PrefetchBuffer, catching the first `prefetch` bytes of the file. - pub async fn new(fetch: F, prefetch: u64) -> AsyncTiffResult { + pub async fn new( + fetch: &'a mut F, + prefetch: u64, + overflow_fetch_exponent: f64, + ) -> AsyncTiffResult { let buffer = fetch.fetch(0..prefetch).await?; - Ok(Self { fetch, buffer }) + Ok(Self { + fetch, + buffers: vec![buffer], + overflow_fetch_exponent, + }) } -} -impl MetadataFetch for PrefetchBuffer { - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.start < self.buffer.len() as _ { - if range.end < self.buffer.len() as _ { - let usize_range = range.start as usize..range.end as usize; - let result = self.buffer.slice(usize_range); - async { Ok(result) }.boxed() - } else { - // TODO: reuse partial internal buffer - self.fetch.fetch(range) + /// Expand the length of buffers that have been pre-fetched + /// + /// Returns the desired range and adds it to the cached buffers. + async fn expand_prefetch(&mut self, range: Range) -> AsyncTiffResult { + let existing_buffer_length = self.buffer_length() as u64; + let additional_fetch = + (existing_buffer_length as f64).powf(self.overflow_fetch_exponent) as u64; + + // Make sure that we fetch at least the entire desired range + let new_range = + existing_buffer_length..range.end.max(existing_buffer_length + additional_fetch); + let buffer = self.fetch.fetch(new_range).await?; + self.buffers.push(buffer); + + // Now extract the desired slice range + Ok(self.buffer_slice(range)) + } + + /// The length of all cached buffers + fn buffer_length(&self) -> usize { + self.buffers.iter().fold(0, |acc, x| acc + x.len()) + } + + /// Access the buffer range out of the cached buffers + /// + /// ## Panics + /// + /// If the range does not fall completely within the pre-cached buffers. + fn buffer_slice(&self, range: Range) -> Bytes { + // Slices of the underlying cached buffers + let mut output_buffers: Vec = vec![]; + + // A counter that describes the global start of the currently-iterated `buf` + let mut global_byte_offset: u64 = 0; + + for buf in self.buffers.iter() { + // Subtract off the global_byte_offset and then see if it overlaps the current buffer + let local_range = + range.start.saturating_sub(global_byte_offset)..range.end - global_byte_offset; + + if ranges_overlap(&local_range, &(0..buf.len() as u64)) { + let start = local_range.start as usize; + let end = (local_range.end as usize).min(buf.len()); + output_buffers.push(buf.slice(start..end)); } + + global_byte_offset += buf.len() as u64; + } + + if output_buffers.len() == 1 { + output_buffers.into_iter().next().unwrap() + } else { + let mut result = BytesMut::with_capacity(range.end as usize - range.start as usize); + for output_buf in output_buffers.into_iter() { + result.extend_from_slice(&output_buf); + } + result.freeze() + } + } +} + +impl<'a, F: MetadataFetch + Send + Sync> MetadataFetch for PrefetchBuffer<'a, F> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + if range.end <= self.buffer_length() as _ { + async { Ok(self.buffer_slice(range)) }.boxed() } else { - self.fetch.fetch(range) + self.expand_prefetch(range).boxed() } } } pub(crate) struct MetadataCursor<'a, F: MetadataFetch> { - fetch: &'a F, + fetch: &'a mut F, offset: u64, endianness: Endianness, } impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { - pub fn new(fetch: &'a F, endianness: Endianness) -> Self { + pub fn new(fetch: &'a mut F, endianness: Endianness) -> Self { Self { fetch, offset: 0, @@ -75,7 +142,7 @@ impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { } } - pub fn new_with_offset(fetch: &'a F, endianness: Endianness, offset: u64) -> Self { + pub fn new_with_offset(fetch: &'a mut F, endianness: Endianness, offset: u64) -> Self { Self { fetch, offset, @@ -153,3 +220,73 @@ impl<'a, F: MetadataFetch> MetadataCursor<'a, F> { self.read(8).await?.read_f64() } } + +// https://stackoverflow.com/a/12888920 +fn ranges_overlap(r1: &Range, r2: &Range) -> bool { + r1.start.max(r2.start) <= r1.end.min(r2.end) +} + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Debug)] + struct TestAsyncFileReader { + buffer: Bytes, + } + + impl TestAsyncFileReader { + async fn get_range(&self, range: Range) -> AsyncTiffResult { + assert!(range.start < self.buffer.len() as _); + let end = range.end.min(self.buffer.len() as _); + Ok(self.buffer.slice(range.start as usize..end as usize)) + } + } + + impl MetadataFetch for TestAsyncFileReader { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.get_range(range).boxed() + } + } + + #[tokio::test] + async fn test_prefetch_overflow() { + let underlying_buffer = b"abcdefghijklmno"; + let mut reader = TestAsyncFileReader { + buffer: Bytes::from_static(underlying_buffer), + }; + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 5, 1.).await.unwrap(); + + // Cached + assert_eq!(prefetch_reader.fetch(0..3).await.unwrap().as_ref(), b"abc"); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..5).await.unwrap().as_ref(), + b"abcde" + ); + + // Expand fetch + assert_eq!( + prefetch_reader.fetch(0..10).await.unwrap().as_ref(), + b"abcdefghij" + ); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..10).await.unwrap().as_ref(), + b"abcdefghij" + ); + + // Cached + assert_eq!( + prefetch_reader.fetch(0..15).await.unwrap().as_ref(), + underlying_buffer + ); + + // Assert underlying buffers were cached + assert_eq!(prefetch_reader.buffers[0].as_ref(), b"abcde"); + assert_eq!(prefetch_reader.buffers[1].as_ref(), b"fghij"); + assert_eq!(prefetch_reader.buffers[2].as_ref(), b"klmno"); + } +} diff --git a/src/metadata/reader.rs b/src/metadata/reader.rs index cc02322..446cea2 100644 --- a/src/metadata/reader.rs +++ b/src/metadata/reader.rs @@ -31,7 +31,7 @@ impl TiffMetadataReader { /// the bigtiff flag. /// /// This does not read any IFD metadata. - pub async fn try_open(fetch: &F) -> AsyncTiffResult { + pub async fn try_open(fetch: &mut F) -> AsyncTiffResult { let magic_bytes = fetch.fetch(0..2).await?; // Should be b"II" for little endian or b"MM" for big endian diff --git a/src/reader.rs b/src/reader.rs index 8664379..5b9f86e 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::TryFutureExt; @@ -69,26 +69,12 @@ impl AsyncFileReader for Box { } } -/// This allows Arc to be used as an AsyncFileReader, -impl AsyncFileReader for Arc { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_bytes(range) - } - - fn get_byte_ranges( - &self, - ranges: Vec>, - ) -> BoxFuture<'_, AsyncTiffResult>> { - self.as_ref().get_byte_ranges(ranges) - } -} - #[cfg(feature = "tokio")] impl AsyncFileReader for T { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.make_range_request(range).boxed() + make_tokio_range_request(self, range).boxed() } } @@ -102,6 +88,8 @@ async fn make_tokio_range_request< use std::io::SeekFrom; use tokio::io::{AsyncReadExt, AsyncSeekExt}; + use crate::error::AsyncTiffError; + file.seek(SeekFrom::Start(range.start)).await?; let to_read = range.end - range.start; @@ -208,123 +196,6 @@ impl AsyncFileReader for ReqwestReader { } } -/// An AsyncFileReader that caches the first `prefetch` bytes of a file. -#[derive(Debug)] -pub struct PrefetchReader { - reader: Box, - /// Invariant: buffers are monotonically increasing buffers starting at the beginning of the - /// file - buffers: Vec, - /// The exponent used for deciding how much more data to fetch on overflow of the existing buffer. - /// - /// buffer_length ^ fetch_exponent - overflow_fetch_exponent: f64, -} - -impl PrefetchReader { - /// Construct a new PrefetchReader, catching the first `prefetch` bytes of the file. - pub async fn new( - mut reader: Box, - prefetch: u64, - overflow_fetch_exponent: f64, - ) -> AsyncTiffResult { - let buffer = reader.get_metadata_bytes(0..prefetch).await?; - Ok(Self { - reader, - buffers: vec![buffer], - overflow_fetch_exponent, - }) - } - - /// Expand the length of buffers that have been pre-fetched - /// - /// Returns the desired range and adds it to the cached buffers. - async fn expand_prefetch(&mut self, range: Range) -> AsyncTiffResult { - let existing_buffer_length = self.buffer_length() as u64; - let additional_fetch = - (existing_buffer_length as f64).powf(self.overflow_fetch_exponent) as u64; - - // Make sure that we fetch at least the entire desired range - let new_range = - existing_buffer_length..range.end.max(existing_buffer_length + additional_fetch); - let buffer = self.reader.get_metadata_bytes(new_range).await?; - self.buffers.push(buffer); - - // Now extract the desired slice range - Ok(self.buffer_slice(range)) - } - - /// The length of all cached buffers - fn buffer_length(&self) -> usize { - self.buffers.iter().fold(0, |acc, x| acc + x.len()) - } - - /// Access the buffer range out of the cached buffers - /// - /// ## Panics - /// - /// If the range does not fall completely within the pre-cached buffers. - fn buffer_slice(&self, range: Range) -> Bytes { - // Slices of the underlying cached buffers - let mut output_buffers: Vec = vec![]; - - // A counter that describes the global start of the currently-iterated `buf` - let mut global_byte_offset: u64 = 0; - - for buf in self.buffers.iter() { - // Subtract off the global_byte_offset and then see if it overlaps the current buffer - let local_range = - range.start.saturating_sub(global_byte_offset)..range.end - global_byte_offset; - - if ranges_overlap(&local_range, &(0..buf.len() as u64)) { - let start = local_range.start as usize; - let end = (local_range.end as usize).min(buf.len()); - output_buffers.push(buf.slice(start..end)); - } - - global_byte_offset += buf.len() as u64; - } - - if output_buffers.len() == 1 { - output_buffers.into_iter().next().unwrap() - } else { - let mut result = BytesMut::with_capacity(range.end as usize - range.start as usize); - for output_buf in output_buffers.into_iter() { - result.extend_from_slice(&output_buf); - } - result.freeze() - } - } -} - -impl AsyncFileReader for PrefetchReader { - fn get_metadata_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.end <= self.buffer_length() as _ { - async { Ok(self.buffer_slice(range)) }.boxed() - } else { - self.expand_prefetch(range).boxed() - } - } - - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - // In practice, get_image_bytes is only used for fetching tiles, which are unlikely - // to overlap a metadata prefetch. - self.reader.get_bytes(range) - } - - fn get_image_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, AsyncTiffResult>> - where - Self: Send, - { - // In practice, get_image_byte_ranges is only used for fetching tiles, which are unlikely - // to overlap a metadata prefetch. - self.reader.get_image_byte_ranges(ranges) - } -} - /// Endianness #[derive(Debug, Clone, Copy)] pub enum Endianness { @@ -430,103 +301,3 @@ impl Read for EndianAwareReader { self.reader.read(buf) } } - -// https://stackoverflow.com/a/12888920 -fn ranges_overlap(r1: &Range, r2: &Range) -> bool { - r1.start.max(r2.start) <= r1.end.min(r2.end) -} - -#[cfg(test)] -mod test { - use super::*; - - #[derive(Debug)] - struct TestAsyncFileReader { - buffer: Bytes, - } - - impl TestAsyncFileReader { - async fn get_range(&self, range: Range) -> AsyncTiffResult { - assert!(range.start < self.buffer.len() as _); - let end = range.end.min(self.buffer.len() as _); - Ok(self.buffer.slice(range.start as usize..end as usize)) - } - } - - impl AsyncFileReader for TestAsyncFileReader { - fn get_metadata_bytes( - &mut self, - range: Range, - ) -> BoxFuture<'_, AsyncTiffResult> { - self.get_range(range).boxed() - } - - fn get_image_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.get_range(range).boxed() - } - } - - #[tokio::test] - async fn test_prefetch_overflow() { - let underlying_buffer = b"abcdefghijklmno"; - let reader = TestAsyncFileReader { - buffer: Bytes::from_static(underlying_buffer), - }; - let mut prefetch_reader = PrefetchReader::new(Box::new(reader), 5, 1.).await.unwrap(); - - // Cached - assert_eq!( - prefetch_reader - .get_metadata_bytes(0..3) - .await - .unwrap() - .as_ref(), - b"abc" - ); - - // Cached - assert_eq!( - prefetch_reader - .get_metadata_bytes(0..5) - .await - .unwrap() - .as_ref(), - b"abcde" - ); - - // Expand fetch - assert_eq!( - prefetch_reader - .get_metadata_bytes(0..10) - .await - .unwrap() - .as_ref(), - b"abcdefghij" - ); - - // Cached - assert_eq!( - prefetch_reader - .get_metadata_bytes(0..10) - .await - .unwrap() - .as_ref(), - b"abcdefghij" - ); - - // Cached - assert_eq!( - prefetch_reader - .get_metadata_bytes(0..15) - .await - .unwrap() - .as_ref(), - underlying_buffer - ); - - // Assert underlying buffers were cached - assert_eq!(prefetch_reader.buffers[0].as_ref(), b"abcde"); - assert_eq!(prefetch_reader.buffers[1].as_ref(), b"fghij"); - assert_eq!(prefetch_reader.buffers[2].as_ref(), b"klmno"); - } -} From 7f6be9189af93b5851e542238d73937f87a260a8 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 28 Mar 2025 17:26:37 -0400 Subject: [PATCH 6/8] fix compile --- src/cog.rs | 2 +- src/ifd.rs | 369 +-------------------------------------- src/metadata/fetch.rs | 2 +- src/metadata/reader.rs | 17 +- tests/image_tiff/util.rs | 8 +- 5 files changed, 17 insertions(+), 381 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index 61f4890..a942571 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -46,7 +46,7 @@ mod test { .await .unwrap(); let ifds = metadata_reader - .read_all_ifds(&prefetch_reader) + .read_all_ifds(&mut prefetch_reader) .await .unwrap(); let tiff = TIFF::new(ifds); diff --git a/src/ifd.rs b/src/ifd.rs index e5d8512..cb57c56 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -9,7 +9,7 @@ use crate::geo::{GeoKeyDirectory, GeoKeyTag}; use crate::reader::AsyncFileReader; use crate::tiff::tags::{ CompressionMethod, PhotometricInterpretation, PlanarConfiguration, Predictor, ResolutionUnit, - SampleFormat, Tag, Type, + SampleFormat, Tag, }; use crate::tiff::{TiffError, Value}; use crate::tile::Tile; @@ -742,370 +742,3 @@ impl ImageFileDirectory { Some((x_count as usize, y_count as usize)) } } - -/// Read a single tag from the cursor -async fn read_tag(cursor: &mut AsyncCursor<'_>, bigtiff: bool) -> AsyncTiffResult<(Tag, Value)> { - let start_cursor_position = cursor.position(); - - let tag_name = Tag::from_u16_exhaustive(cursor.read_u16().await?); - - let tag_type_code = cursor.read_u16().await?; - let tag_type = Type::from_u16(tag_type_code).expect( - "Unknown tag type {tag_type_code}. TODO: we should skip entries with unknown tag types.", - ); - let count = if bigtiff { - cursor.read_u64().await? - } else { - cursor.read_u32().await?.into() - }; - - let tag_value = read_tag_value(cursor, tag_type, count, bigtiff).await?; - - // TODO: better handle management of cursor state - let ifd_entry_size = if bigtiff { 20 } else { 12 }; - cursor.seek(start_cursor_position + ifd_entry_size); - - Ok((tag_name, tag_value)) -} - -/// Read a tag's value from the cursor -/// -/// NOTE: this does not maintain cursor state -// This is derived from the upstream tiff crate: -// https://github.com/image-rs/image-tiff/blob/6dc7a266d30291db1e706c8133357931f9e2a053/src/decoder/ifd.rs#L369-L639 -async fn read_tag_value( - cursor: &mut AsyncCursor<'_>, - tag_type: Type, - count: u64, - bigtiff: bool, -) -> AsyncTiffResult { - // Case 1: there are no values so we can return immediately. - if count == 0 { - return Ok(Value::List(vec![])); - } - - let tag_size = match tag_type { - Type::BYTE | Type::SBYTE | Type::ASCII | Type::UNDEFINED => 1, - Type::SHORT | Type::SSHORT => 2, - Type::LONG | Type::SLONG | Type::FLOAT | Type::IFD => 4, - Type::LONG8 - | Type::SLONG8 - | Type::DOUBLE - | Type::RATIONAL - | Type::SRATIONAL - | Type::IFD8 => 8, - }; - - let value_byte_length = count.checked_mul(tag_size).unwrap(); - - // Case 2: there is one value. - if count == 1 { - // 2a: the value is 5-8 bytes and we're in BigTiff mode. - if bigtiff && value_byte_length > 4 && value_byte_length <= 8 { - let mut data = cursor.read(value_byte_length).await?; - - return Ok(match tag_type { - Type::LONG8 => Value::UnsignedBig(data.read_u64()?), - Type::SLONG8 => Value::SignedBig(data.read_i64()?), - Type::DOUBLE => Value::Double(data.read_f64()?), - Type::RATIONAL => Value::Rational(data.read_u32()?, data.read_u32()?), - Type::SRATIONAL => Value::SRational(data.read_i32()?, data.read_i32()?), - Type::IFD8 => Value::IfdBig(data.read_u64()?), - Type::BYTE - | Type::SBYTE - | Type::ASCII - | Type::UNDEFINED - | Type::SHORT - | Type::SSHORT - | Type::LONG - | Type::SLONG - | Type::FLOAT - | Type::IFD => unreachable!(), - }); - } - - // NOTE: we should only be reading value_byte_length when it's 4 bytes or fewer. Right now - // we're reading even if it's 8 bytes, but then only using the first 4 bytes of this - // buffer. - let mut data = cursor.read(value_byte_length).await?; - - // 2b: the value is at most 4 bytes or doesn't fit in the offset field. - return Ok(match tag_type { - Type::BYTE | Type::UNDEFINED => Value::Byte(data.read_u8()?), - Type::SBYTE => Value::Signed(data.read_i8()? as i32), - Type::SHORT => Value::Short(data.read_u16()?), - Type::SSHORT => Value::Signed(data.read_i16()? as i32), - Type::LONG => Value::Unsigned(data.read_u32()?), - Type::SLONG => Value::Signed(data.read_i32()?), - Type::FLOAT => Value::Float(data.read_f32()?), - Type::ASCII => { - if data.as_ref()[0] == 0 { - Value::Ascii("".to_string()) - } else { - panic!("Invalid tag"); - // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); - } - } - Type::LONG8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::UnsignedBig(cursor.read_u64().await?) - } - Type::SLONG8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::SignedBig(cursor.read_i64().await?) - } - Type::DOUBLE => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::Double(cursor.read_f64().await?) - } - Type::RATIONAL => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - let numerator = cursor.read_u32().await?; - let denominator = cursor.read_u32().await?; - Value::Rational(numerator, denominator) - } - Type::SRATIONAL => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - let numerator = cursor.read_i32().await?; - let denominator = cursor.read_i32().await?; - Value::SRational(numerator, denominator) - } - Type::IFD => Value::Ifd(data.read_u32()?), - Type::IFD8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::IfdBig(cursor.read_u64().await?) - } - }); - } - - // Case 3: There is more than one value, but it fits in the offset field. - if value_byte_length <= 4 || bigtiff && value_byte_length <= 8 { - let mut data = cursor.read(value_byte_length).await?; - if bigtiff { - cursor.advance(8 - value_byte_length); - } else { - cursor.advance(4 - value_byte_length); - } - - match tag_type { - Type::BYTE | Type::UNDEFINED => { - return { - Ok(Value::List( - (0..count) - .map(|_| Value::Byte(data.read_u8().unwrap())) - .collect(), - )) - }; - } - Type::SBYTE => { - return { - Ok(Value::List( - (0..count) - .map(|_| Value::Signed(data.read_i8().unwrap() as i32)) - .collect(), - )) - } - } - Type::ASCII => { - let mut buf = vec![0; count as usize]; - data.read_exact(&mut buf)?; - if buf.is_ascii() && buf.ends_with(&[0]) { - let v = std::str::from_utf8(&buf) - .map_err(|err| AsyncTiffError::General(err.to_string()))?; - let v = v.trim_matches(char::from(0)); - return Ok(Value::Ascii(v.into())); - } else { - panic!("Invalid tag"); - // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); - } - } - Type::SHORT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Short(data.read_u16()?)); - } - return Ok(Value::List(v)); - } - Type::SSHORT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Signed(i32::from(data.read_i16()?))); - } - return Ok(Value::List(v)); - } - Type::LONG => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Unsigned(data.read_u32()?)); - } - return Ok(Value::List(v)); - } - Type::SLONG => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Signed(data.read_i32()?)); - } - return Ok(Value::List(v)); - } - Type::FLOAT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Float(data.read_f32()?)); - } - return Ok(Value::List(v)); - } - Type::IFD => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Ifd(data.read_u32()?)); - } - return Ok(Value::List(v)); - } - Type::LONG8 - | Type::SLONG8 - | Type::RATIONAL - | Type::SRATIONAL - | Type::DOUBLE - | Type::IFD8 => { - unreachable!() - } - } - } - - // Seek cursor - let offset = if bigtiff { - cursor.read_u64().await? - } else { - cursor.read_u32().await?.into() - }; - cursor.seek(offset); - - // Case 4: there is more than one value, and it doesn't fit in the offset field. - match tag_type { - // TODO check if this could give wrong results - // at a different endianess of file/computer. - Type::BYTE | Type::UNDEFINED => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Byte(cursor.read_u8().await?)) - } - Ok(Value::List(v)) - } - Type::SBYTE => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Signed(cursor.read_i8().await? as i32)) - } - Ok(Value::List(v)) - } - Type::SHORT => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Short(cursor.read_u16().await?)) - } - Ok(Value::List(v)) - } - Type::SSHORT => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Signed(cursor.read_i16().await? as i32)) - } - Ok(Value::List(v)) - } - Type::LONG => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Unsigned(cursor.read_u32().await?)) - } - Ok(Value::List(v)) - } - Type::SLONG => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Signed(cursor.read_i32().await?)) - } - Ok(Value::List(v)) - } - Type::FLOAT => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Float(cursor.read_f32().await?)) - } - Ok(Value::List(v)) - } - Type::DOUBLE => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Double(cursor.read_f64().await?)) - } - Ok(Value::List(v)) - } - Type::RATIONAL => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Rational( - cursor.read_u32().await?, - cursor.read_u32().await?, - )) - } - Ok(Value::List(v)) - } - Type::SRATIONAL => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::SRational( - cursor.read_i32().await?, - cursor.read_i32().await?, - )) - } - Ok(Value::List(v)) - } - Type::LONG8 => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::UnsignedBig(cursor.read_u64().await?)) - } - Ok(Value::List(v)) - } - Type::SLONG8 => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::SignedBig(cursor.read_i64().await?)) - } - Ok(Value::List(v)) - } - Type::IFD => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::Ifd(cursor.read_u32().await?)) - } - Ok(Value::List(v)) - } - Type::IFD8 => { - let mut v = Vec::with_capacity(count as _); - for _ in 0..count { - v.push(Value::IfdBig(cursor.read_u64().await?)) - } - Ok(Value::List(v)) - } - Type::ASCII => { - let mut out = vec![0; count as _]; - let mut buf = cursor.read(count).await?; - buf.read_exact(&mut out)?; - - // Strings may be null-terminated, so we trim anything downstream of the null byte - if let Some(first) = out.iter().position(|&b| b == 0) { - out.truncate(first); - } - Ok(Value::Ascii( - String::from_utf8(out).map_err(|err| AsyncTiffError::General(err.to_string()))?, - )) - } - } -} diff --git a/src/metadata/fetch.rs b/src/metadata/fetch.rs index 8a41845..f74285b 100644 --- a/src/metadata/fetch.rs +++ b/src/metadata/fetch.rs @@ -117,7 +117,7 @@ impl<'a, F: MetadataFetch + Send + Sync> PrefetchBuffer<'a, F> { } } -impl<'a, F: MetadataFetch + Send + Sync> MetadataFetch for PrefetchBuffer<'a, F> { +impl MetadataFetch for PrefetchBuffer<'_, F> { fn fetch(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { if range.end <= self.buffer_length() as _ { async { Ok(self.buffer_slice(range)) }.boxed() diff --git a/src/metadata/reader.rs b/src/metadata/reader.rs index 446cea2..9414004 100644 --- a/src/metadata/reader.rs +++ b/src/metadata/reader.rs @@ -109,7 +109,7 @@ impl TiffMetadataReader { /// If there are no more IFDs, returns `None`. pub async fn read_next_ifd( &mut self, - fetch: &F, + fetch: &mut F, ) -> AsyncTiffResult> { if let Some(ifd_start) = self.next_ifd_offset { let ifd_reader = @@ -127,7 +127,7 @@ impl TiffMetadataReader { /// Read all IFDs from the file. pub async fn read_all_ifds( &mut self, - fetch: &F, + fetch: &mut F, ) -> AsyncTiffResult> { let mut ifds = vec![]; while let Some(ifd) = self.read_next_ifd(fetch).await? { @@ -162,7 +162,7 @@ pub struct ImageFileDirectoryReader { impl ImageFileDirectoryReader { /// Read and parse the IFD starting at the given file offset pub async fn open( - fetch: &F, + fetch: &mut F, ifd_start_offset: u64, bigtiff: bool, endianness: Endianness, @@ -205,7 +205,7 @@ impl ImageFileDirectoryReader { /// [`ImageFileDirectory::from_tags`] on the resulting collection of tags. pub async fn read_tag( &self, - fetch: &F, + fetch: &mut F, tag_idx: u64, ) -> AsyncTiffResult<(Tag, Value)> { assert!(tag_idx < self.tag_count); @@ -220,7 +220,10 @@ impl ImageFileDirectoryReader { /// /// Keep in mind that you'll still need to call [`finish`][Self::finish] to get the byte offset /// of the next IFD. - pub async fn read(&self, fetch: &F) -> AsyncTiffResult { + pub async fn read( + &self, + fetch: &mut F, + ) -> AsyncTiffResult { let mut tags = HashMap::with_capacity(self.tag_count as usize); for tag_idx in 0..self.tag_count { let (tag, value) = self.read_tag(fetch, tag_idx).await?; @@ -230,7 +233,7 @@ impl ImageFileDirectoryReader { } /// Finish this reader, reading the byte offset of the next IFD - pub async fn finish(self, fetch: &F) -> AsyncTiffResult> { + pub async fn finish(self, fetch: &mut F) -> AsyncTiffResult> { // The byte offset for reading the next ifd let next_ifd_byte_offset = self.ifd_start_offset + self.tag_count_byte_size @@ -255,7 +258,7 @@ impl ImageFileDirectoryReader { /// Read a single tag from the cursor async fn read_tag( - fetch: &F, + fetch: &mut F, tag_offset: u64, endianness: Endianness, bigtiff: bool, diff --git a/tests/image_tiff/util.rs b/tests/image_tiff/util.rs index 94a29c1..e4f9ace 100644 --- a/tests/image_tiff/util.rs +++ b/tests/image_tiff/util.rs @@ -2,7 +2,7 @@ use std::env::current_dir; use std::sync::Arc; use async_tiff::metadata::TiffMetadataReader; -use async_tiff::reader::{AsyncFileReader, ObjectReader}; +use async_tiff::reader::ObjectReader; use async_tiff::TIFF; use object_store::local::LocalFileSystem; @@ -11,8 +11,8 @@ const TEST_IMAGE_DIR: &str = "tests/image_tiff/images/"; pub(crate) async fn open_tiff(filename: &str) -> TIFF { let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); let path = format!("{TEST_IMAGE_DIR}/{filename}"); - let mut reader = Box::new(ObjectReader::new(store.clone(), path.as_str().into())); - let mut metadata_reader = TiffMetadataReader::try_open(&reader).await.unwrap(); - let ifds = metadata_reader.read_all_ifds(&reader).await.unwrap(); + let mut reader = ObjectReader::new(store.clone(), path.as_str().into()); + let mut metadata_reader = TiffMetadataReader::try_open(&mut reader).await.unwrap(); + let ifds = metadata_reader.read_all_ifds(&mut reader).await.unwrap(); TIFF::new(ifds) } From 7564a5c27902f24789acf44e06ddc2bcc7b45449 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 28 Mar 2025 17:42:33 -0400 Subject: [PATCH 7/8] modify api --- src/cog.rs | 4 +--- src/metadata/fetch.rs | 25 +++++++++++++++++-------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index a942571..e955322 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -38,9 +38,7 @@ mod test { let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); let mut reader = ObjectReader::new(store, path); - let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 32 * 1024, 1.5) - .await - .unwrap(); + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 32 * 1024).await.unwrap(); let mut metadata_reader = TiffMetadataReader::try_open(&mut prefetch_reader) .await diff --git a/src/metadata/fetch.rs b/src/metadata/fetch.rs index f74285b..546f0a3 100644 --- a/src/metadata/fetch.rs +++ b/src/metadata/fetch.rs @@ -35,7 +35,8 @@ pub struct PrefetchBuffer<'a, F: MetadataFetch + Send + Sync> { /// Invariant: buffers are monotonically increasing buffers starting at the beginning of the /// file buffers: Vec, - /// The exponent used for deciding how much more data to fetch on overflow of the existing buffer. + /// The exponent used for deciding how much more data to fetch on overflow of the existing + /// buffer. /// /// buffer_length ^ fetch_exponent overflow_fetch_exponent: f64, @@ -43,19 +44,24 @@ pub struct PrefetchBuffer<'a, F: MetadataFetch + Send + Sync> { impl<'a, F: MetadataFetch + Send + Sync> PrefetchBuffer<'a, F> { /// Construct a new PrefetchBuffer, catching the first `prefetch` bytes of the file. - pub async fn new( - fetch: &'a mut F, - prefetch: u64, - overflow_fetch_exponent: f64, - ) -> AsyncTiffResult { + pub async fn new(fetch: &'a mut F, prefetch: u64) -> AsyncTiffResult { let buffer = fetch.fetch(0..prefetch).await?; Ok(Self { fetch, buffers: vec![buffer], - overflow_fetch_exponent, + overflow_fetch_exponent: 1.5, }) } + /// Set the exponent used for deciding how much more data to fetch on overflow of the existing + /// buffer. + pub fn with_overflow_fetch_exponent(self, overflow_fetch_exponent: f64) -> Self { + Self { + overflow_fetch_exponent, + ..self + } + } + /// Expand the length of buffers that have been pre-fetched /// /// Returns the desired range and adds it to the cached buffers. @@ -255,7 +261,10 @@ mod test { let mut reader = TestAsyncFileReader { buffer: Bytes::from_static(underlying_buffer), }; - let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 5, 1.).await.unwrap(); + let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 5) + .await + .unwrap() + .with_overflow_fetch_exponent(1.); // Cached assert_eq!(prefetch_reader.fetch(0..3).await.unwrap().as_ref(), b"abc"); From 0a434b7c593b33800af91a3b11cc9af65ae1cd49 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 2 Apr 2025 15:04:45 -0400 Subject: [PATCH 8/8] wip python mutable --- python/src/reader.rs | 11 +++++------ python/src/tiff.rs | 16 +++++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/python/src/reader.rs b/python/src/reader.rs index 73580d4..4535605 100644 --- a/python/src/reader.rs +++ b/python/src/reader.rs @@ -1,5 +1,4 @@ use std::ops::Range; -use std::sync::Arc; use async_tiff::error::{AsyncTiffError, AsyncTiffResult}; use async_tiff::reader::{AsyncFileReader, ObjectReader}; @@ -21,12 +20,12 @@ pub(crate) enum StoreInput { } impl StoreInput { - pub(crate) fn into_async_file_reader(self, path: String) -> Arc { + pub(crate) fn into_async_file_reader(self, path: String) -> Box { match self { Self::ObjectStore(store) => { - Arc::new(ObjectReader::new(store.into_inner(), path.into())) + Box::new(ObjectReader::new(store.into_inner(), path.into())) } - Self::ObspecBackend(backend) => Arc::new(ObspecReader { backend, path }), + Self::ObspecBackend(backend) => Box::new(ObspecReader { backend, path }), } } } @@ -115,12 +114,12 @@ struct ObspecReader { } impl AsyncFileReader for ObspecReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { self.backend.get_range_wrapper(&self.path, range).boxed() } fn get_byte_ranges( - &self, + &mut self, ranges: Vec>, ) -> BoxFuture<'_, AsyncTiffResult>> { self.backend.get_ranges_wrapper(&self.path, ranges).boxed() diff --git a/python/src/tiff.rs b/python/src/tiff.rs index 82f1351..f89fd74 100644 --- a/python/src/tiff.rs +++ b/python/src/tiff.rs @@ -15,7 +15,7 @@ use crate::PyImageFileDirectory; #[pyclass(name = "TIFF", frozen)] pub(crate) struct PyTIFF { tiff: TIFF, - reader: Arc, + reader: Box, } #[pymethods] @@ -29,13 +29,15 @@ impl PyTIFF { store: StoreInput, prefetch: u64, ) -> PyResult> { - let reader = store.into_async_file_reader(path); + let mut reader = store.into_async_file_reader(path); let cog_reader = future_into_py(py, async move { - let metadata_fetch = PrefetchBuffer::new(reader.clone(), prefetch).await.unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await.unwrap(); + let mut metadata_fetch = PrefetchBuffer::new(&mut reader, prefetch).await.unwrap(); + let mut metadata_reader = TiffMetadataReader::try_open(&mut metadata_fetch) + .await + .unwrap(); let ifds = metadata_reader - .read_all_ifds(&metadata_fetch) + .read_all_ifds(&mut metadata_fetch) .await .unwrap(); let tiff = TIFF::new(ifds); @@ -57,7 +59,7 @@ impl PyTIFF { y: usize, z: usize, ) -> PyResult> { - let reader = self.reader.clone(); + let reader = &self.reader; let ifd = self .tiff .ifds() @@ -67,7 +69,7 @@ impl PyTIFF { // TODO: avoid this clone; add Arc to underlying rust code? .clone(); future_into_py(py, async move { - let tile = ifd.fetch_tile(x, y, reader.as_ref()).await.unwrap(); + let tile = ifd.fetch_tile(x, y, reader.as_mut()).await.unwrap(); Ok(PyTile::new(tile)) }) }