Skip to content

Exponential overflow header buffer #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions python/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::ops::Range;
use std::sync::Arc;

use async_tiff::error::{AsyncTiffError, AsyncTiffResult};
use async_tiff::reader::{AsyncFileReader, ObjectReader};
Expand All @@ -21,12 +20,12 @@ pub(crate) enum StoreInput {
}

impl StoreInput {
pub(crate) fn into_async_file_reader(self, path: String) -> Arc<dyn AsyncFileReader> {
pub(crate) fn into_async_file_reader(self, path: String) -> Box<dyn AsyncFileReader> {
match self {
Self::ObjectStore(store) => {
Arc::new(ObjectReader::new(store.into_inner(), path.into()))
Box::new(ObjectReader::new(store.into_inner(), path.into()))
}
Self::ObspecBackend(backend) => Arc::new(ObspecReader { backend, path }),
Self::ObspecBackend(backend) => Box::new(ObspecReader { backend, path }),
}
}
}
Expand Down Expand Up @@ -115,12 +114,12 @@ struct ObspecReader {
}

impl AsyncFileReader for ObspecReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.backend.get_range_wrapper(&self.path, range).boxed()
}

fn get_byte_ranges(
&self,
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
self.backend.get_ranges_wrapper(&self.path, ranges).boxed()
Expand Down
16 changes: 9 additions & 7 deletions python/src/tiff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::PyImageFileDirectory;
#[pyclass(name = "TIFF", frozen)]
pub(crate) struct PyTIFF {
tiff: TIFF,
reader: Arc<dyn AsyncFileReader>,
reader: Box<dyn AsyncFileReader>,
}

#[pymethods]
Expand All @@ -29,13 +29,15 @@ impl PyTIFF {
store: StoreInput,
prefetch: u64,
) -> PyResult<Bound<'py, PyAny>> {
let reader = store.into_async_file_reader(path);
let mut reader = store.into_async_file_reader(path);

let cog_reader = future_into_py(py, async move {
let metadata_fetch = PrefetchBuffer::new(reader.clone(), prefetch).await.unwrap();
let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await.unwrap();
let mut metadata_fetch = PrefetchBuffer::new(&mut reader, prefetch).await.unwrap();
let mut metadata_reader = TiffMetadataReader::try_open(&mut metadata_fetch)
.await
.unwrap();
let ifds = metadata_reader
.read_all_ifds(&metadata_fetch)
.read_all_ifds(&mut metadata_fetch)
.await
.unwrap();
let tiff = TIFF::new(ifds);
Expand All @@ -57,7 +59,7 @@ impl PyTIFF {
y: usize,
z: usize,
) -> PyResult<Bound<'py, PyAny>> {
let reader = self.reader.clone();
let reader = &self.reader;
let ifd = self
.tiff
.ifds()
Expand All @@ -67,7 +69,7 @@ impl PyTIFF {
// TODO: avoid this clone; add Arc to underlying rust code?
.clone();
future_into_py(py, async move {
let tile = ifd.fetch_tile(x, y, reader.as_ref()).await.unwrap();
let tile = ifd.fetch_tile(x, y, reader.as_mut()).await.unwrap();
Ok(PyTile::new(tile))
})
}
Expand Down
15 changes: 7 additions & 8 deletions src/cog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod test {

use crate::decoder::DecoderRegistry;
use crate::metadata::{PrefetchBuffer, TiffMetadataReader};
use crate::reader::{AsyncFileReader, ObjectReader};
use crate::reader::ObjectReader;

use super::*;
use object_store::local::LocalFileSystem;
Expand All @@ -37,22 +37,21 @@ mod test {
let folder = "/Users/kyle/github/developmentseed/async-tiff/";
let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap();
let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap());
let reader = Arc::new(ObjectReader::new(store, path)) as Arc<dyn AsyncFileReader>;
let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024)
.await
.unwrap();
let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader)
let mut reader = ObjectReader::new(store, path);
let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 32 * 1024).await.unwrap();

let mut metadata_reader = TiffMetadataReader::try_open(&mut prefetch_reader)
.await
.unwrap();
let ifds = metadata_reader
.read_all_ifds(&prefetch_reader)
.read_all_ifds(&mut prefetch_reader)
.await
.unwrap();
let tiff = TIFF::new(ifds);

let ifd = &tiff.ifds[1];
let decoder_registry = DecoderRegistry::default();
let tile = ifd.fetch_tile(0, 0, reader.as_ref()).await.unwrap();
let tile = ifd.fetch_tile(0, 0, &mut reader).await.unwrap();
let tile = tile.decode(&decoder_registry).unwrap();
std::fs::write("img.buf", tile).unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions src/ifd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ impl ImageFileDirectory {
&self,
x: usize,
y: usize,
reader: &dyn AsyncFileReader,
reader: &mut dyn AsyncFileReader,
) -> AsyncTiffResult<Tile> {
let range = self
.get_tile_byte_range(x, y)
Expand All @@ -701,7 +701,7 @@ impl ImageFileDirectory {
&self,
x: &[usize],
y: &[usize],
reader: &dyn AsyncFileReader,
reader: &mut dyn AsyncFileReader,
) -> AsyncTiffResult<Vec<Tile>> {
assert_eq!(x.len(), y.len(), "x and y should have same len");

Expand Down
194 changes: 170 additions & 24 deletions src/metadata/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::ops::Range;

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::FutureExt;

Expand All @@ -16,11 +16,11 @@ pub trait MetadataFetch {
///
/// Note the returned type is a boxed future, often created by
/// [futures::FutureExt::boxed]. See the trait documentation for an example.
fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
}

impl<T: AsyncFileReader> MetadataFetch for T {
fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.get_bytes(range)
}
}
Expand All @@ -30,52 +30,125 @@ impl<T: AsyncFileReader> MetadataFetch for T {
/// This is designed so that the async requests made by the underlying tag reader get intercepted
/// here and served from the existing buffer when possible.
#[derive(Debug)]
pub struct PrefetchBuffer<F: MetadataFetch> {
fetch: F,
buffer: Bytes,
pub struct PrefetchBuffer<'a, F: MetadataFetch + Send + Sync> {
fetch: &'a mut F,
/// Invariant: buffers are monotonically increasing buffers starting at the beginning of the
/// file
buffers: Vec<Bytes>,
/// The exponent used for deciding how much more data to fetch on overflow of the existing
/// buffer.
///
/// buffer_length ^ fetch_exponent
overflow_fetch_exponent: f64,
}

impl<F: MetadataFetch> PrefetchBuffer<F> {
impl<'a, F: MetadataFetch + Send + Sync> PrefetchBuffer<'a, F> {
/// Construct a new PrefetchBuffer, catching the first `prefetch` bytes of the file.
pub async fn new(fetch: F, prefetch: u64) -> AsyncTiffResult<Self> {
pub async fn new(fetch: &'a mut F, prefetch: u64) -> AsyncTiffResult<Self> {
let buffer = fetch.fetch(0..prefetch).await?;
Ok(Self { fetch, buffer })
Ok(Self {
fetch,
buffers: vec![buffer],
overflow_fetch_exponent: 1.5,
})
}

/// Set the exponent used for deciding how much more data to fetch on overflow of the existing
/// buffer.
pub fn with_overflow_fetch_exponent(self, overflow_fetch_exponent: f64) -> Self {
Self {
overflow_fetch_exponent,
..self
}
}

/// Expand the length of buffers that have been pre-fetched
///
/// Returns the desired range and adds it to the cached buffers.
async fn expand_prefetch(&mut self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
let existing_buffer_length = self.buffer_length() as u64;
let additional_fetch =
(existing_buffer_length as f64).powf(self.overflow_fetch_exponent) as u64;

// Make sure that we fetch at least the entire desired range
let new_range =
existing_buffer_length..range.end.max(existing_buffer_length + additional_fetch);
let buffer = self.fetch.fetch(new_range).await?;
self.buffers.push(buffer);

// Now extract the desired slice range
Ok(self.buffer_slice(range))
}
}

impl<F: MetadataFetch> MetadataFetch for PrefetchBuffer<F> {
fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
if range.start < self.buffer.len() as _ {
if range.end < self.buffer.len() as _ {
let usize_range = range.start as usize..range.end as usize;
let result = self.buffer.slice(usize_range);
async { Ok(result) }.boxed()
} else {
// TODO: reuse partial internal buffer
self.fetch.fetch(range)
/// The length of all cached buffers
fn buffer_length(&self) -> usize {
self.buffers.iter().fold(0, |acc, x| acc + x.len())
}

/// Access the buffer range out of the cached buffers
///
/// ## Panics
///
/// If the range does not fall completely within the pre-cached buffers.
fn buffer_slice(&self, range: Range<u64>) -> Bytes {
// Slices of the underlying cached buffers
let mut output_buffers: Vec<Bytes> = vec![];

// A counter that describes the global start of the currently-iterated `buf`
let mut global_byte_offset: u64 = 0;

for buf in self.buffers.iter() {
// Subtract off the global_byte_offset and then see if it overlaps the current buffer
let local_range =
range.start.saturating_sub(global_byte_offset)..range.end - global_byte_offset;

if ranges_overlap(&local_range, &(0..buf.len() as u64)) {
let start = local_range.start as usize;
let end = (local_range.end as usize).min(buf.len());
output_buffers.push(buf.slice(start..end));
}

global_byte_offset += buf.len() as u64;
}

if output_buffers.len() == 1 {
output_buffers.into_iter().next().unwrap()
} else {
let mut result = BytesMut::with_capacity(range.end as usize - range.start as usize);
for output_buf in output_buffers.into_iter() {
result.extend_from_slice(&output_buf);
}
result.freeze()
}
}
}

impl<F: MetadataFetch + Send + Sync> MetadataFetch for PrefetchBuffer<'_, F> {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
if range.end <= self.buffer_length() as _ {
async { Ok(self.buffer_slice(range)) }.boxed()
} else {
self.fetch.fetch(range)
self.expand_prefetch(range).boxed()
}
}
}

pub(crate) struct MetadataCursor<'a, F: MetadataFetch> {
fetch: &'a F,
fetch: &'a mut F,
offset: u64,
endianness: Endianness,
}

impl<'a, F: MetadataFetch> MetadataCursor<'a, F> {
pub fn new(fetch: &'a F, endianness: Endianness) -> Self {
pub fn new(fetch: &'a mut F, endianness: Endianness) -> Self {
Self {
fetch,
offset: 0,
endianness,
}
}

pub fn new_with_offset(fetch: &'a F, endianness: Endianness, offset: u64) -> Self {
pub fn new_with_offset(fetch: &'a mut F, endianness: Endianness, offset: u64) -> Self {
Self {
fetch,
offset,
Expand Down Expand Up @@ -153,3 +226,76 @@ impl<'a, F: MetadataFetch> MetadataCursor<'a, F> {
self.read(8).await?.read_f64()
}
}

// https://stackoverflow.com/a/12888920
fn ranges_overlap(r1: &Range<u64>, r2: &Range<u64>) -> bool {
r1.start.max(r2.start) <= r1.end.min(r2.end)
}

#[cfg(test)]
mod test {
use super::*;

#[derive(Debug)]
struct TestAsyncFileReader {
buffer: Bytes,
}

impl TestAsyncFileReader {
async fn get_range(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
assert!(range.start < self.buffer.len() as _);
let end = range.end.min(self.buffer.len() as _);
Ok(self.buffer.slice(range.start as usize..end as usize))
}
}

impl MetadataFetch for TestAsyncFileReader {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.get_range(range).boxed()
}
}

#[tokio::test]
async fn test_prefetch_overflow() {
let underlying_buffer = b"abcdefghijklmno";
let mut reader = TestAsyncFileReader {
buffer: Bytes::from_static(underlying_buffer),
};
let mut prefetch_reader = PrefetchBuffer::new(&mut reader, 5)
.await
.unwrap()
.with_overflow_fetch_exponent(1.);

// Cached
assert_eq!(prefetch_reader.fetch(0..3).await.unwrap().as_ref(), b"abc");

// Cached
assert_eq!(
prefetch_reader.fetch(0..5).await.unwrap().as_ref(),
b"abcde"
);

// Expand fetch
assert_eq!(
prefetch_reader.fetch(0..10).await.unwrap().as_ref(),
b"abcdefghij"
);

// Cached
assert_eq!(
prefetch_reader.fetch(0..10).await.unwrap().as_ref(),
b"abcdefghij"
);

// Cached
assert_eq!(
prefetch_reader.fetch(0..15).await.unwrap().as_ref(),
underlying_buffer
);

// Assert underlying buffers were cached
assert_eq!(prefetch_reader.buffers[0].as_ref(), b"abcde");
assert_eq!(prefetch_reader.buffers[1].as_ref(), b"fghij");
assert_eq!(prefetch_reader.buffers[2].as_ref(), b"klmno");
}
}
Loading
Loading