feat: improve logical file handling
roeap committed Jan 15, 2024
1 parent f072a00 commit 2d9a5ea
Showing 21 changed files with 290 additions and 176 deletions.
2 changes: 1 addition & 1 deletion crates/benchmarks/src/bin/merge.rs
@@ -193,7 +193,7 @@ async fn benchmark_merge_tpcds(
merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
let table = DeltaTableBuilder::from_uri(path).load().await?;
- let file_count = table.snapshot()?.file_actions()?.len();
+ let file_count = table.snapshot()?.files_count();

let provider = DeltaTableProvider::try_new(
table.snapshot()?.clone(),
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -481,7 +481,7 @@ impl TableProvider for DeltaTable {
}

fn statistics(&self) -> Option<Statistics> {
- self.get_state()?.datafusion_table_statistics()
+ self.snapshot().ok()?.datafusion_table_statistics()
}
}

5 changes: 5 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
@@ -661,6 +661,11 @@ impl Remove {
pub fn dv_unique_id(&self) -> Option<String> {
self.deletion_vector.clone().map(|dv| dv.unique_id())
}

/// Convert into Action::Remove
pub fn into_action(self) -> super::Action {
super::Action::Remove(self)
}
}

/// Delta AddCDCFile action that describes a parquet CDC data file.
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/kernel/arrow/mod.rs
@@ -18,7 +18,7 @@ pub mod schemas;
const MAP_ROOT_DEFAULT: &str = "entries";
const MAP_KEY_DEFAULT: &str = "keys";
const MAP_VALUE_DEFAULT: &str = "values";
const LIST_ROOT_DEFAULT: &str = "element";
const LIST_ROOT_DEFAULT: &str = "item";

impl TryFrom<ActionType> for ArrowField {
type Error = ArrowError;
@@ -71,7 +71,7 @@ impl TryFrom<&ArrayType> for ArrowField {
LIST_ROOT_DEFAULT,
ArrowDataType::try_from(a.element_type())?,
// TODO check how to handle nullability
- true, // a.contains_null(),
+ a.contains_null(),
))
}
}
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/schema.rs
@@ -544,7 +544,7 @@ impl Display for PrimitiveType {
PrimitiveType::Date => write!(f, "date"),
PrimitiveType::Timestamp => write!(f, "timestamp"),
PrimitiveType::Decimal(precision, scale) => {
write!(f, "decimal({}, {})", precision, scale)
write!(f, "decimal({},{})", precision, scale)
}
}
}
185 changes: 159 additions & 26 deletions crates/deltalake-core/src/kernel/snapshot/log_data.rs
@@ -2,14 +2,16 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

- use arrow_array::{Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
+ use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use chrono::{NaiveDateTime, TimeZone, Utc};
use object_store::path::Path;
use object_store::ObjectMeta;
use percent_encoding::percent_decode_str;

- use super::extract::extract_and_cast;
- use crate::kernel::{DataType, Metadata, Scalar, StructField, StructType};
+ use super::extract::{extract_and_cast, extract_and_cast_opt};
+ use crate::kernel::{
+     DataType, DeletionVectorDescriptor, Metadata, Remove, Scalar, StructField, StructType,
+ };
use crate::{DeltaResult, DeltaTableError};

const COL_NUM_RECORDS: &str = "numRecords";
@@ -58,27 +60,101 @@ impl<T: PartitionsExt> PartitionsExt for Arc<T> {
}
}

- /// A view into the log data for a single logical file.
/// Raw arrays backing the deletion vector data for a batch of log entries.
#[derive(Debug, PartialEq, Clone)]
pub struct DeletionVector<'a> {
storage_type: &'a StringArray,
path_or_inline_dv: &'a StringArray,
size_in_bytes: &'a Int32Array,
cardinality: &'a Int64Array,
offset: Option<&'a Int32Array>,
}

/// A view into the deletion vector data for a single logical file.
#[derive(Debug)]
pub struct DeletionVectorView<'a> {
data: &'a DeletionVector<'a>,
/// Pointer to a specific row in the log data.
index: usize,
}

impl<'a> DeletionVectorView<'a> {
/// Get a unique identifier for the deletion vector.
pub fn unique_id(&self) -> String {
if let Some(offset) = self.offset() {
format!(
"{}{}@{offset}",
self.storage_type(),
self.path_or_inline_dv()
)
} else {
format!("{}{}", self.storage_type(), self.path_or_inline_dv())
}
}

fn descriptor(&self) -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
storage_type: self.storage_type().parse().unwrap(),
path_or_inline_dv: self.path_or_inline_dv().to_string(),
size_in_bytes: self.size_in_bytes(),
cardinality: self.cardinality(),
offset: self.offset(),
}
}

fn storage_type(&self) -> &str {
self.data.storage_type.value(self.index)
}
fn path_or_inline_dv(&self) -> &str {
self.data.path_or_inline_dv.value(self.index)
}
fn size_in_bytes(&self) -> i32 {
self.data.size_in_bytes.value(self.index)
}
fn cardinality(&self) -> i64 {
self.data.cardinality.value(self.index)
}
fn offset(&self) -> Option<i32> {
self.data
.offset
.and_then(|a| a.is_valid(self.index).then(|| a.value(self.index)))
}
}

/// A view into the log data representing a single logical file.
///
/// This struct holds a pointer to a specific row in the log data and provides access to the
/// information stored in that row by tracking references to the underlying arrays.
///
/// Additionally, references to some table metadata are tracked to provide higher-level
/// functionality, e.g. parsing partition values.
#[derive(Debug, PartialEq)]
pub struct FileStats<'a> {
pub struct LogicalFile<'a> {
path: &'a StringArray,
/// The on-disk size of this data file in bytes
size: &'a Int64Array,
/// Last modification time of the file in milliseconds since the epoch.
modification_time: &'a Int64Array,
/// The partition values for this logical file.
partition_values: &'a MapArray,
- partition_fields: PartitionFields<'a>,
/// Struct containing all available statistics for the columns in this file.
stats: &'a StructArray,
/// Array containing the deletion vector data.
deletion_vector: Option<DeletionVector<'a>>,

/// Pointer to a specific row in the log data.
index: usize,
/// Schema fields the table is partitioned by.
partition_fields: PartitionFields<'a>,
}

- impl FileStats<'_> {
+ impl LogicalFile<'_> {
/// Path to the file's storage location.
pub fn path(&self) -> Cow<'_, str> {
- percent_decode_str(self.path.value(self.index))
-     .decode_utf8()
-     .unwrap()
+ percent_decode_str(self.path.value(self.index)).decode_utf8_lossy()
}

- /// an object store [`Path`] to the file.
+ /// An object store [`Path`] to the file.
///
/// This tries to parse the file string and, if that fails, returns the string as is.
// TODO assert consistent handling of the path encoding when reading log data so this logic can be removed.
@@ -114,7 +190,6 @@ impl FileStats<'_> {
}

/// The partition values for this logical file.
- // TODO make this fallible
pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
if self.partition_fields.is_empty() {
return Ok(BTreeMap::new());
@@ -165,6 +240,18 @@ impl FileStats<'_> {
.collect::<DeltaResult<BTreeMap<_, _>>>()
}

/// The deletion vector for this file, if one is present.
pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
self.deletion_vector.as_ref().and_then(|arr| {
arr.storage_type
.is_valid(self.index)
.then(|| DeletionVectorView {
data: arr,
index: self.index,
})
})
}

/// The number of records stored in the data file.
pub fn num_records(&self) -> Option<usize> {
self.stats
@@ -193,19 +280,46 @@ impl FileStats<'_> {
.column_by_name(COL_MAX_VALUES)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index))
}

/// Create a remove action for this logical file.
pub fn remove_action(&self, data_change: bool) -> Remove {
Remove {
// TODO use the raw (still encoded) path here once we have reconciled serde ...
path: self.path().to_string(),
data_change,
deletion_timestamp: Some(Utc::now().timestamp_millis()),
extended_file_metadata: Some(true),
size: Some(self.size()),
partition_values: self.partition_values().ok().map(|pv| {
pv.iter()
.map(|(k, v)| {
(
k.to_string(),
if v.is_null() {
None
} else {
Some(v.serialize())
},
)
})
.collect()
}),
deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
tags: None,
base_row_id: None,
default_row_commit_version: None,
}
}
}

- impl<'a> TryFrom<&FileStats<'a>> for ObjectMeta {
+ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
type Error = DeltaTableError;

- fn try_from(file_stats: &FileStats<'a>) -> Result<Self, Self::Error> {
-     let location = file_stats.object_store_path();
-     let size = file_stats.size() as usize;
-     let last_modified = file_stats.modification_datetime()?;
+ fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
Ok(ObjectMeta {
- location,
- size,
- last_modified,
+ location: file_stats.object_store_path(),
+ size: file_stats.size() as usize,
+ last_modified: file_stats.modification_datetime()?,
version: None,
e_tag: None,
})
@@ -219,6 +333,7 @@ pub struct FileStatsAccessor<'a> {
sizes: &'a Int64Array,
modification_times: &'a Int64Array,
stats: &'a StructArray,
deletion_vector: Option<DeletionVector<'a>>,
partition_values: &'a MapArray,
length: usize,
pointer: usize,
Expand All @@ -242,39 +357,57 @@ impl<'a> FileStatsAccessor<'a> {
.map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?)))
.collect::<DeltaResult<BTreeMap<_, _>>>()?,
);
let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
let deletion_vector = deletion_vector.and_then(|dv| {
let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
let path_or_inline_dv = extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
Some(DeletionVector {
storage_type,
path_or_inline_dv,
size_in_bytes,
cardinality,
offset,
})
});

Ok(Self {
partition_fields,
paths,
sizes,
modification_times,
stats,
deletion_vector,
partition_values,
length: data.num_rows(),
pointer: 0,
})
}

- pub(crate) fn get(&self, index: usize) -> DeltaResult<FileStats<'a>> {
+ pub(crate) fn get(&self, index: usize) -> DeltaResult<LogicalFile<'a>> {
if index >= self.length {
return Err(DeltaTableError::Generic(format!(
"index out of bounds: {} >= {}",
index, self.length
)));
}
- Ok(FileStats {
+ Ok(LogicalFile {
path: self.paths,
size: self.sizes,
modification_time: self.modification_times,
partition_values: self.partition_values,
partition_fields: self.partition_fields.clone(),
stats: self.stats,
deletion_vector: self.deletion_vector.clone(),
index,
})
}
}

impl<'a> Iterator for FileStatsAccessor<'a> {
- type Item = FileStats<'a>;
+ type Item = LogicalFile<'a>;

fn next(&mut self) -> Option<Self::Item> {
if self.pointer >= self.length {
@@ -312,7 +445,7 @@ impl<'a> LogDataHandler<'a> {
}

impl<'a> IntoIterator for LogDataHandler<'a> {
- type Item = FileStats<'a>;
+ type Item = LogicalFile<'a>;
type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;

fn into_iter(self) -> Self::IntoIter {
@@ -557,7 +690,7 @@ mod tests {
.snapshot()
.unwrap()
.snapshot
- .file_stats()
+ .files()
.find(|f| {
f.path().ends_with(
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
@@ -569,7 +702,7 @@
.snapshot()
.unwrap()
.snapshot
- .file_stats()
+ .files()
.find(|f| {
f.path().ends_with(
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",