From 83439ef5ce7834206f6e151254cf20cb6077ab00 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 5 Nov 2024 10:47:20 -0800 Subject: [PATCH] feat: add support for "take" operation to balanced storage (#3079) Adds support for "take" to datasets that have balanced storage Requires balanced storage datasets to use stable ids Refactors `RowAddress` slightly to remove `id` from the method names (e.g. `row_id` -> `row_offset`) to help avoid future confusion. Fixes an intermittent panic in `SharedStream` (this panic had been seen before but now that we are using `SharedStream` more it was showing up more frequently) Begins aligning on new terminology for balanced storage (the old "blob storage" terminology is confusing because other things are also called "blobs": * Each field has a storage class * There is (still) a storage class called "blob" for large fields * These fields are called "sibling fields" and "local/sibling" should be used when contrasting between fields with the default storage class and any other storage class. In a future PR I'll clean up the append/write impl to use this terminology Closes #3030 --- python/python/tests/test_balanced.py | 136 +++++++++++++ python/python/tests/test_blob.py | 65 ------- rust/lance-core/src/datatypes/schema.rs | 28 +++ rust/lance-core/src/utils/address.rs | 20 +- rust/lance-core/src/utils/futures.rs | 7 +- rust/lance-datafusion/src/projection.rs | 29 ++- rust/lance-index/src/scalar/flat.rs | 2 +- rust/lance-table/src/format/manifest.rs | 3 + rust/lance-table/src/rowids/index.rs | 2 +- rust/lance/Cargo.toml | 1 + rust/lance/src/dataset.rs | 212 ++++++++++++++++++++- rust/lance/src/dataset/blob.rs | 2 +- rust/lance/src/dataset/optimize.rs | 4 +- rust/lance/src/dataset/rowids.rs | 12 +- rust/lance/src/dataset/take.rs | 111 +++++++++-- rust/lance/src/dataset/write.rs | 7 + rust/lance/src/index/vector/ivf/builder.rs | 10 +- rust/lance/src/index/vector/pq.rs | 2 +- 18 files changed, 547 insertions(+), 106 deletions(-) create mode 100644 python/python/tests/test_balanced.py diff --git a/python/python/tests/test_balanced.py b/python/python/tests/test_balanced.py new file mode 100644 index 0000000000..59490cdfed --- /dev/null +++ b/python/python/tests/test_balanced.py @@ -0,0 +1,136 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +import lance +import pyarrow as pa +import pytest + + +@pytest.fixture(scope="module") +def big_val(): + # 1 MiB per value + return b"0" * 1024 * 1024 + + +# 16 batches of 8 rows = 128 rows +def balanced_datagen(big_val): + for batch_idx in range(16): + start = batch_idx * 8 + end = start + 8 + values = pa.array([big_val for _ in range(start, end)], pa.large_binary()) + idx = pa.array(range(start, end), pa.uint64()) + table = pa.record_batch( + [values, idx], + schema=pa.schema( + [ + pa.field( + "blobs", + pa.large_binary(), + metadata={ + "lance-schema:storage-class": "blob", + }, + ), + pa.field("idx", pa.uint64()), + ] + ), + ) + yield table + + +@pytest.fixture +def balanced_dataset(tmp_path, big_val): + # 16 MiB per file, 128 total MiB, so we should have 8 blob files + # + # In addition, max_rows_per_file=64 means we should get 2 regular files + schema = next(iter(balanced_datagen(big_val))).schema + return lance.write_dataset( + balanced_datagen(big_val), + tmp_path / "test_ds", + max_bytes_per_file=16 * 1024 * 1024, + max_rows_per_file=64, + schema=schema, + ) + + +def test_append_then_take(balanced_dataset, tmp_path, big_val): + blob_dir = tmp_path / "test_ds" / 
"_blobs" / "data" + assert len(list(blob_dir.iterdir())) == 8 + + # A read will only return non-blob columns + assert balanced_dataset.to_table() == pa.table( + { + "idx": pa.array(range(128), pa.uint64()), + } + ) + + # Now verify we can append some data + ds = lance.write_dataset( + balanced_datagen(big_val), + tmp_path / "test_ds", + max_bytes_per_file=32 * 1024 * 1024, + schema=balanced_dataset.schema, + mode="append", + ) + + assert len(list(blob_dir.iterdir())) == 12 + + assert ds.to_table() == pa.table( + { + "idx": pa.array(list(range(128)) + list(range(128)), pa.uint64()), + } + ) + + # Verify we can take blob values + row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid") + + take_tbl = ds._take_rows(row_ids.to_pylist(), columns=["idx", "blobs"]) + + blobs = take_tbl.column("blobs") + for val in blobs: + assert val.as_py() == big_val + + +def test_delete(balanced_dataset): + # This will delete some of the first fragment (deletion vector) and + # the entire second fragment + balanced_dataset.delete("idx >= 40") + + row_ids = balanced_dataset.to_table(columns=[], with_row_id=True).column("_rowid") + + assert len(row_ids) == 40 + + assert balanced_dataset._take_rows( + row_ids.to_pylist(), columns=["idx"] + ) == pa.table( + { + "idx": pa.array(list(range(40)), pa.uint64()), + } + ) + + assert ( + len(balanced_dataset._take_rows(row_ids.to_pylist(), columns=["blobs"])) == 40 + ) + + assert len(balanced_dataset._take_rows([100], columns=["idx"])) == 0 + assert len(balanced_dataset._take_rows([100], columns=["blobs"])) == 0 + + assert len(balanced_dataset._take_rows(range(20, 80), columns=["idx"])) == 20 + assert len(balanced_dataset._take_rows(range(20, 80), columns=["blobs"])) == 20 + + +# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will +# want to test partial appends. We need to make sure an append of +# non-blob data is supported. In order to do this we need to make +# sure a blob tx is created that marks the row ids as used so that +# the two row id sequences stay in sync. 
+# +# def test_one_sided_append(balanced_dataset, tmp_path): +# # Write new data, but only to the idx column +# ds = lance.write_dataset( +# pa.table({"idx": pa.array(range(128, 256), pa.uint64())}), +# tmp_path / "test_ds", +# max_bytes_per_file=32 * 1024 * 1024, +# mode="append", +# ) + +# print(ds.to_table()) diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 250c9d2ade..59da924d72 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -122,68 +122,3 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs): match="A take operation that includes row addresses must not target deleted", ): dataset_with_blobs.take_blobs(row_ids, "blobs") - - -def test_blob_storage_class(tmp_path): - # 1 MiB per value - big_val = "0" * 1024 * 1024 - - # 16 batches of 8 rows = 128 rows - def datagen(): - for batch_idx in range(16): - start = batch_idx * 8 - end = start + 8 - values = pa.array([big_val for _ in range(start, end)], pa.large_binary()) - idx = pa.array(range(start, end), pa.uint64()) - table = pa.record_batch( - [values, idx], - schema=pa.schema( - [ - pa.field( - "blobs", - pa.large_binary(), - metadata={ - "lance-schema:storage-class": "blob", - }, - ), - pa.field("idx", pa.uint64()), - ] - ), - ) - yield table - - schema = next(iter(datagen())).schema - ds = lance.write_dataset( - datagen(), - tmp_path / "test_ds", - max_bytes_per_file=16 * 1024 * 1024, - schema=schema, - ) - - # 16 MiB per file, 128 total MiB, so we should have 8 files - blob_dir = tmp_path / "test_ds" / "_blobs" / "data" - assert len(list(blob_dir.iterdir())) == 8 - - # A read will only return non-blob columns - assert ds.to_table() == pa.table( - { - "idx": pa.array(range(128), pa.uint64()), - } - ) - - # Now verify we can append some data - ds = lance.write_dataset( - datagen(), - tmp_path / "test_ds", - max_bytes_per_file=32 * 1024 * 1024, - schema=schema, - mode="append", - ) - - assert len(list(blob_dir.iterdir())) == 12 - - assert ds.to_table() == pa.table( - { - "idx": pa.array(list(range(128)) + list(range(128)), pa.uint64()), - } - ) diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index fdb481ceff..8c3077bb7d 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -155,6 +155,34 @@ impl Schema { } } + /// Splits the schema into two schemas, one with default storage class fields and the other with blob storage class fields. + /// If there are no blob storage class fields, the second schema will be `None`. + /// The order of fields is preserved. 
+    pub fn partition_by_storage_class(&self) -> (Self, Option<Self>) {
+        let mut local_fields = Vec::with_capacity(self.fields.len());
+        let mut sibling_fields = Vec::with_capacity(self.fields.len());
+        for field in self.fields.iter() {
+            match field.storage_class() {
+                StorageClass::Default => local_fields.push(field.clone()),
+                StorageClass::Blob => sibling_fields.push(field.clone()),
+            }
+        }
+        (
+            Self {
+                fields: local_fields,
+                metadata: self.metadata.clone(),
+            },
+            if sibling_fields.is_empty() {
+                None
+            } else {
+                Some(Self {
+                    fields: sibling_fields,
+                    metadata: self.metadata.clone(),
+                })
+            },
+        )
+    }
+
     pub fn has_dictionary_types(&self) -> bool {
         self.fields.iter().any(|f| f.has_dictionary_types())
     }
diff --git a/rust/lance-core/src/utils/address.rs b/rust/lance-core/src/utils/address.rs
index 526a3f755e..aa5ceaa720 100644
--- a/rust/lance-core/src/utils/address.rs
+++ b/rust/lance-core/src/utils/address.rs
@@ -13,19 +13,19 @@ impl RowAddress {
     // A row id that will never be used
     pub const TOMBSTONE_ROW: u64 = 0xffffffffffffffff;
 
-    pub fn new_from_id(row_id: u64) -> Self {
-        Self(row_id)
+    pub fn new_from_u64(row_addr: u64) -> Self {
+        Self(row_addr)
     }
 
-    pub fn new_from_parts(fragment_id: u32, row_id: u32) -> Self {
-        Self(((fragment_id as u64) << 32) | row_id as u64)
+    pub fn new_from_parts(fragment_id: u32, row_offset: u32) -> Self {
+        Self(((fragment_id as u64) << 32) | row_offset as u64)
     }
 
     pub fn first_row(fragment_id: u32) -> Self {
         Self::new_from_parts(fragment_id, 0)
     }
 
-    pub fn fragment_range(fragment_id: u32) -> Range<u64> {
+    pub fn address_range(fragment_id: u32) -> Range<u64> {
         u64::from(Self::first_row(fragment_id))..u64::from(Self::first_row(fragment_id + 1))
     }
 
@@ -33,7 +33,7 @@ impl RowAddress {
         (self.0 >> 32) as u32
     }
 
-    pub fn row_id(&self) -> u32 {
+    pub fn row_offset(&self) -> u32 {
         self.0 as u32
     }
 }
@@ -44,6 +44,12 @@ impl From<RowAddress> for u64 {
     }
 }
 
+impl From<u64> for RowAddress {
+    fn from(row_id: u64) -> Self {
+        Self(row_id)
+    }
+}
+
 impl std::fmt::Debug for RowAddress {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}", self) // use Display
@@ -52,6 +58,6 @@ impl RowAddress {
 impl std::fmt::Display for RowAddress {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "({}, {})", self.fragment_id(), self.row_id())
+        write!(f, "({}, {})", self.fragment_id(), self.row_offset())
     }
 }
diff --git a/rust/lance-core/src/utils/futures.rs b/rust/lance-core/src/utils/futures.rs
index 7c847124b8..9acce93ce2 100644
--- a/rust/lance-core/src/utils/futures.rs
+++ b/rust/lance-core/src/utils/futures.rs
@@ -121,7 +121,12 @@ impl<'a, T: Clone> Stream for SharedStream<'a, T> {
         if let Some(polling_side) = inner_state.polling.as_ref() {
             if *polling_side != self.side {
                 // Another task is already polling the inner stream, so we don't need to do anything
-                debug_assert!(inner_state.waker.is_none());
+
+                // Per rust docs:
+                //   Note that on multiple calls to poll, only the Waker from the Context
+                //   passed to the most recent call should be scheduled to receive a wakeup.
+                //
+                // So it is safe to replace a potentially stale waker here.
                inner_state.waker = Some(cx.waker().clone());
                return std::task::Poll::Pending;
            }
diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs
index 5951766127..d3abca23b7 100644
--- a/rust/lance-datafusion/src/projection.rs
+++ b/rust/lance-datafusion/src/projection.rs
@@ -27,11 +27,15 @@ use crate::{
     planner::Planner,
 };
 
+#[derive(Debug)]
 pub struct ProjectionPlan {
     /// The physical schema (before dynamic projection) that must be loaded from the dataset
     pub physical_schema: Arc<Schema>,
     pub physical_df_schema: Arc<DFSchema>,
 
+    /// The schema of the sibling fields that must be loaded
+    pub sibling_schema: Option<Arc<Schema>>,
+
     /// The expressions for all the columns to be in the output
     /// Note: this doesn't include _distance, and _rowid
     pub requested_output_expr: Option<Vec<(Expr, String)>>,
@@ -98,7 +102,9 @@ impl ProjectionPlan {
             output.insert(output_name.as_ref().to_string(), expr);
         }
 
-        let mut physical_schema = Arc::new(base_schema.project(&physical_cols)?);
+        let physical_schema = Arc::new(base_schema.project(&physical_cols)?);
+        let (physical_schema, sibling_schema) = physical_schema.partition_by_storage_class();
+        let mut physical_schema = Arc::new(physical_schema);
         if !load_blobs {
             physical_schema = Self::unload_blobs(&physical_schema);
         }
@@ -112,22 +118,37 @@ impl ProjectionPlan {
         let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
         Ok(Self {
             physical_schema,
+            sibling_schema: sibling_schema.map(Arc::new),
             physical_df_schema,
             requested_output_expr,
         })
     }
 
     pub fn new_empty(base_schema: Arc<Schema>, load_blobs: bool) -> Self {
-        let base_schema = if !load_blobs {
+        let (physical_schema, sibling_schema) = base_schema.partition_by_storage_class();
+        Self::inner_new(
+            Arc::new(physical_schema),
+            load_blobs,
+            sibling_schema.map(Arc::new),
+        )
+    }
+
+    pub fn inner_new(
+        base_schema: Arc<Schema>,
+        load_blobs: bool,
+        sibling_schema: Option<Arc<Schema>>,
+    ) -> Self {
+        let physical_schema = if !load_blobs {
             Self::unload_blobs(&base_schema)
         } else {
             base_schema
         };
 
-        let physical_arrow_schema = ArrowSchema::from(base_schema.as_ref());
+        let physical_arrow_schema = ArrowSchema::from(physical_schema.as_ref());
         let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
         Self {
-            physical_schema: base_schema,
+            physical_schema,
+            sibling_schema,
             physical_df_schema,
             requested_output_expr: None,
         }
diff --git a/rust/lance-index/src/scalar/flat.rs b/rust/lance-index/src/scalar/flat.rs
index aa1ef75789..66a69e95e5 100644
--- a/rust/lance-index/src/scalar/flat.rs
+++ b/rust/lance-index/src/scalar/flat.rs
@@ -182,7 +182,7 @@ impl Index for FlatIndex {
             .ids()
             .as_primitive::<UInt64Type>()
             .iter()
-            .map(|row_id| RowAddress::new_from_id(row_id.unwrap()).fragment_id())
+            .map(|row_id| RowAddress::from(row_id.unwrap()).fragment_id())
             .collect::<Vec<_>>();
         frag_ids.sort();
         frag_ids.dedup();
diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs
index 45244c1d11..0546e040f4 100644
--- a/rust/lance-table/src/format/manifest.rs
+++ b/rust/lance-table/src/format/manifest.rs
@@ -46,6 +46,9 @@ pub struct Manifest {
     pub writer_version: Option<WriterVersion>,
 
     /// Fragments, the pieces to build the dataset.
+    ///
+    /// This list is stored in order, sorted by fragment id. However, the fragment id
+    /// sequence may have gaps.
     pub fragments: Arc<Vec<Fragment>>,
 
     /// The file position of the version aux data.
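To make the local/sibling split above concrete, here is a minimal sketch (illustrative only, not part of the diff) of how the pieces are meant to fit together. It uses only the signatures introduced in this patch (`Schema::partition_by_storage_class` and `ProjectionPlan::inner_new`); the wrapper function and the `load_blobs = false` choice are assumptions for the example.

    use std::sync::Arc;

    use lance_core::datatypes::Schema;
    use lance_datafusion::projection::ProjectionPlan;

    // Hypothetical helper: build a projection plan for a schema that may mix
    // default-storage ("local") fields and blob-storage ("sibling") fields.
    fn plan_for(schema: &Schema) -> ProjectionPlan {
        // Split into the local fields and the optional sibling fields;
        // field order is preserved within each half.
        let (local_schema, sibling_schema) = schema.partition_by_storage_class();

        // Keep the sibling schema on the plan so the take path knows it must
        // also visit the blobs dataset and zip the two results back together
        // (see `zip_takes` later in this patch).
        ProjectionPlan::inner_new(
            Arc::new(local_schema),
            /* load_blobs= */ false,
            sibling_schema.map(Arc::new),
        )
    }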
diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 622cbcf91a..e9d954f4d6 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -56,7 +56,7 @@ impl RowIdIndex { let (row_id_segment, address_segment) = self.0.get(&row_id)?; let pos = row_id_segment.position(row_id)?; let address = address_segment.get(pos)?; - Some(RowAddress::new_from_id(address)) + Some(RowAddress::from(address)) } } diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index ce95a41a8d..f4de96c82a 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -65,6 +65,7 @@ log = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } moka.workspace = true +permutation = { version = "0.4.0" } tantivy.workspace = true tfrecord = { version = "0.15.0", optional = true, features = ["async"] } aws-sdk-dynamodb = { workspace = true, optional = true } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 71902ca4da..7668156888 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -11,7 +11,9 @@ use deepsize::DeepSizeOf; use futures::future::BoxFuture; use futures::stream::{self, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; +use itertools::Itertools; use lance_core::datatypes::NullabilityComparison; +use lance_core::utils::address::RowAddress; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::SchemaCompareOptions, traits::DatasetTakeRows}; use lance_datafusion::projection::ProjectionPlan; @@ -30,10 +32,12 @@ use lance_table::io::commit::{ ManifestLocation, ManifestNamingScheme, }; use lance_table::io::manifest::{read_manifest, write_manifest}; -use log::warn; +use log::{info, warn}; use object_store::path::Path; use prost::Message; +use rowids::get_row_id_index; use snafu::{location, Location}; +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; use std::ops::Range; use std::pin::Pin; @@ -551,6 +555,16 @@ impl Dataset { // append + input schema different from existing schema = error if matches!(params.mode, WriteMode::Append) { if let Some(d) = dataset.as_ref() { + // If the dataset is already using (or not using) move stable row ids, we need to match + // and ignore whatever the user provided as input + if params.enable_move_stable_row_ids != d.manifest.uses_move_stable_row_ids() { + info!( + "Ignoring user provided move stable row ids setting of {}, dataset already has it set to {}", + params.enable_move_stable_row_ids, + d.manifest.uses_move_stable_row_ids() + ); + params.enable_move_stable_row_ids = d.manifest.uses_move_stable_row_ids(); + } let m = d.manifest.as_ref(); schema.check_compatible( &m.schema, @@ -567,6 +581,15 @@ impl Dataset { } } + // If we are writing a dataset with non-default storage, we need to enable move stable row ids + if dataset.is_none() + && !params.enable_move_stable_row_ids + && schema.fields.iter().any(|f| !f.is_default_storage()) + { + info!("Enabling move stable row ids because non-default storage is used"); + params.enable_move_stable_row_ids = true; + } + let manifest_naming_scheme = if let Some(d) = dataset.as_ref() { d.manifest_naming_scheme } else if params.enable_v2_manifest_paths { @@ -1437,6 +1460,173 @@ impl Dataset { &self.manifest.fragments } + // Gets a filtered list of fragments from ids in O(N) time instead of using + // `get_fragment` which would require O(N^2) time. 
+ fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec> { + let mut fragments = Vec::with_capacity(ordered_ids.len()); + let mut id_iter = ordered_ids.iter(); + let mut id = id_iter.next(); + // This field is just used to assert the ids are in order + let mut last_id: i64 = -1; + for frag in self.manifest.fragments.iter() { + let mut the_id = if let Some(id) = id { *id } else { break }; + // Assert the given ids are, in fact, in order + assert!(the_id as i64 > last_id); + // For any IDs we've passed we can assume that no fragment exists any longer + // with that ID. + while the_id < frag.id as u32 { + fragments.push(None); + last_id = the_id as i64; + id = id_iter.next(); + the_id = if let Some(id) = id { *id } else { break }; + } + + if the_id == frag.id as u32 { + fragments.push(Some(FileFragment::new( + Arc::new(self.clone()), + frag.clone(), + ))); + last_id = the_id as i64; + id = id_iter.next(); + } + } + fragments + } + + // This method filters deleted items from `addr_or_ids` using `addrs` as a reference + async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result> { + if addrs.is_empty() { + return Ok(Vec::new()); + } + + let mut perm = permutation::sort(addrs); + // First we sort the addrs, then we transform from Vec to Vec> and then + // we un-sort and use the None values to filter `addr_or_ids` + let sorted_addrs = perm.apply_slice(addrs); + + // Only collect deletion vectors for the fragments referenced by the given addrs + let referenced_frag_ids = sorted_addrs + .iter() + .map(|addr| RowAddress::from(*addr).fragment_id()) + .dedup() + .collect::>(); + let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids); + let dv_futs = frags + .iter() + .map(|frag| { + if let Some(frag) = frag { + frag.get_deletion_vector().boxed() + } else { + std::future::ready(Ok(None)).boxed() + } + }) + .collect::>(); + let dvs = stream::iter(dv_futs) + .buffered(self.object_store.io_parallelism()) + .try_collect::>() + .await?; + + // Iterate through the sorted addresses and sorted fragments (and sorted deletion vectors) + // and filter out addresses that have been deleted + let mut filtered_sorted_ids = Vec::with_capacity(sorted_addrs.len()); + let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from); + let mut next_addr = sorted_addr_iter.next().unwrap(); + let mut exhausted = false; + + for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) { + let ((frag, dv), frag_id) = frag_dv; + if frag.is_some() { + // Frag exists + if let Some(dv) = dv.as_ref() { + // Deletion vector exists, scan DV + for deleted in dv.to_sorted_iter() { + while next_addr.fragment_id() == *frag_id + && next_addr.row_offset() < deleted + { + filtered_sorted_ids.push(Some(u64::from(next_addr))); + if let Some(next) = sorted_addr_iter.next() { + next_addr = next; + } else { + exhausted = true; + break; + } + } + if exhausted { + break; + } + if next_addr.fragment_id() != *frag_id { + break; + } + if next_addr.row_offset() == deleted { + filtered_sorted_ids.push(None); + if let Some(next) = sorted_addr_iter.next() { + next_addr = next; + } else { + exhausted = true; + break; + } + } + } + } + if exhausted { + break; + } + // Either no deletion vector, or we've exhausted it, keep everything else + // in this frag + while next_addr.fragment_id() == *frag_id { + filtered_sorted_ids.push(Some(u64::from(next_addr))); + if let Some(next) = sorted_addr_iter.next() { + next_addr = next; + } else { + break; + } + } + } else { + // Frag doesn't exist (possibly 
deleted), delete all items + while next_addr.fragment_id() == *frag_id { + filtered_sorted_ids.push(None); + if let Some(next) = sorted_addr_iter.next() { + next_addr = next; + } else { + break; + } + } + } + } + + // filtered_sorted_ids is now a Vec with the same size as sorted_addrs, but with None + // values where the corresponding address was deleted. We now need to un-sort it and + // filter out the deleted addresses. + perm.apply_inv_slice_in_place(&mut filtered_sorted_ids); + Ok(addr_or_ids + .iter() + .zip(filtered_sorted_ids) + .filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id)) + .collect()) + } + + // Leaving this here so it is more obvious to future readers that we can do this and + // someone doesn't go off and create a new function to do this. Delete this comment + // if you use this method. + #[allow(unused)] + pub(crate) async fn filter_deleted_addresses(&self, addrs: &[u64]) -> Result> { + self.filter_addr_or_ids(addrs, addrs).await + } + + pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result> { + let addresses = if let Some(row_id_index) = get_row_id_index(self).await? { + let addresses = ids + .iter() + .filter_map(|id| row_id_index.get(*id).map(|address| address.into())) + .collect::>(); + Cow::Owned(addresses) + } else { + Cow::Borrowed(ids) + }; + + self.filter_addr_or_ids(ids, &addresses).await + } + /// Gets the number of files that are so small they don't even have a full /// group. These are considered too small because reading many of them is /// much less efficient than reading a single file because the separate files @@ -1474,6 +1664,26 @@ impl Dataset { } } + // Fragments are sorted in increasing fragment id order + self.manifest + .fragments + .iter() + .map(|f| f.id) + .try_fold(0, |prev, id| { + if id < prev { + Err(Error::corrupt_file( + self.base.clone(), + format!( + "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}", + id, prev, self.base + ), + location!(), + )) + } else { + Ok(id) + } + })?; + // All fragments have equal lengths futures::stream::iter(self.get_fragments()) .map(|f| async move { f.validate().await }) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 839104054a..67f8c7081b 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -58,7 +58,7 @@ impl BlobFile { position: u64, size: u64, ) -> Self { - let frag_id = RowAddress::new_from_id(row_addr).fragment_id(); + let frag_id = RowAddress::from(row_addr).fragment_id(); let frag = dataset.get_fragment(frag_id as usize).unwrap(); let data_file = frag.data_file_for_field(field_id).unwrap().path.clone(); let data_file = dataset.data_dir().child(data_file); diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 8f2a5b12be..0f4037ed2b 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -1025,8 +1025,8 @@ mod tests { .map(|key| { format!( "{}:{:?}", - RowAddress::new_from_id(*key), - map[key].map(RowAddress::new_from_id) + RowAddress::from(*key), + map[key].map(RowAddress::from) ) }) .collect::>() diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index f3feb7b13b..e8ab5f64e0 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -157,7 +157,8 @@ mod test { .unwrap(); assert!(!dataset.manifest().uses_move_stable_row_ids()); - // Trying to append without stable row ids should fail. 
+        // Trying to append without stable row ids should pass (a warning is emitted) but should not
+        // affect the move_stable_row_ids setting.
         let write_params = WriteParams {
             enable_move_stable_row_ids: true,
             mode: WriteMode::Append,
             ..Default::default()
         };
         let reader = RecordBatchIterator::new(vec![batch.clone()].into_iter().map(Ok), batch.schema());
-        let result = Dataset::write(reader, tmp_path, Some(write_params)).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(),
-                Error::NotSupported { source, .. }
-                    if source.to_string().contains("Cannot enable stable row ids on existing dataset")));
+        let dataset = Dataset::write(reader, tmp_path, Some(write_params))
+            .await
+            .unwrap();
+        assert!(!dataset.manifest().uses_move_stable_row_ids());
     }
 
     #[tokio::test]
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index 111da60681..f3ff60dd3f 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -148,13 +148,10 @@ pub async fn take(
 }
 
 /// Take rows by the internal ROW ids.
-async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
-    if builder.row_ids.is_empty() {
-        return Ok(RecordBatch::new_empty(Arc::new(
-            builder.projection.output_schema()?,
-        )));
-    }
-
+async fn do_take_rows(
+    builder: TakeBuilder,
+    projection: Arc<ProjectionPlan>,
+) -> Result<RecordBatch> {
     let row_addrs = if let Some(row_id_index) = get_row_id_index(&builder.dataset).await? {
         let addresses = builder
             .row_ids
@@ -166,7 +163,13 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
         builder.row_ids
     };
 
-    let projection = Arc::new(builder.projection);
+    if row_addrs.is_empty() {
+        // It is possible that `row_id_index` returns None when a fragment has been wholly deleted
+        return Ok(RecordBatch::new_empty(Arc::new(
+            builder.projection.output_schema()?,
+        )));
+    }
+
     let row_addr_stats = check_row_addrs(&row_addrs);
 
     // This method is mostly to annotate the send bound to avoid the
@@ -275,9 +278,9 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
             // Group ROW Ids by the fragment
             let mut row_addrs_per_fragment: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
             sorted_row_addrs.iter().for_each(|row_addr| {
-                let row_addr = RowAddress::new_from_id(*row_addr);
+                let row_addr = RowAddress::from(*row_addr);
                 let fragment_id = row_addr.fragment_id();
-                let offset = row_addr.row_id();
+                let offset = row_addr.row_offset();
                 row_addrs_per_fragment
                     .entry(fragment_id)
                     .and_modify(|v| v.push(offset))
@@ -359,6 +362,89 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
     }
 }
 
+// Given a local take and a sibling take this function zips the results together
+async fn zip_takes(
+    local: RecordBatch,
+    sibling: RecordBatch,
+    orig_projection_plan: &ProjectionPlan,
+) -> Result<RecordBatch> {
+    let mut all_cols = Vec::with_capacity(local.num_columns() + sibling.num_columns());
+    all_cols.extend(local.columns().iter().cloned());
+    all_cols.extend(sibling.columns().iter().cloned());
+
+    let mut all_fields = Vec::with_capacity(local.num_columns() + sibling.num_columns());
+    all_fields.extend(local.schema().fields().iter().cloned());
+    all_fields.extend(sibling.schema().fields().iter().cloned());
+
+    let all_batch = RecordBatch::try_new(Arc::new(ArrowSchema::new(all_fields)), all_cols).unwrap();
+
+    orig_projection_plan.project_batch(all_batch).await
+}
+
+async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
+    if builder.row_ids.is_empty() {
+        return Ok(RecordBatch::new_empty(Arc::new(
+            builder.projection.output_schema()?,
+        )));
+    }
+
+    let projection = builder.projection.clone();
+    let sibling_ds: Arc<Dataset>;
+
+    // If we
have sibling columns then we load those in parallel to the local + // columns and zip the results together. + let sibling_take = if let Some(sibling_schema) = projection.sibling_schema.as_ref() { + let filtered_row_ids = builder.dataset.filter_deleted_ids(&builder.row_ids).await?; + if filtered_row_ids.is_empty() { + return Ok(RecordBatch::new_empty(Arc::new( + builder.projection.output_schema()?, + ))); + } + sibling_ds = builder + .dataset + .blobs_dataset() + .await? + .ok_or_else(|| Error::Internal { + message: "schema referenced sibling columns but there was no blob dataset".into(), + location: location!(), + })?; + // The sibling take only takes valid row ids and sibling columns + let mut builder = builder.clone(); + builder.dataset = sibling_ds; + builder.row_ids = filtered_row_ids; + let blobs_projection = Arc::new(ProjectionPlan::inner_new( + sibling_schema.clone(), + false, + None, + )); + Some(async move { do_take_rows(builder, blobs_projection).await }) + } else { + None + }; + + if let Some(sibling_take) = sibling_take { + if projection.physical_schema.fields.is_empty() { + // Nothing we need from local dataset, just take from blob dataset + sibling_take.await + } else { + // Need to take from both and zip together + let local_projection = ProjectionPlan { + physical_df_schema: projection.physical_df_schema.clone(), + physical_schema: projection.physical_schema.clone(), + sibling_schema: None, + // These will be applied in zip_takes + requested_output_expr: None, + }; + let local_take = do_take_rows(builder, Arc::new(local_projection)); + let (local, blobs) = futures::join!(local_take, sibling_take); + + zip_takes(local?, blobs?, &projection).await + } + } else { + do_take_rows(builder, projection).await + } +} + /// Get a stream of batches based on iterator of ranges of row numbers. /// /// This is an experimental API. It may change at any time. @@ -420,10 +506,11 @@ fn check_row_addrs(row_ids: &[u64]) -> RowAddressStats { } /// Builder for the `take` operation. 
+#[derive(Clone, Debug)] pub struct TakeBuilder { dataset: Arc, row_ids: Vec, - projection: ProjectionPlan, + projection: Arc, with_row_address: bool, } @@ -436,7 +523,7 @@ impl TakeBuilder { ) -> Result { Ok(Self { row_ids, - projection: projection.into_projection_plan(dataset.schema())?, + projection: Arc::new(projection.into_projection_plan(dataset.schema())?), dataset, with_row_address: false, }) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index d95784bae8..4e9b42ef20 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -361,6 +361,13 @@ pub async fn write_fragments_internal( ..Default::default() }; + if blob_data.is_some() && !params.enable_move_stable_row_ids { + return Err(Error::invalid_input( + "The blob storage class requires move stable row ids", + location!(), + )); + } + let frag_schema = schema.retain_storage_class(StorageClass::Default); let fragments_fut = do_write_fragments( object_store.clone(), diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 07025a3132..02df4cc0b3 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -132,8 +132,9 @@ async fn load_precomputed_partitions( .iter() .zip(partitions.values().iter()) .for_each(|(row_id, partition)| { - let addr = RowAddress::new_from_id(*row_id); - lookup[addr.fragment_id() as usize][addr.row_id() as usize] = *partition as i32; + let addr = RowAddress::from(*row_id); + lookup[addr.fragment_id() as usize][addr.row_offset() as usize] = + *partition as i32; }); async move { Ok(lookup) } }) @@ -157,8 +158,9 @@ fn add_precomputed_partitions( .values() .iter() .filter_map(|row_id| { - let addr = RowAddress::new_from_id(*row_id); - let part_id = partition_map[addr.fragment_id() as usize][addr.row_id() as usize]; + let addr = RowAddress::from(*row_id); + let part_id = + partition_map[addr.fragment_id() as usize][addr.row_offset() as usize]; if part_id < 0 { None } else { diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index 8039e621c6..9aac8424c1 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -156,7 +156,7 @@ impl Index for PQIndex { let mut frag_ids = row_ids .values() .iter() - .map(|&row_id| RowAddress::new_from_id(row_id).fragment_id()) + .map(|&row_id| RowAddress::from(row_id).fragment_id()) .collect::>(); frag_ids.sort(); frag_ids.dedup();
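For reference, the `RowAddress` rename used throughout this patch can be illustrated with a small standalone sketch (values are made up; only APIs shown in the diff are used). A row address packs the fragment id into the upper 32 bits and the row offset (a position within the fragment, not a stable row id) into the lower 32 bits.

    use lance_core::utils::address::RowAddress;

    fn main() {
        let addr = RowAddress::new_from_parts(7, 42);
        assert_eq!(addr.fragment_id(), 7);
        assert_eq!(addr.row_offset(), 42); // formerly `row_id()`

        // Round-trip through u64 via the existing `From<RowAddress> for u64`
        // and the newly added `From<u64> for RowAddress`.
        let packed: u64 = addr.into();
        assert_eq!(packed, (7u64 << 32) | 42);
        let back = RowAddress::from(packed);
        assert_eq!(u64::from(back), packed);
    }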