Skip to content

Commit

Permalink
fix: various changes to fix backpressure (#2721)
Browse files Browse the repository at this point in the history
* Reworked the binary decoder so that the binary data is not transferred
from the I/O scheduler until the decoder is ready for it (previously
this transfer was happening in the indirect task)
* Significantly modified the I/O scheduler. It now uses a custom queue
instead of a simple priority queue. This queue manages both the # IOPS
limit and the # bytes in flight limit.
* Scans now set the file priority explicitly. Before the ScanScheduler
was using a counter but since we open fragments in parallel this meant
that earlier fragments might not get the lowest priority which was
leading to out-of-order scheduling.
* Removed the deadlock prevention timeout. Now, we avoid deadlock by
allowing the backpressure limit to be slightly violated. We keep track
of the priority of all in-flight requests. If a new request comes in
that is lower priority than any in-flight request we go ahead and allow
it even if it would violate backpressure. In practice, this means only a
few extra requests sneak in and overall backpressure is maintained. We
can improve this in the future once we move to a max of 2 levels of
indirection by having indirect requests estimate how many bytes they
will need.
  • Loading branch information
westonpace authored Aug 12, 2024
1 parent 1a52b94 commit 30b3df7
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 341 deletions.
6 changes: 1 addition & 5 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use arrow::pyarrow::PyArrowType;
use arrow_array::{RecordBatch, RecordBatchReader, UInt32Array};
use arrow_schema::Schema as ArrowSchema;
use futures::stream::StreamExt;
use lance::{
io::{ObjectStore, RecordBatchStream},
utils::default_deadlock_prevention_timeout,
};
use lance::io::{ObjectStore, RecordBatchStream};
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_file::{
v2::{
Expand Down Expand Up @@ -291,7 +288,6 @@ impl LanceFileReader {
Arc::new(object_store),
SchedulerConfig {
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
deadlock_prevention_timeout: default_deadlock_prevention_timeout(),
},
);
let file = scheduler.open_file(&path).await.infer_error()?;
Expand Down
8 changes: 7 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,13 @@ impl<'a> DecoderMiddlewareChainCursor<'a> {
) -> Result<ChosenFieldScheduler<'a>> {
self.path.push_back(child_idx);
self.cur_idx = 0;
self.next(field, column_infos, buffers)
match self.next(field, column_infos, buffers) {
Ok(mut next) => {
next.0.path.pop_back();
Ok(next)
}
Err(e) => Err(e),
}
}

/// Starts the decoding process for a field
Expand Down
35 changes: 28 additions & 7 deletions rust/lance-encoding/src/encodings/physical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, ScalarBuffer};
use futures::TryFutureExt;
use futures::{future::BoxFuture, FutureExt};

use crate::decoder::LogicalPageDecoder;
Expand Down Expand Up @@ -109,6 +110,13 @@ impl BinaryPageScheduler {
}
}

struct IndirectData {
decoded_indices: UInt64Array,
offsets_type: DataType,
validity: BooleanBuffer,
bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}

impl PageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
Expand Down Expand Up @@ -200,21 +208,34 @@ impl PageScheduler for BinaryPageScheduler {
let (indices, validity) = indices_builder.into_parts();
let decoded_indices = UInt64Array::from(indices);

// Schedule the bytes for decoding
let bytes_page_decoder =
// In the indirect task we schedule the bytes, but we do not await them. We don't want to
// await the bytes until the decoder is ready for them so that we don't release the backpressure
// too early
let bytes_decoder_fut =
copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);

let bytes_decoder: Box<dyn PrimitivePageDecoder> = bytes_page_decoder.await?;

Ok(Box::new(BinaryPageDecoder {
Ok(IndirectData {
decoded_indices,
validity,
offsets_type,
bytes_decoder,
}) as Box<dyn PrimitivePageDecoder>)
bytes_decoder_fut,
})
})
// Propagate join panic
.map(|join_handle| join_handle.unwrap())
.and_then(|indirect_data| {
async move {
// Later, this will be called once the decoder actually starts polling. At that point
// we await the bytes (releasing the backpressure)
let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
Ok(Box::new(BinaryPageDecoder {
decoded_indices: indirect_data.decoded_indices,
offsets_type: indirect_data.offsets_type,
validity: indirect_data.validity,
bytes_decoder,
}) as Box<dyn PrimitivePageDecoder>)
}
})
.boxed()
}
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-encoding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod version;
///
/// In general, it is assumed that this trait will be implemented by some kind of "file reader"
/// or "file scheduler". The encodings here are all limited to accessing a single file.
pub trait EncodingsIo: Send + Sync {
pub trait EncodingsIo: std::fmt::Debug + Send + Sync {
/// Submit an I/O request
///
/// The response must contain a `Bytes` object for each range requested even if the underlying
Expand Down Expand Up @@ -62,6 +62,7 @@ pub trait EncodingsIo: Send + Sync {
}

/// An implementation of EncodingsIo that serves data from an in-memory buffer
#[derive(Debug)]
pub struct BufferScheduler {
data: Bytes,
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{

const MAX_PAGE_BYTES: u64 = 32 * 1024 * 1024;

#[derive(Debug)]
pub(crate) struct SimulatedScheduler {
data: Bytes,
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl LanceIndexStore {
let object_store = Arc::new(object_store);
let scheduler = ScanScheduler::new(
object_store.clone(),
SchedulerConfig::fast_and_not_too_ram_intensive(),
SchedulerConfig::max_bandwidth(&object_store),
);
Self {
object_store,
Expand Down
14 changes: 5 additions & 9 deletions rust/lance-index/src/vector/ivf/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,8 @@ impl IvfShuffler {
let reader = FileReader::try_new_self_described(&object_store, &path, None).await?;
total_batches.push(reader.num_batches());
} else {
let scheduler = ScanScheduler::new(
object_store.into(),
SchedulerConfig::fast_and_not_too_ram_intensive(),
);
let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
let scheduler = ScanScheduler::new(object_store.into(), scheduler_config);
let file = scheduler.open_file(&path).await?;
let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?;
let num_batches = reader.metadata().num_rows / (SHUFFLE_BATCH_SIZE as u64);
Expand All @@ -384,7 +382,7 @@ impl IvfShuffler {
let mut partition_sizes = vec![0; self.num_partitions as usize];
let scheduler = ScanScheduler::new(
Arc::new(object_store.clone()),
SchedulerConfig::fast_and_not_too_ram_intensive(),
SchedulerConfig::max_bandwidth(&object_store),
);

for &ShuffleInput {
Expand Down Expand Up @@ -524,10 +522,8 @@ impl IvfShuffler {
Self::process_batch_in_shuffle(batch, &mut partitioned_batches).await?;
}
} else {
let scheduler = ScanScheduler::new(
Arc::new(object_store),
SchedulerConfig::fast_and_not_too_ram_intensive(),
);
let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
let scheduler = ScanScheduler::new(Arc::new(object_store), scheduler_config);
let file = scheduler.open_file(&path).await?;
let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?;
let mut stream = reader
Expand Down
6 changes: 2 additions & 4 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,8 @@ impl IvfShufflerReader {
output_dir: Path,
partition_sizes: Vec<usize>,
) -> Self {
let scheduler = ScanScheduler::new(
object_store,
SchedulerConfig::fast_and_not_too_ram_intensive(),
);
let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
let scheduler = ScanScheduler::new(object_store, scheduler_config);
Self {
scheduler,
output_dir,
Expand Down
Loading

0 comments on commit 30b3df7

Please sign in to comment.