Introduce TableChangesScan::execute and ScanFileReader #555

Closed
wants to merge 36 commits into from

36 commits
9e5b04a  initial log replay (OussamaSaoudi-db, Nov 25, 2024)
2ff7061  Add basic mock table for testing (OussamaSaoudi-db, Nov 25, 2024)
3820dd3  Finish testing framework for commit actions (OussamaSaoudi-db, Nov 25, 2024)
ff7e1a1  Fix deletion vectors (OussamaSaoudi-db, Nov 25, 2024)
674b9df  Add protocol test (OussamaSaoudi-db, Nov 26, 2024)
2460f2b  Make MockTable async (OussamaSaoudi-db, Nov 26, 2024)
3fe9347  add schema check (OussamaSaoudi-db, Nov 26, 2024)
98d64d3  Add config flag parsing (OussamaSaoudi-db, Nov 26, 2024)
6221dc5  Add configuration check (OussamaSaoudi-db, Nov 26, 2024)
2a33d77  Change log replay to work with table changes scan (OussamaSaoudi-db, Nov 26, 2024)
fe4c0e6  add timestamp tests (OussamaSaoudi-db, Nov 26, 2024)
c2b1c00  Use map_ok (OussamaSaoudi-db, Nov 26, 2024)
63435e9  Address some pr comments (OussamaSaoudi-db, Nov 26, 2024)
d8b1225  Integrate with table changes builder (OussamaSaoudi-db, Nov 26, 2024)
f072e6e  Fix private visit_protocol, remove print (OussamaSaoudi-db, Nov 26, 2024)
ba76bb1  Change selection vector computation (OussamaSaoudi-db, Nov 26, 2024)
a08dc6e  Add comments for log replay (OussamaSaoudi-db, Nov 26, 2024)
93e900a  more documentation (OussamaSaoudi-db, Nov 26, 2024)
9702ef0  Add file-level doc (OussamaSaoudi-db, Nov 26, 2024)
196de69  Revert "Add file-level doc" (OussamaSaoudi-db, Nov 26, 2024)
d188791  Add file level doc (OussamaSaoudi-db, Nov 26, 2024)
33f4b7e  Initial scan file (OussamaSaoudi-db, Nov 27, 2024)
968b543  Move docs to fields (OussamaSaoudi-db, Nov 29, 2024)
b183304  Move common utils to utils::test_utils (OussamaSaoudi-db, Nov 29, 2024)
27d7ed5  Refactor to prepare for scan_file, remove unused annotation (OussamaSaoudi-db, Nov 29, 2024)
8c85361  Merge branch 'log_replay_2' into cdf_scan_file (OussamaSaoudi-db, Nov 29, 2024)
e384be6  add scan file visitor test (OussamaSaoudi-db, Nov 29, 2024)
21b7e85  Fix clippy (OussamaSaoudi-db, Nov 29, 2024)
2c9f483  stub out execute (OussamaSaoudi-db, Nov 29, 2024)
8d3d843  Add scan_data_to_scan_file phase (OussamaSaoudi-db, Nov 29, 2024)
ad29685  Merge branch 'cdf_scan_file' into cdf_read_phase (OussamaSaoudi-db, Nov 29, 2024)
a1c78ce  initial data read phase (OussamaSaoudi-db, Nov 29, 2024)
40dca95  initial data read phase (OussamaSaoudi-db, Nov 30, 2024)
7f1098b  update to scan_file expression (OussamaSaoudi-db, Nov 30, 2024)
6e37d99  Merge branch 'cdf_scan_file' into cdf_read_phase (OussamaSaoudi-db, Nov 30, 2024)
13cb83c  Working cdf dv resolution (OussamaSaoudi-db, Nov 30, 2024)
4 changes: 4 additions & 0 deletions ffi/src/error.rs
@@ -50,6 +50,7 @@ pub enum KernelError {
UnsupportedError,
ParseIntervalError,
ChangeDataFeedUnsupported,
ChangeDataFeedIncompatibleSchema,
}

impl From<Error> for KernelError {
@@ -104,6 +105,9 @@ impl From<Error> for KernelError {
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
Error::ChangeDataFeedIncompatibleSchema(_, _) => {
KernelError::ChangeDataFeedIncompatibleSchema
}
}
}
}
19 changes: 19 additions & 0 deletions kernel/examples/change_data_feed/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "change_data_feed"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
clap = { version = "4.5", features = ["derive"] }
delta_kernel = { path = "../../../kernel", features = [
"cloud",
"default-engine",
"developer-visibility",
"sync-engine"
] }
env_logger = "0.11.3"
url = "2"
itertools = "0.13"
arrow = { workspace = true, features = ["prettyprint"] }
52 changes: 52 additions & 0 deletions kernel/examples/change_data_feed/src/main.rs
@@ -0,0 +1,52 @@
use std::sync::Arc;

use arrow::{compute::filter_record_batch, util::pretty::print_batches};
use arrow_array::RecordBatch;
use clap::Parser;
use delta_kernel::{
engine::{arrow_data::ArrowEngineData, sync::SyncEngine},
DeltaResult, Table,
};
use itertools::Itertools;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
struct Cli {
/// Path to the table to inspect
path: String,

[Collaborator review comment: nit: extra spaces]

start_version: u64,

end_version: Option<u64>,
}

fn main() -> DeltaResult<()> {
let cli = Cli::parse();
let table = Table::try_from_uri(cli.path)?;
let engine = Arc::new(SyncEngine::new());
let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?;

let x = table_changes.into_scan_builder().build()?;
let batches: Vec<RecordBatch> = x
.execute(engine)?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()?;
print_batches(&batches)?;

Ok(())
}
6 changes: 3 additions & 3 deletions kernel/src/actions/deletion_vector.rs
@@ -1,10 +1,9 @@
//! Code relating to parsing and using deletion vectors

use std::io::{Cursor, Read};
use std::sync::Arc;

use bytes::Bytes;
use roaring::RoaringTreemap;
use std::io::{Cursor, Read};
[Collaborator review comment: why do you need these?]

use std::sync::Arc;
use url::Url;

use delta_kernel_derive::Schema;
@@ -13,6 +12,7 @@ use crate::utils::require;
use crate::{DeltaResult, Error, FileSystemClient};

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(serde::Serialize), serde(rename_all = "camelCase"))]
pub struct DeletionVectorDescriptor {
/// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
pub storage_type: String,
20 changes: 20 additions & 0 deletions kernel/src/actions/mod.rs
@@ -85,6 +85,7 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Format {
/// Name of the encoding for files in this table
pub provider: String,
@@ -102,6 +103,7 @@ impl Default for Format {
}

#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Metadata {
/// Unique identifier for this table
pub id: String,
@@ -325,6 +327,7 @@ where
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
@@ -346,6 +349,7 @@ struct CommitInfo {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -374,23 +378,29 @@ pub struct Add {
/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub default_row_commit_version: Option<i64>,

/// The name of the clustering implementation
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub clustering_provider: Option<String>,
}

@@ -403,6 +413,7 @@ impl Add {
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -412,39 +423,48 @@ struct Remove {
pub(crate) path: String,

/// The time this logical file was created, as milliseconds since the epoch.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_timestamp: Option<i64>,

/// When `false` the logical file must already be present in the table or the records
/// in the added file must be contained in one or more remove actions in the same version.
pub(crate) data_change: bool,

/// When true the fields `partition_values`, `size`, and `tags` are present
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) extended_file_metadata: Option<bool>,

/// A map from partition column to value for this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) partition_values: Option<HashMap<String, String>>,

/// The size of this data file in bytes
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) default_row_commit_version: Option<i64>,
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
2 changes: 1 addition & 1 deletion kernel/src/actions/visitors.rs
@@ -122,7 +122,7 @@ struct ProtocolVisitor {

impl ProtocolVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
fn visit_protocol<'a>(
pub(crate) fn visit_protocol<'a>(
row_index: usize,
min_reader_version: i32,
getters: &[&'a dyn GetData<'a>],
2 changes: 1 addition & 1 deletion kernel/src/engine/sync/mod.rs
@@ -15,7 +15,7 @@ use tracing::debug;

mod fs_client;
pub(crate) mod json;
mod parquet;
pub(crate) mod parquet;

/// This is a simple implementation of [`Engine`]. It only supports reading data from the local
/// filesystem, and internally represents data using `Arrow`.
11 changes: 10 additions & 1 deletion kernel/src/error.rs
@@ -6,7 +6,7 @@ use std::{
str::Utf8Error,
};

use crate::schema::DataType;
use crate::schema::{DataType, StructType};
use crate::table_properties::ParseIntervalError;
use crate::Version;

@@ -188,6 +188,9 @@ pub enum Error {

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),

#[error("Change data feed encountered incompatible schema. Expected {0}, got {1}")]
ChangeDataFeedIncompatibleSchema(String, String),
}

// Convenience constructors for Error types that take a String argument
@@ -254,6 +257,12 @@ impl Error {
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}
pub fn change_data_feed_incompatible_schema(
expected: &StructType,
actual: &StructType,
) -> Self {
Self::ChangeDataFeedIncompatibleSchema(format!("{:?}", expected), format!("{:?}", actual))
}

// Capture a backtrace when the error is constructed.
#[must_use]
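As a rough illustration of how the new error variant might surface, here is a minimal sketch; the helper name `require_cdf_schema` and the direct equality check are assumptions for illustration, not code from this PR:

```rust
use delta_kernel::schema::StructType;
use delta_kernel::{DeltaResult, Error};

/// Hypothetical helper: reject a commit whose schema does not match the schema
/// the change data feed scan expects, using the new error constructor.
fn require_cdf_schema(expected: &StructType, actual: &StructType) -> DeltaResult<()> {
    if expected != actual {
        return Err(Error::change_data_feed_incompatible_schema(expected, actual));
    }
    Ok(())
}
```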
7 changes: 7 additions & 0 deletions kernel/src/expressions/scalars.rs
@@ -151,6 +151,13 @@ impl Scalar {
pub fn is_null(&self) -> bool {
matches!(self, Self::Null(_))
}

pub fn timestamp_from_millis(millis: i64) -> DeltaResult<Self> {
let Some(timestamp) = DateTime::from_timestamp_millis(millis) else {
return Err(Error::generic("Failed to convert timestamp"));
};
Ok(Self::Timestamp(timestamp.timestamp_micros()))
}
}

impl Display for Scalar {
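A quick sketch of the new `Scalar::timestamp_from_millis` constructor in use; the millisecond value below is made up for illustration:

```rust
use delta_kernel::expressions::Scalar;
use delta_kernel::DeltaResult;

fn timestamp_example() -> DeltaResult<()> {
    // 2024-11-30T00:00:00Z expressed as milliseconds since the epoch.
    // The constructor widens it to a microsecond-precision Timestamp scalar,
    // or errors if the value cannot be represented as a DateTime.
    let ts = Scalar::timestamp_from_millis(1_732_924_800_000)?;
    assert_eq!(ts, Scalar::Timestamp(1_732_924_800_000_000));
    Ok(())
}
```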
4 changes: 2 additions & 2 deletions kernel/src/path.rs
@@ -14,7 +14,7 @@ const MULTIPART_PART_LEN: usize = 10;
/// The number of characters in the uuid part of a uuid checkpoint
const UUID_PART_LEN: usize = 36;

#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
enum LogPathFileType {
@@ -37,7 +37,7 @@ enum LogPathFileType {
Unknown,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ParsedLogPath<Location: AsUrl = FileMeta> {
1 change: 1 addition & 0 deletions kernel/src/scan/data_skipping.rs
@@ -41,6 +41,7 @@ fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option<Expr> {
DataSkippingPredicateCreator.eval_expr(expr, inverted)
}

#[derive(Clone)]
pub(crate) struct DataSkippingFilter {
stats_schema: SchemaRef,
select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
2 changes: 1 addition & 1 deletion kernel/src/scan/log_replay.rs
@@ -180,7 +180,7 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(||
]))
});

static SCAN_ROW_DATATYPE: LazyLock<DataType> = LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());
pub static SCAN_ROW_DATATYPE: LazyLock<DataType> = LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());

fn get_add_transform_expr() -> Expression {
Expression::Struct(vec![
8 changes: 4 additions & 4 deletions kernel/src/scan/mod.rs
@@ -19,7 +19,7 @@ use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;

mod data_skipping;
pub mod data_skipping;
pub mod log_replay;
pub mod state;

@@ -124,7 +124,7 @@ pub struct ScanResult {
pub raw_data: DeltaResult<Box<dyn EngineData>>,
/// Raw row mask.
// TODO(nick) this should be allocated by the engine
raw_mask: Option<Vec<bool>>,
pub(crate) raw_mask: Option<Vec<bool>>,
}

impl ScanResult {
@@ -159,7 +159,7 @@ impl ScanResult {
/// store the name of the column, as that's all that's needed during the actual query. For
/// `Partition` we store an index into the logical schema for this query since later we need the
/// data type as well to materialize the partition column.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum ColumnType {
// A column, selected from the data, as is
Selected(String),
@@ -380,7 +380,7 @@ pub fn scan_row_schema() -> Schema {
log_replay::SCAN_ROW_SCHEMA.as_ref().clone()
}

fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> {
pub fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> {
match (raw, data_type.as_primitive_opt()) {
(Some(v), Some(primitive)) => primitive.parse_scalar(v),
(Some(_), None) => Err(Error::generic(format!(
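Finally, since `parse_partition_value` becomes public in this diff, a minimal usage sketch; the raw value and column type are assumptions for illustration, and the import path assumes the function is reachable through the public `scan` module:

```rust
use delta_kernel::scan::parse_partition_value;
use delta_kernel::schema::DataType;
use delta_kernel::DeltaResult;

fn partition_value_example() -> DeltaResult<()> {
    // A raw partition value as it would appear in an Add action's partitionValues map.
    let raw = Some("42".to_string());
    // Parse it into a typed Scalar using the partition column's data type.
    let scalar = parse_partition_value(raw.as_ref(), &DataType::INTEGER)?;
    println!("partition value: {scalar}");
    Ok(())
}
```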