Skip to content

Commit

Permalink
feat: add support for "take" operation to balanced storage (#3079)
Browse files Browse the repository at this point in the history
Adds support for "take" to datasets that have balanced storage

Requires balanced storage datasets to use stable ids

Refactors `RowAddress` slightly to remove `id` from the method names
(e.g. `row_id` -> `row_offset`) to help avoid future confusion.

Fixes an intermittent panic in `SharedStream` (this panic had been seen
before but now that we are using `SharedStream` more it was showing up
more frequently)

Begins aligning on new terminology for balanced storage (the old "blob
storage" terminology is confusing because other things are also called
"blobs"):
* Each field has a storage class
* There is (still) a storage class called "blob" for large fields
* These fields are called "sibling fields" and "local/sibling" should be
used when contrasting between fields with the default storage class and
any other storage class. In a future PR I'll clean up the append/write
impl to use this terminology

Closes #3030
  • Loading branch information
westonpace authored Nov 5, 2024
1 parent b7663fb commit 83439ef
Show file tree
Hide file tree
Showing 18 changed files with 547 additions and 106 deletions.
136 changes: 136 additions & 0 deletions python/python/tests/test_balanced.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import lance
import pyarrow as pa
import pytest


@pytest.fixture(scope="module")
def big_val():
    """A single 1 MiB binary payload, shared across the module's tests."""
    one_mib = 1024 * 1024
    return b"0" * one_mib


# 16 batches of 8 rows = 128 rows
def balanced_datagen(big_val):
    """Yields 16 record batches of 8 rows each (128 rows total).

    Each batch has a blob-storage-class "blobs" column (one `big_val` per
    row) and a monotonically increasing uint64 "idx" column.
    """
    # The schema is identical for every batch, so build it once up front.
    batch_schema = pa.schema(
        [
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={
                    "lance-schema:storage-class": "blob",
                },
            ),
            pa.field("idx", pa.uint64()),
        ]
    )
    for batch_idx in range(16):
        start = batch_idx * 8
        end = start + 8
        row_range = range(start, end)
        values = pa.array([big_val] * len(row_range), pa.large_binary())
        idx = pa.array(row_range, pa.uint64())
        yield pa.record_batch([values, idx], schema=batch_schema)


@pytest.fixture
def balanced_dataset(tmp_path, big_val):
    """Writes the 128-row balanced dataset used by the tests below.

    16 MiB per file, 128 total MiB, so we should have 8 blob files.
    In addition, max_rows_per_file=64 means we should get 2 regular files.
    """
    dataset_uri = tmp_path / "test_ds"
    schema = next(iter(balanced_datagen(big_val))).schema
    return lance.write_dataset(
        balanced_datagen(big_val),
        dataset_uri,
        schema=schema,
        max_bytes_per_file=16 * 1024 * 1024,
        max_rows_per_file=64,
    )


def test_append_then_take(balanced_dataset, tmp_path, big_val):
    """Append to a balanced dataset, then take rows including blob columns."""
    blob_data_dir = tmp_path / "test_ds" / "_blobs" / "data"
    assert sum(1 for _ in blob_data_dir.iterdir()) == 8

    # A plain read only returns the non-blob columns
    assert balanced_dataset.to_table() == pa.table(
        {"idx": pa.array(range(128), pa.uint64())}
    )

    # Now verify we can append some data
    ds = lance.write_dataset(
        balanced_datagen(big_val),
        tmp_path / "test_ds",
        schema=balanced_dataset.schema,
        max_bytes_per_file=32 * 1024 * 1024,
        mode="append",
    )

    assert sum(1 for _ in blob_data_dir.iterdir()) == 12

    assert ds.to_table() == pa.table(
        {"idx": pa.array(list(range(128)) * 2, pa.uint64())}
    )

    # Verify we can take blob values
    row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid")
    taken = ds._take_rows(row_ids.to_pylist(), columns=["idx", "blobs"])
    assert all(val.as_py() == big_val for val in taken.column("blobs"))


def test_delete(balanced_dataset):
    """Delete rows from a balanced dataset and verify take still works.

    Deletes some of the first fragment (via a deletion vector) and the
    entire second fragment.
    """
    balanced_dataset.delete("idx >= 40")

    surviving = balanced_dataset.to_table(columns=[], with_row_id=True).column("_rowid")
    assert len(surviving) == 40
    survivor_ids = surviving.to_pylist()

    expected = pa.table({"idx": pa.array(list(range(40)), pa.uint64())})
    assert balanced_dataset._take_rows(survivor_ids, columns=["idx"]) == expected
    assert len(balanced_dataset._take_rows(survivor_ids, columns=["blobs"])) == 40

    # Taking only deleted rows yields nothing; a mixed range yields the
    # surviving subset — for both local and blob columns.
    for cols in (["idx"], ["blobs"]):
        assert len(balanced_dataset._take_rows([100], columns=cols)) == 0
        assert len(balanced_dataset._take_rows(range(20, 80), columns=cols)) == 20


# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
# want to test partial appends. We need to make sure an append of
# non-blob data is supported. In order to do this we need to make
# sure a blob tx is created that marks the row ids as used so that
# the two row id sequences stay in sync.
#
# def test_one_sided_append(balanced_dataset, tmp_path):
# # Write new data, but only to the idx column
# ds = lance.write_dataset(
# pa.table({"idx": pa.array(range(128, 256), pa.uint64())}),
# tmp_path / "test_ds",
# max_bytes_per_file=32 * 1024 * 1024,
# mode="append",
# )

# print(ds.to_table())
65 changes: 0 additions & 65 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,68 +122,3 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs):
match="A take operation that includes row addresses must not target deleted",
):
dataset_with_blobs.take_blobs(row_ids, "blobs")


def test_blob_storage_class(tmp_path):
    """Write a dataset with a blob-storage-class column, then append to it."""
    # 1 MiB per value
    big_val = "0" * 1024 * 1024

    batch_schema = pa.schema(
        [
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={
                    "lance-schema:storage-class": "blob",
                },
            ),
            pa.field("idx", pa.uint64()),
        ]
    )

    # 16 batches of 8 rows = 128 rows
    def datagen():
        for batch_idx in range(16):
            start = batch_idx * 8
            end = start + 8
            values = pa.array([big_val] * (end - start), pa.large_binary())
            idx = pa.array(range(start, end), pa.uint64())
            yield pa.record_batch([values, idx], schema=batch_schema)

    schema = next(iter(datagen())).schema
    ds = lance.write_dataset(
        datagen(),
        tmp_path / "test_ds",
        max_bytes_per_file=16 * 1024 * 1024,
        schema=schema,
    )

    # 16 MiB per file, 128 total MiB, so we should have 8 files
    blob_dir = tmp_path / "test_ds" / "_blobs" / "data"
    assert sum(1 for _ in blob_dir.iterdir()) == 8

    # A plain read only returns the non-blob columns
    assert ds.to_table() == pa.table(
        {"idx": pa.array(range(128), pa.uint64())}
    )

    # Now verify we can append some data
    ds = lance.write_dataset(
        datagen(),
        tmp_path / "test_ds",
        max_bytes_per_file=32 * 1024 * 1024,
        schema=schema,
        mode="append",
    )

    assert sum(1 for _ in blob_dir.iterdir()) == 12

    assert ds.to_table() == pa.table(
        {"idx": pa.array(list(range(128)) * 2, pa.uint64())}
    )
28 changes: 28 additions & 0 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,34 @@ impl Schema {
}
}

/// Splits the schema into two schemas, one with default storage class fields and the other with blob storage class fields.
/// If there are no blob storage class fields, the second schema will be `None`.
/// The order of fields is preserved.
pub fn partition_by_storage_class(&self) -> (Self, Option<Self>) {
    // `Iterator::partition` keeps the relative order of fields within
    // each group, so field ordering is preserved on both sides.
    let (local_fields, sibling_fields): (Vec<_>, Vec<_>) = self
        .fields
        .iter()
        .cloned()
        .partition(|field| matches!(field.storage_class(), StorageClass::Default));
    let local = Self {
        fields: local_fields,
        metadata: self.metadata.clone(),
    };
    let sibling = if sibling_fields.is_empty() {
        None
    } else {
        Some(Self {
            fields: sibling_fields,
            metadata: self.metadata.clone(),
        })
    };
    (local, sibling)
}

/// Returns true if any field in this schema reports dictionary types
/// (delegates to `Field::has_dictionary_types`).
pub fn has_dictionary_types(&self) -> bool {
    for field in self.fields.iter() {
        if field.has_dictionary_types() {
            return true;
        }
    }
    false
}
Expand Down
20 changes: 13 additions & 7 deletions rust/lance-core/src/utils/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ impl RowAddress {
// A row id that will never be used
pub const TOMBSTONE_ROW: u64 = 0xffffffffffffffff;

pub fn new_from_id(row_id: u64) -> Self {
Self(row_id)
pub fn new_from_u64(row_addr: u64) -> Self {
Self(row_addr)
}

pub fn new_from_parts(fragment_id: u32, row_id: u32) -> Self {
Self(((fragment_id as u64) << 32) | row_id as u64)
pub fn new_from_parts(fragment_id: u32, row_offset: u32) -> Self {
Self(((fragment_id as u64) << 32) | row_offset as u64)
}

pub fn first_row(fragment_id: u32) -> Self {
Self::new_from_parts(fragment_id, 0)
}

pub fn fragment_range(fragment_id: u32) -> Range<u64> {
pub fn address_range(fragment_id: u32) -> Range<u64> {
u64::from(Self::first_row(fragment_id))..u64::from(Self::first_row(fragment_id + 1))
}

pub fn fragment_id(&self) -> u32 {
(self.0 >> 32) as u32
}

pub fn row_id(&self) -> u32 {
pub fn row_offset(&self) -> u32 {
self.0 as u32
}
}
Expand All @@ -44,6 +44,12 @@ impl From<RowAddress> for u64 {
}
}

/// Builds a `RowAddress` from its raw `u64` representation.
impl From<u64> for RowAddress {
    // The parameter is named `row_addr` (not `row_id`) to stay consistent
    // with the address-based terminology used elsewhere on this type
    // (e.g. `new_from_u64`), which deliberately avoids "id" in names.
    fn from(row_addr: u64) -> Self {
        Self(row_addr)
    }
}

impl std::fmt::Debug for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self) // use Display
Expand All @@ -52,6 +58,6 @@ impl std::fmt::Debug for RowAddress {

impl std::fmt::Display for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {})", self.fragment_id(), self.row_id())
write!(f, "({}, {})", self.fragment_id(), self.row_offset())
}
}
7 changes: 6 additions & 1 deletion rust/lance-core/src/utils/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ impl<'a, T: Clone> Stream for SharedStream<'a, T> {
if let Some(polling_side) = inner_state.polling.as_ref() {
if *polling_side != self.side {
// Another task is already polling the inner stream, so we don't need to do anything
debug_assert!(inner_state.waker.is_none());

// Per rust docs:
// Note that on multiple calls to poll, only the Waker from the Context
// passed to the most recent call should be scheduled to receive a wakeup.
//
// So it is safe to replace a potentially stale waker here.
inner_state.waker = Some(cx.waker().clone());
return std::task::Poll::Pending;
}
Expand Down
29 changes: 25 additions & 4 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ use crate::{
planner::Planner,
};

#[derive(Debug)]
pub struct ProjectionPlan {
/// The physical schema (before dynamic projection) that must be loaded from the dataset
pub physical_schema: Arc<Schema>,
pub physical_df_schema: Arc<DFSchema>,

/// The schema of the sibling fields that must be loaded
pub sibling_schema: Option<Arc<Schema>>,

/// The expressions for all the columns to be in the output
/// Note: this doesn't include _distance, and _rowid
pub requested_output_expr: Option<Vec<(Expr, String)>>,
Expand Down Expand Up @@ -98,7 +102,9 @@ impl ProjectionPlan {
output.insert(output_name.as_ref().to_string(), expr);
}

let mut physical_schema = Arc::new(base_schema.project(&physical_cols)?);
let physical_schema = Arc::new(base_schema.project(&physical_cols)?);
let (physical_schema, sibling_schema) = physical_schema.partition_by_storage_class();
let mut physical_schema = Arc::new(physical_schema);
if !load_blobs {
physical_schema = Self::unload_blobs(&physical_schema);
}
Expand All @@ -112,22 +118,37 @@ impl ProjectionPlan {
let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
Ok(Self {
physical_schema,
sibling_schema: sibling_schema.map(Arc::new),
physical_df_schema,
requested_output_expr,
})
}

pub fn new_empty(base_schema: Arc<Schema>, load_blobs: bool) -> Self {
let base_schema = if !load_blobs {
let (physical_schema, sibling_schema) = base_schema.partition_by_storage_class();
Self::inner_new(
Arc::new(physical_schema),
load_blobs,
sibling_schema.map(Arc::new),
)
}

pub fn inner_new(
base_schema: Arc<Schema>,
load_blobs: bool,
sibling_schema: Option<Arc<Schema>>,
) -> Self {
let physical_schema = if !load_blobs {
Self::unload_blobs(&base_schema)
} else {
base_schema
};

let physical_arrow_schema = ArrowSchema::from(base_schema.as_ref());
let physical_arrow_schema = ArrowSchema::from(physical_schema.as_ref());
let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
Self {
physical_schema: base_schema,
physical_schema,
sibling_schema,
physical_df_schema,
requested_output_expr: None,
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl Index for FlatIndex {
.ids()
.as_primitive::<UInt64Type>()
.iter()
.map(|row_id| RowAddress::new_from_id(row_id.unwrap()).fragment_id())
.map(|row_id| RowAddress::from(row_id.unwrap()).fragment_id())
.collect::<Vec<_>>();
frag_ids.sort();
frag_ids.dedup();
Expand Down
3 changes: 3 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub struct Manifest {
pub writer_version: Option<WriterVersion>,

/// Fragments, the pieces to build the dataset.
///
/// This list is stored in order, sorted by fragment id. However, the fragment id
/// sequence may have gaps.
pub fragments: Arc<Vec<Fragment>>,

/// The file position of the version aux data.
Expand Down
Loading

0 comments on commit 83439ef

Please sign in to comment.