Add methods for constructing LogSegment for Snapshot and for TableChanges (#495)


## What changes are proposed in this pull request?
This introduces two methods for constructing a `LogSegment`. The first,
`LogSegment::for_snapshot`, builds a `LogSegment` for a `Snapshot`. The second,
`LogSegment::for_table_changes`, builds a `LogSegment` for the upcoming
`TableChanges` type.
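A minimal sketch of how these constructors are meant to be driven from inside the crate (`build_segments` is a hypothetical helper, and the `Engine::get_file_system_client` accessor is assumed; this code is not part of the PR's diff):

```rust
use crate::log_segment::LogSegment;
use crate::{DeltaResult, Engine};
use url::Url;

// Hypothetical, crate-internal usage sketch of the two new constructors.
fn build_segments(engine: &dyn Engine, log_root: Url) -> DeltaResult<()> {
    let fs_client = engine.get_file_system_client();

    // Snapshot at the latest version: no checkpoint hint, no time-travel version.
    let snapshot_segment =
        LogSegment::for_snapshot(fs_client.as_ref(), log_root.clone(), None, None)?;

    // All commits from version 2 through 5 (inclusive), for the upcoming TableChanges type.
    let table_changes_segment =
        LogSegment::for_table_changes(fs_client.as_ref(), log_root, 2, 5)?;

    let _ = (snapshot_segment, table_changes_segment);
    Ok(())
}
```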

This PR also refactors the log listing functions to reduce duplication in the
code, introducing a single function, `list_log_files`, that lists, filters, and
parses log files.
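The listing relies on Delta log file names being zero-padded so that lexicographic order matches version order; a standalone sketch of the prefix used to seed the listing:

```rust
fn main() {
    // `list_log_files` starts listing from a zero-padded, 20-digit version
    // prefix, e.g. commit 3 is named 00000000000000000003.json.
    let start_version: u64 = 3;
    let version_prefix = format!("{:020}", start_version);
    assert_eq!(version_prefix, "00000000000000000003");
}
```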

This also adds a test helper to `test-utils` called
`delta_path_for_multipart_checkpoint`, which can be used to create a multipart
checkpoint path.
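Multipart checkpoint paths follow the Delta log naming scheme `<version>.checkpoint.<part>.<num_parts>.parquet`; a minimal sketch of how such a name is formed (the helper's actual signature in `test-utils` may differ):

```rust
// Sketch: version is zero-padded to 20 digits, part number and part count
// to 10 digits, per the Delta log's multipart checkpoint naming scheme.
fn multipart_checkpoint_name(version: u64, part: u32, num_parts: u32) -> String {
    format!("{version:020}.checkpoint.{part:010}.{num_parts:010}.parquet")
}

fn main() {
    assert_eq!(
        multipart_checkpoint_name(1, 2, 3),
        "00000000000000000001.checkpoint.0000000002.0000000003.parquet"
    );
}
```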

This replaces the changes proposed in #457.


## How was this change tested?
This change introduces tests for the following:
- reading the log with an out-of-date checkpoint hint
- reading the log with an up-to-date checkpoint hint
- creating a snapshot log segment without a checkpoint hint
- creating a snapshot with a multi-part checkpoint
- failing when a multi-part checkpoint has an incorrect number of parts
- creating a snapshot with a start checkpoint and an end time-travel version
- creating a snapshot with a checkpoint hint higher than the time-travel version
- creating log segments for table changes
- checking that contiguity of the log is always preserved (see the sketch after this list)
- checking that `for_table_changes` fails when `start_version` > `end_version`
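The contiguity invariant those tests exercise can be illustrated standalone (a minimal sketch, not the kernel's actual test code):

```rust
// A sorted list of commit versions is contiguous iff every adjacent pair
// differs by exactly one: the same `windows(2)` check `LogSegment::try_new` uses.
fn is_contiguous(versions: &[u64]) -> bool {
    versions.windows(2).all(|w| w[0] + 1 == w[1])
}

fn main() {
    assert!(is_contiguous(&[5, 6, 7, 8])); // no gaps: accepted
    assert!(!is_contiguous(&[5, 6, 8])); // version 7 missing: rejected
    assert!(is_contiguous(&[42])); // a single commit is trivially contiguous
}
```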

This PR also adds an ignored test that checks for desired behaviour. The test
`build_snapshot_with_missing_checkpoint_part_no_hint` checks that an incomplete
checkpoint is not used in a `LogSegment`. A checkpoint is incomplete if it does
not have all of the parts specified in
`LogPathFileType::MultiPartCheckpoint.num_parts`.
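As a standalone illustration (hypothetical helper, not the kernel's API), completeness comes down to having every declared part exactly once:

```rust
// Sketch: a multipart checkpoint is complete only if every part from 1
// through `num_parts` is present.
fn is_complete_checkpoint(mut found_parts: Vec<u32>, num_parts: u32) -> bool {
    found_parts.sort_unstable();
    found_parts.dedup();
    found_parts == (1..=num_parts).collect::<Vec<u32>>()
}

fn main() {
    assert!(is_complete_checkpoint(vec![2, 1, 3], 3)); // all three parts found
    assert!(!is_complete_checkpoint(vec![1, 3], 3)); // part 2 missing: incomplete
}
```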

---------

Co-authored-by: Ryan Johnson <[email protected]>
Co-authored-by: Zach Schuermann <[email protected]>
3 people authored Nov 15, 2024
1 parent 4ad2f8b commit 3e7ad45
Showing 3 changed files with 824 additions and 323 deletions.
361 changes: 288 additions & 73 deletions kernel/src/log_segment.rs
@@ -4,23 +4,184 @@
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::path::ParsedLogPath;
use crate::schema::SchemaRef;

use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
};
use itertools::Itertools;
use std::cmp::Ordering;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
use url::Url;

#[cfg(test)]
mod tests;

/// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files
/// and commit files and guarantees the following:
/// 1. Commit file versions will not have any gaps between them.
/// 2. If checkpoint(s) is/are present in the range, only commits with versions greater than the most
/// recent checkpoint version are retained. There will not be a gap between the checkpoint
/// version and the first commit version.
/// 3. All checkpoint_parts must belong to the same checkpoint version, and must form a complete
/// checkpoint. Multi-part checkpoints must have all their parts.
///
/// [`LogSegment`] is used in [`Snapshot`] when built with [`LogSegment::for_snapshot`], and in
/// `TableChanges` when built with [`LogSegment::for_table_changes`].
///
/// [`Snapshot`]: crate::snapshot::Snapshot
#[derive(Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct LogSegment {
pub end_version: Version,
pub log_root: Url,
/// Sorted commit files in the log segment (ascending)
pub ascending_commit_files: Vec<ParsedLogPath>,
/// Checkpoint files in the log segment.
pub checkpoint_parts: Vec<ParsedLogPath>,
}

impl LogSegment {
fn try_new(
ascending_commit_files: Vec<ParsedLogPath>,
checkpoint_parts: Vec<ParsedLogPath>,
log_root: Url,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// We require that commits are contiguous. In other words, there must be no gap between commit versions.
require!(
ascending_commit_files
.windows(2)
.all(|cfs| cfs[0].version + 1 == cfs[1].version),
Error::generic(format!(
"Expected ordered contiguous commit files {:?}",
ascending_commit_files
))
);

// There must be no gap between a checkpoint and the first commit version. Note that
// all checkpoint parts share the same version.
if let (Some(checkpoint_file), Some(commit_file)) =
(checkpoint_parts.first(), ascending_commit_files.first())
{
require!(
checkpoint_file.version + 1 == commit_file.version,
Error::generic(format!(
"Gap between checkpoint version {} and next commit {}",
checkpoint_file.version, commit_file.version,
))
)
}

// Get the effective version from chosen files
let version_eff = ascending_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::generic("No files in log segment"))?
.version;
if let Some(end_version) = end_version {
require!(
version_eff == end_version,
Error::generic(format!(
"LogSegment end version {} not the same as the specified end version {}",
version_eff, end_version
))
);
}
Ok(LogSegment {
end_version: version_eff,
log_root,
ascending_commit_files,
checkpoint_parts,
})
}

/// Constructs a [`LogSegment`] to be used for [`Snapshot`]. For a `Snapshot` at version `n`:
/// its LogSegment is made of zero or one checkpoint, and all commits after the checkpoint, up
/// to and including the end version `n`. Note that a checkpoint may be made of multiple
/// parts. All these parts will have the same checkpoint version.
///
/// The options for constructing a LogSegment for Snapshot are as follows:
/// - `checkpoint_hint`: a `CheckpointMetadata` to start the log segment from (e.g. from reading the `last_checkpoint` file).
/// - `time_travel_version`: The version of the log that the Snapshot will be at.
///
/// [`Snapshot`]: crate::snapshot::Snapshot
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_snapshot(
fs_client: &dyn FileSystemClient,
log_root: Url,
checkpoint_hint: impl Into<Option<CheckpointMetadata>>,
time_travel_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let time_travel_version = time_travel_version.into();

let (mut ascending_commit_files, checkpoint_parts) =
match (checkpoint_hint.into(), time_travel_version) {
(Some(cp), None) => {
list_log_files_with_checkpoint(&cp, fs_client, &log_root, None)?
}
(Some(cp), Some(end_version)) if cp.version <= end_version => {
list_log_files_with_checkpoint(&cp, fs_client, &log_root, Some(end_version))?
}
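// The hint is absent or newer than the time-travel version: list from the start of the log.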
_ => list_log_files_with_version(fs_client, &log_root, None, time_travel_version)?,
};

// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

LogSegment::try_new(
ascending_commit_files,
checkpoint_parts,
log_root,
time_travel_version,
)
}

/// Constructs a [`LogSegment`] to be used for `TableChanges`. For a `TableChanges` between versions
/// `start_version` and `end_version`: its LogSegment is made of zero checkpoints and all commits
/// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
/// is specified, it defaults to the most recent version.
#[allow(unused)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_table_changes(
fs_client: &dyn FileSystemClient,
log_root: Url,
start_version: Version,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let end_version = end_version.into();
if let Some(end_version) = end_version {
if start_version > end_version {
return Err(Error::generic(
"Failed to build LogSegment: start_version cannot be greater than end_version",
));
}
}

let ascending_commit_files: Vec<_> =
list_log_files(fs_client, &log_root, start_version, end_version)?
.filter_ok(|x| x.is_commit())
.try_collect()?;

// - Here we check that the start version is correct.
// - [`LogSegment::try_new`] will verify that the `end_version` is correct if present.
// - [`LogSegment::try_new`] also checks that there are no gaps between commits.
// If all three are satisfied, this implies that all the desired commits are present.
require!(
ascending_commit_files
.first()
.is_some_and(|first_commit| first_commit.version == start_version),
Error::generic(format!(
"Expected the first commit to have version {}",
start_version
))
);
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
}

/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
@@ -40,30 +201,27 @@ impl LogSegment {
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// `replay` expects commit files to be sorted in descending order, so we reverse the sorted
// commit files
let commit_files: Vec<_> = self
.ascending_commit_files
.iter()
.rev()
.map(|f| f.location.clone())
.collect();
let commit_stream = engine
.get_json_handler()
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

let checkpoint_parts: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();
let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
.map_ok(|batch| (batch, false));

Ok(commit_stream.chain(checkpoint_stream))
@@ -112,54 +270,111 @@ impl LogSegment {
}
}

/// Returns a fallible iterator of [`ParsedLogPath`] that are between the provided `start_version` (inclusive)
/// and `end_version` (inclusive). [`ParsedLogPath`] may be a commit or a checkpoint. If `start_version` is
/// not specified, the files will begin from version number 0. If `end_version` is not specified, files up to
/// the most recent version will be included.
///
/// Note: this calls [`FileSystemClient::list_from`] to get the list of log files.
fn list_log_files(
fs_client: &dyn FileSystemClient,
log_root: &Url,
start_version: impl Into<Option<Version>>,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ParsedLogPath>>> {
let start_version = start_version.into().unwrap_or(0);
let end_version = end_version.into();
let version_prefix = format!("{:020}", start_version);
let start_from = log_root.join(&version_prefix)?;

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
.filter_map_ok(identity)
.take_while(move |path_res| match path_res {
Ok(path) => !end_version.is_some_and(|end_version| end_version < path.version),
Err(_) => true,
}))
}

/// List all commit and checkpoint files with versions at or above the provided `start_version`.
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
/// ascending order by version. The elements of `checkpoint_parts` are all the parts of the same
/// checkpoint. Checkpoint parts share the same version.
fn list_log_files_with_version(
fs_client: &dyn FileSystemClient,
log_root: &Url,
start_version: Option<Version>,
end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
// We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
// on config at some point
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];
let mut max_checkpoint_version = start_version;

for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
let parsed_path = parsed_path?;
if parsed_path.is_commit() {
commit_files.push(parsed_path);
} else if parsed_path.is_checkpoint() {
let path_version = parsed_path.version;
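// Track only the newest checkpoint: parts of a later checkpoint version replace any earlier ones.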
match max_checkpoint_version {
None => {
checkpoint_parts.push(parsed_path);
max_checkpoint_version = Some(path_version);
}
Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
Ordering::Greater => {
max_checkpoint_version = Some(path_version);
checkpoint_parts.clear();
checkpoint_parts.push(parsed_path);
}
Ordering::Equal => checkpoint_parts.push(parsed_path),
Ordering::Less => {}
},
}
}
}

Ok((commit_files, checkpoint_parts))
}

/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all
/// the returned [`ParsedLogPath`]s will have a version less than or equal to the `end_version`.
/// See [`list_log_files_with_version`] for details on the return type.
fn list_log_files_with_checkpoint(
checkpoint_metadata: &CheckpointMetadata,
fs_client: &dyn FileSystemClient,
log_root: &Url,
end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
let (commit_files, checkpoint_parts) = list_log_files_with_version(
fs_client,
log_root,
Some(checkpoint_metadata.version),
end_version,
)?;

let Some(latest_checkpoint) = checkpoint_parts.last() else {
// TODO: We could potentially recover here
return Err(Error::generic(
"Had a _last_checkpoint hint but didn't find any checkpoints",
));
};
if latest_checkpoint.version != checkpoint_metadata.version {
warn!(
"_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
checkpoint_metadata.version,
latest_checkpoint.version
);
} else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
return Err(Error::Generic(format!(
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
checkpoint_metadata.parts.unwrap_or(1),
checkpoint_parts.len()
)));
}
Ok((commit_files, checkpoint_parts))
}
