TableChangesScan::execute and end to end testing for CDF #580

Merged Dec 11, 2024 · 75 commits

Commits
bd9585d
scan file implementation for cdf
OussamaSaoudi-db Nov 25, 2024
e2c2d90
change naming and update documentation for cdf scan files
OussamaSaoudi-db Dec 6, 2024
3db127f
fixup some naming
OussamaSaoudi-db Dec 6, 2024
ec6e629
inline dvinfo creation
OussamaSaoudi-db Dec 7, 2024
74eed10
Change name of expression transform
OussamaSaoudi-db Dec 7, 2024
9fe16db
Add test for null partition columns
OussamaSaoudi-db Dec 7, 2024
8635e38
Improve testing for scan_file
OussamaSaoudi-db Dec 8, 2024
a17f4b0
Change visitor, remove (un)resolved cdf scan file
OussamaSaoudi-db Dec 8, 2024
5fab00c
Only keep track of remove dv instead of hashmap
OussamaSaoudi-db Dec 8, 2024
21553a4
Fix remove dv
OussamaSaoudi-db Dec 8, 2024
08867f8
patch rm_dv
OussamaSaoudi-db Dec 8, 2024
1228107
Update comment for CdfScanFileVisitor
OussamaSaoudi-db Dec 8, 2024
97a5790
Initial cdf read phase with deletion vector resolution
OussamaSaoudi-db Dec 6, 2024
4969e44
lazily construct empty treemaps
OussamaSaoudi-db Dec 6, 2024
dcb17fa
Change treemap handling in kernel, use selection vector, and simplify…
OussamaSaoudi-db Dec 6, 2024
ebaf225
Rebase onto scan file
OussamaSaoudi-db Dec 6, 2024
bd49142
update doc comment for dvs
OussamaSaoudi-db Dec 6, 2024
6287e6e
fix comment
OussamaSaoudi-db Dec 7, 2024
7fe4f4d
prototype of add/rm dv in cdfscanfile
OussamaSaoudi-db Dec 8, 2024
d4f95d5
Use default for dv info
OussamaSaoudi-db Dec 8, 2024
8a8f6bf
shorten tests, check error cases
OussamaSaoudi-db Dec 8, 2024
eeaabb0
Rename to physical_to_logical, add comment explaining the deletion/se…
OussamaSaoudi-db Dec 8, 2024
fa13ade
Add note about ordinary scans for dv resolution
OussamaSaoudi-db Dec 8, 2024
4dabdaf
Add doc comment to treemap_to_bools_with
OussamaSaoudi-db Dec 8, 2024
7fcb531
Update resolve_scan_file_dv docs
OussamaSaoudi-db Dec 8, 2024
577b424
Add test and docs
OussamaSaoudi-db Dec 8, 2024
a970df1
Add documentation
OussamaSaoudi-db Dec 8, 2024
5b70aab
Rename to resolve-dvs
OussamaSaoudi-db Dec 8, 2024
3e0ead6
Fixup naming and docs
OussamaSaoudi-db Dec 9, 2024
0b207c4
address pr comments
OussamaSaoudi-db Dec 9, 2024
68a4fb1
Merge branch 'main' into cdf_dv_resolve
OussamaSaoudi-db Dec 9, 2024
3259769
Add test comment, ignore sv to bools test
OussamaSaoudi-db Dec 10, 2024
2e9c29e
remove ignore from test
OussamaSaoudi-db Dec 10, 2024
70fe573
initial schema and expr for phys to log
OussamaSaoudi-db Dec 6, 2024
876dd15
physical to logical transform
OussamaSaoudi-db Dec 8, 2024
35b38d4
logical to physical
OussamaSaoudi-db Dec 8, 2024
dfdc491
remove to_string from generated columns
OussamaSaoudi-db Dec 8, 2024
020a19d
Add read phase and a test
OussamaSaoudi-db Dec 9, 2024
8a92379
factor out test
OussamaSaoudi-db Dec 9, 2024
3d2e53a
Add cdf test and text
OussamaSaoudi-db Dec 9, 2024
8df172f
Add tests for cdf
OussamaSaoudi-db Dec 9, 2024
7846757
remove unneeded import
OussamaSaoudi-db Dec 9, 2024
67a9a18
more formatting
OussamaSaoudi-db Dec 9, 2024
fa042c1
Add some docs
OussamaSaoudi-db Dec 9, 2024
4098b67
Removed allow(unused)
OussamaSaoudi-db Dec 9, 2024
610d62e
Remove data for next PR
OussamaSaoudi-db Dec 9, 2024
4bc5819
Remove dv test file
OussamaSaoudi-db Dec 9, 2024
02599e6
appease clippy
OussamaSaoudi-db Dec 9, 2024
bd43bba
Add expression test
OussamaSaoudi-db Dec 9, 2024
61da4c3
Address PR comments
OussamaSaoudi-db Dec 10, 2024
65bce5f
Remove read_scan_data
OussamaSaoudi-db Dec 10, 2024
bea39ba
fix compiler warnings
OussamaSaoudi-db Dec 10, 2024
5622874
fix test
OussamaSaoudi-db Dec 10, 2024
221b96f
Switch to no timezone
OussamaSaoudi-db Dec 10, 2024
857d644
Address pr comments
OussamaSaoudi-db Dec 10, 2024
496a69a
Merge branch 'main' into cdf_phys_to_log
OussamaSaoudi-db Dec 10, 2024
e3031c3
Remove unneeded changes
OussamaSaoudi-db Dec 10, 2024
88df7fa
make raw mask private
OussamaSaoudi-db Dec 10, 2024
2eeb144
Revert "Remove data for next PR"
OussamaSaoudi-db Dec 9, 2024
376c061
Revert "Remove dv test file"
OussamaSaoudi-db Dec 9, 2024
c03b58f
Move integration tests into cdf.rs
OussamaSaoudi-db Dec 9, 2024
59c90f8
remove unused
OussamaSaoudi-db Dec 9, 2024
6ff5b63
remove failing test
OussamaSaoudi-db Dec 9, 2024
23ce3cf
add back read_scan_file
OussamaSaoudi-db Dec 10, 2024
09b709b
remove unused, move read_scan_file
OussamaSaoudi-db Dec 10, 2024
17eaf3e
Rename read_scan_data
OussamaSaoudi-db Dec 10, 2024
44c8ec9
change to read_scan_file
OussamaSaoudi-db Dec 10, 2024
e3f183c
Refactor assert_batches_sorted_eq
OussamaSaoudi-db Dec 10, 2024
2ecf506
Fix compilation errors
OussamaSaoudi-db Dec 10, 2024
29b7548
Fix tests by projecting timestamp
OussamaSaoudi-db Dec 10, 2024
50b5d45
compress tests
OussamaSaoudi-db Dec 10, 2024
c6c5bef
Remove Clone from ColumnType. Change it to Arc
OussamaSaoudi-db Dec 10, 2024
1e4cc61
Change naming from read_schema to physical_schema
OussamaSaoudi-db Dec 10, 2024
75445af
Merge branch 'main' into cdf_execute
OussamaSaoudi-db Dec 10, 2024
192849d
Remove reimplemented function
OussamaSaoudi-db Dec 11, 2024
10 changes: 10 additions & 0 deletions kernel/src/expressions/scalars.rs
@@ -151,6 +151,16 @@ impl Scalar {
    pub fn is_null(&self) -> bool {
        matches!(self, Self::Null(_))
    }

+   /// Constructs a `Scalar` timestamp with no timezone from an `i64` of milliseconds since the Unix epoch
+   pub(crate) fn timestamp_ntz_from_millis(millis: i64) -> DeltaResult<Self> {
+       let Some(timestamp) = DateTime::from_timestamp_millis(millis) else {
+           return Err(Error::generic(format!(
+               "Failed to create millisecond timestamp from {millis}"
+           )));
+       };
+       Ok(Self::TimestampNtz(timestamp.timestamp_micros()))
+   }
}

impl Display for Scalar {
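The function stores `TIMESTAMP_NTZ` values as microseconds, so the millisecond input is scaled by 1000. A quick sanity check of the expected behavior, written as a hypothetical unit test that is not part of this diff:

```rust
// Hypothetical test sketching the intended behavior of
// Scalar::timestamp_ntz_from_millis (shown in the diff above).
#[test]
fn timestamp_ntz_from_millis_converts_to_micros() {
    // 1234 ms since the Unix epoch is stored as 1_234_000 µs.
    let scalar = Scalar::timestamp_ntz_from_millis(1234).unwrap();
    assert_eq!(scalar, Scalar::TimestampNtz(1_234_000));

    // Out-of-range inputs (rejected by chrono's DateTime::from_timestamp_millis)
    // surface as an error rather than a panic.
    assert!(Scalar::timestamp_ntz_from_millis(i64::MAX).is_err());
}
```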
1 change: 0 additions & 1 deletion kernel/src/log_segment.rs
@@ -145,7 +145,6 @@ impl LogSegment {
    /// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits
    /// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
    /// is specified it will be the most recent version by default.
-   #[allow(unused)]
    #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
    pub(crate) fn for_table_changes(
        fs_client: &dyn FileSystemClient,
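The inclusive-bounds rule in that doc comment is easy to misread, so here is a tiny illustrative sketch (not kernel code; the function and names are made up):

```rust
// Illustrative only: the version-range rule from the doc comment above.
// Both bounds are inclusive, and end_version defaults to the most recent
// version when unspecified.
fn commit_range(
    start_version: u64,
    end_version: Option<u64>,
    most_recent: u64,
) -> std::ops::RangeInclusive<u64> {
    start_version..=end_version.unwrap_or(most_recent)
}

fn main() {
    assert_eq!(commit_range(3, Some(5), 10), 3..=5); // both bounds inclusive
    assert_eq!(commit_range(3, None, 10), 3..=10); // default: most recent
}
```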
9 changes: 6 additions & 3 deletions kernel/src/scan/mod.rs
@@ -125,7 +125,7 @@ pub struct ScanResult {
    pub raw_data: DeltaResult<Box<dyn EngineData>>,
    /// Raw row mask.
    // TODO(nick) this should be allocated by the engine
-   raw_mask: Option<Vec<bool>>,
+   pub(crate) raw_mask: Option<Vec<bool>>,
}

impl ScanResult {
@@ -160,7 +160,7 @@ impl ScanResult {
/// store the name of the column, as that's all that's needed during the actual query. For
/// `Partition` we store an index into the logical schema for this query since later we need the
/// data type as well to materialize the partition column.
-#[derive(PartialEq, Debug)]
+#[derive(Clone, PartialEq, Debug)]
Collaborator: Is Clone needed because we now process the selected column types separately for each commit, whereas the normal log replay scan does it all in one shot?

Collaborator (Author): Ah, good point. The real reason is that I don't want to hold a reference to all_fields in TableChangesScan, because the iterator needs to be free of lifetimes. But all I actually need is to Arc it; then we don't need to clone the Vec at all.

Note that the existing Scan::execute still borrows self:

    pub fn execute(
        &self,
        engine: Arc<dyn Engine>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {

as you can see from the anonymous lifetime '_. I was planning to open a follow-up PR to remove that borrow as well.
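A minimal sketch of the Arc idea from this exchange (illustrative only; the type and field names are hypothetical stand-ins, not the kernel's actual types):

```rust
use std::sync::Arc;

// The scan shares its column list behind an Arc, so the iterator returned
// by execute() owns a cheap Arc clone instead of borrowing self.
struct ScanSketch {
    all_fields: Arc<Vec<String>>, // stand-in for Arc<Vec<ColumnType>>
}

impl ScanSketch {
    // Note the 'static bound: no borrowed lifetime escapes self.
    fn execute(&self) -> impl Iterator<Item = String> + 'static {
        let fields = Arc::clone(&self.all_fields);
        (0..fields.len()).map(move |i| fields[i].clone())
    }
}

fn main() {
    let scan = ScanSketch {
        all_fields: Arc::new(vec!["id".into(), "age".into()]),
    };
    let iter = scan.execute(); // iterator is free of lifetimes
    drop(scan); // the Arc clone keeps the field list alive
    assert_eq!(iter.collect::<Vec<_>>(), ["id", "age"]);
}
```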

pub enum ColumnType {
    // A column, selected from the data, as is
    Selected(String),
@@ -384,7 +384,10 @@ pub fn scan_row_schema() -> Schema {
    log_replay::SCAN_ROW_SCHEMA.as_ref().clone()
}

-fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> {
+pub(crate) fn parse_partition_value(
+    raw: Option<&String>,
+    data_type: &DataType,
+) -> DeltaResult<Scalar> {
    match (raw, data_type.as_primitive_opt()) {
        (Some(v), Some(primitive)) => primitive.parse_scalar(v),
        (Some(_), None) => Err(Error::generic(format!(
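Since parse_partition_value is now pub(crate) so the CDF code can reuse it, here is a hedged sketch of its behavior as a hypothetical test. The typed-null result for a missing raw value is an assumption based on the elided match arms, not something shown in this diff:

```rust
// Hypothetical test for the now-crate-visible parse_partition_value.
#[test]
fn parse_partition_value_examples() {
    // A raw string parsed against a primitive type yields the typed scalar.
    let raw = "20".to_string();
    let value = parse_partition_value(Some(&raw), &DataType::LONG).unwrap();
    assert_eq!(value, Scalar::Long(20));

    // Assumption: with no raw value, the elided arm produces a typed null.
    let null = parse_partition_value(None, &DataType::LONG).unwrap();
    assert!(null.is_null());
}
```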
2 changes: 0 additions & 2 deletions kernel/src/table_changes/log_replay.rs
@@ -28,7 +28,6 @@ use itertools::Itertools;
mod tests;

/// Scan data for a Change Data Feed query. This holds metadata that is needed to read data rows.
-#[allow(unused)]
pub(crate) struct TableChangesScanData {
    /// Engine data with the schema defined in [`scan_row_schema`]
    ///
@@ -127,7 +126,6 @@ struct LogReplayScanner {
    //
    // Note: This will be used once an expression is introduced to transform the engine data in
    // [`TableChangesScanData`]
-   #[allow(unused)]
    timestamp: i64,
}

13 changes: 9 additions & 4 deletions kernel/src/table_changes/mod.rs
@@ -16,15 +16,21 @@ use crate::utils::require;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
+mod physical_to_logical;
mod resolve_dvs;
pub mod scan;
mod scan_file;

+static CHANGE_TYPE_COL_NAME: &str = "_change_type";
+static COMMIT_VERSION_COL_NAME: &str = "_commit_version";
+static COMMIT_TIMESTAMP_COL_NAME: &str = "_commit_timestamp";
+static ADD_CHANGE_TYPE: &str = "insert";
+static REMOVE_CHANGE_TYPE: &str = "delete";
static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
    [
-       StructField::new("_change_type", DataType::STRING, false),
-       StructField::new("_commit_version", DataType::LONG, false),
-       StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
+       StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
+       StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
+       StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
    ]
});

@@ -172,7 +178,6 @@ impl TableChanges {
        &self.table_root
    }
    /// The partition columns that will be read.
-   #[allow(unused)]
    pub(crate) fn partition_columns(&self) -> &Vec<String> {
        &self.end_snapshot.metadata().partition_columns
    }
139 changes: 139 additions & 0 deletions kernel/src/table_changes/physical_to_logical.rs
@@ -0,0 +1,139 @@
use std::collections::HashMap;
use std::iter;

use itertools::Itertools;

use crate::expressions::Scalar;
use crate::scan::{parse_partition_value, ColumnType};
use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
use crate::{DeltaResult, Error, Expression};

use super::scan_file::{CdfScanFile, CdfScanFileType};
use super::{
    ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
    REMOVE_CHANGE_TYPE,
};

/// Returns a map from change data feed column name to an expression that generates the row data.
fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
    let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
    let version = scan_file.commit_version;
    let change_type: Expression = match scan_file.scan_type {
        CdfScanFileType::Cdc => Expression::column([CHANGE_TYPE_COL_NAME]),
        CdfScanFileType::Add => ADD_CHANGE_TYPE.into(),
        CdfScanFileType::Remove => REMOVE_CHANGE_TYPE.into(),
    };
    let expressions = [
        (CHANGE_TYPE_COL_NAME, change_type),
        (COMMIT_VERSION_COL_NAME, Expression::literal(version)),
        (COMMIT_TIMESTAMP_COL_NAME, timestamp.into()),
    ];
    Ok(expressions.into_iter().collect())
}

/// Generates the expression used to convert physical data from the `scan_file` path into logical
/// data matching the `logical_schema`.
pub(crate) fn physical_to_logical_expr(
    scan_file: &CdfScanFile,
    logical_schema: &StructType,
    all_fields: &[ColumnType],
) -> DeltaResult<Expression> {
    let mut cdf_columns = get_cdf_columns(scan_file)?;
    let all_fields = all_fields
        .iter()
        .map(|field| match field {
            ColumnType::Partition(field_idx) => {
                let field = logical_schema.fields.get_index(*field_idx);
                let Some((_, field)) = field else {
                    return Err(Error::generic(
                        "logical schema did not contain expected field, can't transform data",
                    ));
                };
                let name = field.physical_name();
                let value_expression =
                    parse_partition_value(scan_file.partition_values.get(name), field.data_type())?;
                Ok(value_expression.into())
            }
            ColumnType::Selected(field_name) => {
                // Remove to take ownership
                let generated_column = cdf_columns.remove(field_name.as_str());
                Ok(generated_column.unwrap_or_else(|| ColumnName::new([field_name]).into()))
            }
        })
        .try_collect()?;
    Ok(Expression::Struct(all_fields))
}

/// Gets the physical schema that will be used to read data in the `scan_file` path.
pub(crate) fn scan_file_read_schema(
    scan_file: &CdfScanFile,
    read_schema: &StructType,
) -> SchemaRef {
    if scan_file.scan_type == CdfScanFileType::Cdc {
        let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
        let fields = read_schema.fields().cloned().chain(iter::once(change_type));
        StructType::new(fields).into()
    } else {
        read_schema.clone().into()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::expressions::{column_expr, Expression, Scalar};
    use crate::scan::ColumnType;
    use crate::schema::{DataType, StructField, StructType};
    use crate::table_changes::physical_to_logical::physical_to_logical_expr;
    use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
    use crate::table_changes::{
        ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
        REMOVE_CHANGE_TYPE,
    };

    #[test]
    fn verify_physical_to_logical_expression() {
        let test = |scan_type, expected_expr| {
            let scan_file = CdfScanFile {
                scan_type,
                path: "fake_path".to_string(),
                dv_info: Default::default(),
                remove_dv: None,
                partition_values: HashMap::from([("age".to_string(), "20".to_string())]),
                commit_version: 42,
                commit_timestamp: 1234,
            };
            let logical_schema = StructType::new([
                StructField::new("id", DataType::STRING, true),
                StructField::new("age", DataType::LONG, false),
                StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false),
                StructField::new(COMMIT_VERSION_COL_NAME, DataType::LONG, false),
                StructField::new(COMMIT_TIMESTAMP_COL_NAME, DataType::TIMESTAMP, false),
            ]);
            let all_fields = vec![
                ColumnType::Selected("id".to_string()),
                ColumnType::Partition(1),
                ColumnType::Selected(CHANGE_TYPE_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_VERSION_COL_NAME.to_string()),
                ColumnType::Selected(COMMIT_TIMESTAMP_COL_NAME.to_string()),
            ];
            let phys_to_logical_expr =
                physical_to_logical_expr(&scan_file, &logical_schema, &all_fields).unwrap();
            let expected_expr = Expression::struct_from([
                column_expr!("id"),
                Scalar::Long(20).into(),
                expected_expr,
                Expression::literal(42i64),
                Scalar::TimestampNtz(1234000).into(), // Microsecond is 1000x millisecond
            ]);

            assert_eq!(phys_to_logical_expr, expected_expr)
        };

        let cdc_change_type = Expression::column([CHANGE_TYPE_COL_NAME]);
        test(CdfScanFileType::Add, ADD_CHANGE_TYPE.into());
        test(CdfScanFileType::Remove, REMOVE_CHANGE_TYPE.into());
        test(CdfScanFileType::Cdc, cdc_change_type);
    }
}
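One subtlety in scan_file_read_schema above: only cdc files carry a physical _change_type column, so the read schema is widened just for them. A hedged sketch of the effect, written as a hypothetical test that mirrors the CdfScanFile construction used in the existing test (field access on StructField is an assumption):

```rust
// Hypothetical test: a Cdc scan file widens the physical read schema with
// the non-nullable `_change_type` column; add/remove files do not.
#[test]
fn cdc_read_schema_gains_change_type() {
    let scan_file = CdfScanFile {
        scan_type: CdfScanFileType::Cdc,
        path: "fake_path".to_string(),
        dv_info: Default::default(),
        remove_dv: None,
        partition_values: HashMap::new(),
        commit_version: 42,
        commit_timestamp: 1234,
    };
    let read_schema = StructType::new([StructField::new("id", DataType::STRING, true)]);
    let schema = scan_file_read_schema(&scan_file, &read_schema);
    let names: Vec<_> = schema.fields().map(|f| f.name.clone()).collect();
    assert_eq!(names, ["id".to_string(), CHANGE_TYPE_COL_NAME.to_string()]);
}
```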
10 changes: 4 additions & 6 deletions kernel/src/table_changes/resolve_dvs.rs
@@ -7,16 +7,15 @@ use crate::{DeltaResult, Engine, Error};

/// A [`CdfScanFile`] with its associated `selection_vector`. The `scan_type` is resolved to
/// match the `_change_type` that its rows will have in the change data feed.
-#[allow(unused)]
-struct ResolvedCdfScanFile {
+pub(crate) struct ResolvedCdfScanFile {
    /// The scan file that holds the path to the data file to be read. The `scan_type` field is
    /// resolved to the `_change_type` of the rows for this data file.
-   scan_file: CdfScanFile,
+   pub(crate) scan_file: CdfScanFile,
    /// Optional vector of bools. If `selection_vector[i] = true`, then that row must be included
    /// in the CDF output. Otherwise the row must be filtered out. The vector may be shorter than
    /// the data file. In this case, all the remaining rows are *not* selected. If `selection_vector`
    /// is `None`, then all rows are selected.
-   selection_vector: Option<Vec<bool>>,
+   pub(crate) selection_vector: Option<Vec<bool>>,
}

/// Resolves the deletion vectors for a [`CdfScanFile`]. This function handles two
@@ -33,8 +32,7 @@ struct ResolvedCdfScanFile {
/// 2. The second case handles all other add, remove, and cdc [`CdfScanFile`]s. These will simply
/// read the deletion vector (if present), and each is converted into a [`ResolvedCdfScanFile`].
/// No changes are made to the `scan_type`.
-#[allow(unused)]
-fn resolve_scan_file_dv(
+pub(crate) fn resolve_scan_file_dv(
    engine: &dyn Engine,
    table_root: &Url,
    scan_file: CdfScanFile,
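The selection-vector contract documented on ResolvedCdfScanFile (a vector shorter than the file deselects the tail; None selects everything) can be made concrete with a small helper. This is an illustrative sketch, not kernel API:

```rust
// Illustrative only: the documented selection-vector semantics.
fn is_row_selected(selection_vector: Option<&[bool]>, row: usize) -> bool {
    match selection_vector {
        // No vector at all: every row is selected.
        None => true,
        // Past the end of the vector: the row is NOT selected.
        Some(sv) => sv.get(row).copied().unwrap_or(false),
    }
}

fn main() {
    let sv = [true, false, true];
    assert!(is_row_selected(Some(&sv), 0));
    assert!(!is_row_selected(Some(&sv), 1));
    assert!(!is_row_selected(Some(&sv), 5)); // shorter than the file: tail dropped
    assert!(is_row_selected(None, 5)); // None: all rows selected
}
```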