feat: basic log replay
roeap committed Jan 5, 2024
1 parent b45df1b commit 845c813
Showing 7 changed files with 608 additions and 1 deletion.
2 changes: 2 additions & 0 deletions crates/deltalake-core/Cargo.toml
@@ -33,6 +33,7 @@ parquet = { workspace = true, features = [
"async",
"object_store",
], optional = true }
pin-project-lite = "^0.2.7"

# datafusion
datafusion = { workspace = true, optional = true }
@@ -49,6 +50,7 @@ serde_json = { workspace = true }
# "stdlib"
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "*"
regex = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
1 change: 1 addition & 0 deletions crates/deltalake-core/src/kernel/mod.rs
@@ -14,6 +14,7 @@ pub use actions::*;
pub use error::*;
pub use expressions::*;
pub use schema::*;
pub use snapshot::*;

/// A trait for all kernel types that are used as part of data checking
pub trait DataCheck {
177 changes: 177 additions & 0 deletions crates/deltalake-core/src/kernel/snapshot/extract.rs
@@ -0,0 +1,177 @@
use std::sync::Arc;

use arrow_array::{
Array, ArrowNativeTypeOp, ArrowNumericType, BooleanArray, ListArray, MapArray, PrimitiveArray,
RecordBatch, StringArray, StructArray,
};
use arrow_schema::{ArrowError, DataType};

use crate::{DeltaResult, DeltaTableError};

/// Unified access to named columns on both RecordBatch and StructArray.
pub(crate) trait ProvidesColumnByName {
fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>>;
}

impl ProvidesColumnByName for RecordBatch {
fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
self.column_by_name(name)
}
}

impl ProvidesColumnByName for StructArray {
fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
self.column_by_name(name)
}
}

/// Extract a (possibly nested) column by its dot-separated path (e.g. "add.path") and
/// downcast it to the requested array type, erroring when the column is missing or of
/// a different type.
pub(super) fn extract_and_cast<'a, T: Array + 'static>(
arr: &'a dyn ProvidesColumnByName,
name: &'a str,
) -> DeltaResult<&'a T> {
extract_and_cast_opt::<T>(arr, name).ok_or(DeltaTableError::Generic(format!(
"missing-column: {}",
name
)))
}

pub(super) fn extract_and_cast_opt<'a, T: Array + 'static>(
array: &'a dyn ProvidesColumnByName,
name: &'a str,
) -> Option<&'a T> {
let mut path_steps = name.split('.');
let first = path_steps.next()?;
extract_column(array, first, &mut path_steps)
.ok()?
.as_any()
.downcast_ref::<T>()
}

pub(super) fn extract_column<'a>(
array: &'a dyn ProvidesColumnByName,
path_step: &str,
remaining_path_steps: &mut impl Iterator<Item = &'a str>,
) -> Result<&'a Arc<dyn Array>, ArrowError> {
let child = array
.column_by_name(path_step)
.ok_or(ArrowError::SchemaError(format!(
"No such field: {}",
path_step,
)))?;

if let Some(next_path_step) = remaining_path_steps.next() {
match child.data_type() {
DataType::Map(_, _) => {
// NOTE a map has exactly one child, but we want to be agnostic of its name,
// so we cast the current array as a map and use the entries accessor.
let maparr = column_as_map(path_step, &Some(child))?;
if let Some(next_path) = remaining_path_steps.next() {
extract_column(maparr.entries(), next_path, remaining_path_steps)
} else {
Ok(child)
// if maparr.entries().num_columns() != 2 {
// return Err(ArrowError::SchemaError(format!(
// "Map {} has {} columns, expected 2",
// path_step,
// maparr.entries().num_columns()
// )));
// }
// if next_path_step == *maparr.entries().column_names().first().unwrap() {
// Ok(maparr.entries().column(0))
// } else {
// Ok(maparr.entries().column(1))
// }
}
}
DataType::List(_) => {
let listarr = column_as_list(path_step, &Some(child))?;
if let Some(next_path) = remaining_path_steps.next() {
extract_column(
column_as_struct(next_path_step, &Some(listarr.values()))?,
next_path,
remaining_path_steps,
)
} else {
Ok(listarr.values())
}
}
_ => extract_column(
column_as_struct(path_step, &Some(child))?,
next_path_step,
remaining_path_steps,
),
}
} else {
Ok(child)
}
}

fn column_as_struct<'a>(
name: &str,
column: &Option<&'a Arc<dyn Array>>,
) -> Result<&'a StructArray, ArrowError> {
column
.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
.as_any()
.downcast_ref::<StructArray>()
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
}

fn column_as_map<'a>(
name: &str,
column: &Option<&'a Arc<dyn Array>>,
) -> Result<&'a MapArray, ArrowError> {
column
.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
.as_any()
.downcast_ref::<MapArray>()
.ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))
}

fn column_as_list<'a>(
name: &str,
column: &Option<&'a Arc<dyn Array>>,
) -> Result<&'a ListArray, ArrowError> {
column
.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
.as_any()
.downcast_ref::<ListArray>()
.ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))
}

#[inline]
pub(super) fn read_str<'a>(arr: &'a StringArray, idx: usize) -> DeltaResult<&'a str> {
read_str_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(super) fn read_str_opt<'a>(arr: &'a StringArray, idx: usize) -> Option<&'a str> {
arr.is_valid(idx).then(|| arr.value(idx))
}

#[inline]
pub(super) fn read_primitive<T>(arr: &PrimitiveArray<T>, idx: usize) -> DeltaResult<T::Native>
where
T: ArrowNumericType,
T::Native: ArrowNativeTypeOp,
{
read_primitive_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(super) fn read_primitive_opt<T>(arr: &PrimitiveArray<T>, idx: usize) -> Option<T::Native>
where
T: ArrowNumericType,
T::Native: ArrowNativeTypeOp,
{
arr.is_valid(idx).then(|| arr.value(idx))
}

#[inline]
pub(super) fn read_bool(arr: &BooleanArray, idx: usize) -> DeltaResult<bool> {
read_bool_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(super) fn read_bool_opt(arr: &BooleanArray, idx: usize) -> Option<bool> {
arr.is_valid(idx).then(|| arr.value(idx))
}
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/snapshot/log_segment.rs
@@ -118,7 +118,7 @@ impl LogSegment {
})
}

- pub(super) fn log_stream(
+ pub(super) fn commit_stream(
&self,
store: Arc<dyn ObjectStore>,
read_schema: &Schema,
94 changes: 94 additions & 0 deletions crates/deltalake-core/src/kernel/snapshot/mod.rs
@@ -1,2 +1,96 @@
use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectStore;

use self::log_segment::LogSegment;
use self::replay::ReplayStream;
use crate::kernel::{actions::ActionType, StructType};
use crate::{DeltaResult, DeltaTableConfig};

mod extract;
mod log_segment;
mod replay;

/// A snapshot of a Delta table
pub struct Snapshot {
log_segment: LogSegment,
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
}

impl Snapshot {
/// Create a new snapshot from a log segment
pub async fn try_new(
table_root: &Path,
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
let log_segment = LogSegment::try_new(table_root, version, store.as_ref()).await?;
Ok(Self {
log_segment,
store,
config,
})
}

/// Get the table version of the snapshot
pub fn version(&self) -> i64 {
self.log_segment.version
}

/// Get the files (add actions) in the snapshot as a stream of record batches
pub fn files(&self) -> DeltaResult<ReplayStream<BoxStream<'_, DeltaResult<RecordBatch>>>> {
let read_schema = Arc::new(StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Remove.schema_field().clone(),
]));

let batches =
self.log_segment
.commit_stream(self.store.clone(), &read_schema, &self.config)?;

Ok(ReplayStream::new(batches))
}
}

#[cfg(test)]
mod tests {
use deltalake_test::utils::*;

use super::*;

#[tokio::test]
async fn test_snapshot_files() -> TestResult {
let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;
context.load_table(TestTables::Checkpoints).await?;
context.load_table(TestTables::Simple).await?;

let store = context
.table_builder(TestTables::Simple)
.build_storage()?
.object_store();

let snapshot =
Snapshot::try_new(&Path::default(), store.clone(), Default::default(), None).await?;
let batches = snapshot.files()?.try_collect::<Vec<_>>().await?;

let expected = [
"+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| add |",
"+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| {path: part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet, partitionValues: {}, size: 262, modificationTime: 1587968626000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
"| {path: part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet, partitionValues: {}, size: 262, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
"| {path: part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
"| {path: part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
"| {path: part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
"+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
];
assert_batches_sorted_eq!(expected, &batches);

Ok(())
}
}
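A rough sketch (not from this commit) of how the replayed file stream might be consumed lazily instead of collected into memory, assuming the stream yields batches of replayed add actions as the test above suggests. The function name count_files is hypothetical; object store and snapshot setup follow the test.

use futures::TryStreamExt;

async fn count_files(snapshot: &Snapshot) -> DeltaResult<usize> {
    // Pin the stream on the stack so it can be polled without requiring Unpin.
    let mut stream = std::pin::pin!(snapshot.files()?);
    let mut total = 0;
    // Each batch is expected to hold replayed add actions; num_rows is the
    // number of files contributed by that batch.
    while let Some(batch) = stream.try_next().await? {
        total += batch.num_rows();
    }
    Ok(total)
}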