ScanFile expression and visitor for CDF #546
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:
@@ Coverage Diff @@
##             main     #546      +/-   ##
==========================================
+ Coverage   82.28%   82.45%   +0.16%
==========================================
  Files          71       72       +1
  Lines       15777    16054     +277
  Branches    15777    16054     +277
==========================================
+ Hits        12982    13237     +255
- Misses       2167     2175       +8
- Partials      628      642      +14
Force-pushed from febf7b1 to e384be6
Force-pushed from 7f1098b to 6190992
This looks fine. Please note in the PR that this is internal for now. That is, at some point we may want to expose this to engines, but for the first draft of CDF we are only allowing a full execute(), which just uses this internally and doesn't expose it.
///
/// }
/// ```
pub(crate) fn scan_row_schema() -> SchemaRef {
probably doesn't need to be pub(crate). I think it should only be used in this file.
This will be used by the LogReplayScanner, since this is the output schema for the transformed EngineData.
Force-pushed from 6190992 to 2eb5383
todo: rename to
TODO: Remove the comment on schema because that can become out of sync
TODO: Make these changes: #540 (comment)
quick drive-by, but will wait for pending changes to do a full review
#[allow(unused)]
pub(crate) fn scan_data_to_scan_file(
    scan_data: impl Iterator<Item = DeltaResult<TableChangesScanData>>,
) -> impl Iterator<Item = DeltaResult<(CDFScanFile, Option<Arc<HashMap<String, DvInfo>>>)>> {
That type is a real mouthful... worth factoring it out as a named type, or even a struct?
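To make the trade-off concrete, here is a minimal std-only sketch of the two options the reviewer mentions, using hypothetical stand-in types (`CdfScanFile`, `DvInfo`, and the names `ScanFileWithRemoveDvs` / `UnresolvedScanFile` are illustrative, not the kernel's actual API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the kernel types in the signature above.
struct CdfScanFile {
    path: String,
}
struct DvInfo;

// Option 1: a type alias shortens the signature without changing callers.
type ScanFileWithRemoveDvs = (CdfScanFile, Option<Arc<HashMap<String, DvInfo>>>);

// Option 2: a named struct also documents what each half means.
struct UnresolvedScanFile {
    scan_file: CdfScanFile,
    remove_dvs: Option<Arc<HashMap<String, DvInfo>>>,
}

// Converting the tuple into the named form is trivial.
fn wrap(item: ScanFileWithRemoveDvs) -> UnresolvedScanFile {
    UnresolvedScanFile { scan_file: item.0, remove_dvs: item.1 }
}

fn main() {
    let item: ScanFileWithRemoveDvs = (CdfScanFile { path: "part-0.parquet".into() }, None);
    let named = wrap(item);
    assert_eq!(named.scan_file.path, "part-0.parquet");
    assert!(named.remove_dvs.is_none());
}
```

The struct costs a few lines but gives field names a place to live, which matters once the pair shows up in several signatures.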
I was considering introducing two types:
#[allow(unused)]
pub(crate) struct UnresolvedCDFScanFile {
scan_file: CDFScanFile,
remove_dvs: Arc<HashMap<String, DvInfo>>,
}
#[allow(unused)]
pub(crate) struct ResolvedCDFScanFile {
scan_file: CDFScanFile,
selection_vector: Vec<bool>,
}
Because the step after this will be to resolve deletion vectors. Is this sensible naming?
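A std-only sketch of how those two proposed types would relate, with a stubbed resolution step (the stand-in types and the `resolve` function are illustrative; a real implementation would read the deletion-vector bitmap rather than stubbing it):

```rust
use std::collections::HashMap;
use std::sync::Arc;

struct CdfScanFile {
    path: String,
}
struct DvInfo;

// Before DV resolution: the file plus the shared map of paired remove DVs.
struct UnresolvedCdfScanFile {
    scan_file: CdfScanFile,
    remove_dvs: Arc<HashMap<String, DvInfo>>,
}

// After DV resolution: the file plus a per-row selection vector.
struct ResolvedCdfScanFile {
    scan_file: CdfScanFile,
    selection_vector: Vec<bool>,
}

// Stubbed resolution step: here every row is selected unless a remove DV
// is paired with this file's path; a real version would decode the bitmap.
fn resolve(unresolved: UnresolvedCdfScanFile, num_rows: usize) -> ResolvedCdfScanFile {
    let has_remove_dv = unresolved.remove_dvs.contains_key(&unresolved.scan_file.path);
    ResolvedCdfScanFile {
        scan_file: unresolved.scan_file,
        selection_vector: vec![!has_remove_dv; num_rows],
    }
}

fn main() {
    let unresolved = UnresolvedCdfScanFile {
        scan_file: CdfScanFile { path: "part-0.parquet".into() },
        remove_dvs: Arc::new(HashMap::new()),
    };
    let resolved = resolve(unresolved, 3);
    assert_eq!(resolved.scan_file.path, "part-0.parquet");
    assert_eq!(resolved.selection_vector, vec![true, true, true]);
}
```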
Force-pushed from 3e7202f to e3504c5
@scovich you had a question about whether we need to use unsigned ints to track commit versions. I'll check the spec at some point. Visitor code expects that all numbers are signed ints, but we declare
Can we make a tracking issue, with the goal to decide once and for all our stance on signed vs. unsigned?
given that this is all internal, stamping for now and we can iterate later if needed
}

fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
    static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
it's a really nice way to handle this :)
Approach looks sound. Several nits to address (especially the one about schema pruning/nullability), but it should be quick and easy.
use crate::{DeltaResult, EngineData, Error, RowVisitor};

#[allow(unused)]
pub(crate) struct UnresolvedCDFScanFile {
naming nit: There are no good ways to handle acronyms in CamelCase, but I believe the general consensus is that the acronym should be init-cap instead of all-cap so readers can tell where "word" boundaries are
-pub(crate) struct UnresolvedCDFScanFile {
+pub(crate) struct UnresolvedCdfScanFile {
(see e.g. the Cdf action, DvInfo below, etc.; many more below)
    remove_dvs: Arc<HashMap<String, DvInfo>>,
}

#[allow(unused)]
pub(crate) struct ResolvedCDFScanFile {
It would be helpful to have doc comments explaining the difference between "resolved" and "unresolved" CDF files (I believe it refers to whether the file's deletion vector has been loaded yet).
Added docs
let deletion_vector = StructType::new([
    StructField::new("storageType", DataType::STRING, true),
    StructField::new("pathOrInlineDv", DataType::STRING, true),
    StructField::new("offset", DataType::INTEGER, true),
    StructField::new("sizeInBytes", DataType::INTEGER, true),
    StructField::new("cardinality", DataType::LONG, true),
]);
AFAIK, this schema is not pruned; should we just use DeletionVectorDescriptor::to_schema instead?
(I think the answer depends on whether we want this read to automatically pick up any new fields added in the future, or if we intentionally restrict it to this specific set of fields)
NOTE: Most of these fields are non-nullable, which is allowed for a read schema. The rule that e.g. arrow-rust follows is that a non-nullable field is allowed inside a nullable field, and will have a null value exactly and only when the outer field was already null. That seems a useful thing to enforce for the read schema.
The non-nullable observation applies to several fields in other structs below, like {add, remove, cdc}.path; the fileConstantValues
field should probably be non-nullable as well, but it won't matter in practice because that's anyway a generated field. Note that the Delta spec is ambiguous about whether partitionValues
is nullable: it's required for Add, optional for Remove, and unspecified for Cdc.
NOTE: Visitor schemas are different from read schemas, because visitor schemas are exploded out as stand-alone leaf fields. At that point all parent/child relationships are lost, so a field passed to the visitor must be nullable if it or any parent is nullable. Which means all-nullable for all action visitors. Which is why the visitor API takes name/type pairs (no nullability param) and the getters are all nullable.
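The nullability rule above ("a non-nullable field inside a nullable field is null exactly and only when the outer field is null") can be modeled in plain Rust with `Option` nesting. This is an illustrative sketch with a hypothetical `Add` type, not kernel code:

```rust
// The `add` column is nullable (Option<Add>), while its `path` child is
// non-nullable (String, not Option<String>): `path` can only be "missing"
// when the whole `add` struct is null.
struct Add {
    path: String,
}

fn path_of(add: &Option<Add>) -> Option<&str> {
    // `path` is null exactly and only when the outer `add` is null.
    add.as_ref().map(|a| a.path.as_str())
}

fn main() {
    let present = Some(Add { path: "part-0.parquet".into() });
    let absent: Option<Add> = None;
    assert_eq!(path_of(&present), Some("part-0.parquet"));
    assert_eq!(path_of(&absent), None);
}
```

This also shows why a flattened visitor getter must be nullable: seen leaf-by-leaf, `path` returns `Option<&str>` even though the field itself is declared non-nullable.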
should we just use DeletionVectorDescriptor::to_schema instead
I'd like to, but we hard code the number of fields in the visitor getters. So any change to the DV schema would break the visitor.
The schema stuff makes sense! I'll keep the top level fields nullable and patch up the nested ones.
Do the nullability semantics work atm?
I tried the following schema:
static CDF_SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
let deletion_vector = StructType::new([
StructField::new("storageType", DataType::STRING, false),
StructField::new("pathOrInlineDv", DataType::STRING, false),
StructField::new("offset", DataType::INTEGER, true),
StructField::new("sizeInBytes", DataType::INTEGER, false),
StructField::new("cardinality", DataType::LONG, false),
]);
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values =
StructType::new([StructField::new("partitionValues", partition_values, true)]);
let add = StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new("deletionVector", deletion_vector.clone(), true),
StructField::new("fileConstantValues", file_constant_values.clone(), false),
]);
let remove = StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new("deletionVector", deletion_vector, true),
StructField::new("fileConstantValues", file_constant_values.clone(), false),
]);
let cdc = StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new("fileConstantValues", file_constant_values, false),
]);
Arc::new(StructType::new([
StructField::new("add", add, true),
StructField::new("remove", remove, true),
StructField::new("cdc", cdc, true),
StructField::new("timestamp", DataType::LONG, false),
StructField::new("commit_version", DataType::LONG, false),
]))
});
The top level is nullable, but I seem to get an issue with arrow on the path field of all things!
Arrow(InvalidArgumentError("Found unmasked nulls for non-nullable StructArray field \"path\"")
My guess is that it's treating path as non-nullable even when the top level is stated to be nullable.
Also given that partitionValues is technically nullable for removes, I'm constructing a default map if I don't find anything. Is this necessary/sensible?
let partition_values = partition_values.unwrap_or_else(Default::default);
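For what it's worth, `unwrap_or_else(Default::default)` is equivalent to the shorter `unwrap_or_default()`; either way the absent map becomes an empty one so downstream code never handles `None`. A minimal sketch (the `normalize` helper is hypothetical):

```rust
use std::collections::HashMap;

// Absent partition values become an empty map rather than a null.
fn normalize(pv: Option<HashMap<String, String>>) -> HashMap<String, String> {
    pv.unwrap_or_default()
}

fn main() {
    // A missing map normalizes to an empty one...
    assert!(normalize(None).is_empty());

    // ...while a present map passes through unchanged.
    let mut m = HashMap::new();
    m.insert("year".to_string(), "2024".to_string());
    assert_eq!(normalize(Some(m)).get("year").map(String::as_str), Some("2024"));
}
```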
Found unmasked nulls for non-nullable StructArray field "path"
That error indicates that add was non-null but path was null, which is definitely not a good thing. Do you have a backtrace? I wonder if this tickled another latent bug in our arrow data code, similar to the one that @nicklan hit a while back?
Hmm I'll try to track this down and report back 🫡
Okay I've narrowed down the point in the code where we fail for this. It's actually in the expression transformation, and not the visitor!
The specific line is here. I don't see anything that handles nested nullability, but maybe arrow is supposed to do that on its own?
I think I've got a reasonable MRE. I'll post an issue.
I'll punt on this for now and keep everything nullable. This is what we do in the normal read path, so I think it should be okay for now.
attn @nick
Force-pushed from 73bd980 to bd9585d
Can we add a scan_data_to_scan_file test (just something through that API), otherwise LGTM!
(and we can take some of the todos above from my other reviews as follow-up or just quickly in this PR, either way)
and this shouldn't be a breaking change right?
Late find while reviewing #568
let scan_data = scan_data?;
let callback: CDFScanCallback<Vec<CDFScanFile>> =
    |context, scan_file| context.push(scan_file);
let result = visit_cdf_scan_files(
I don't understand why this method can't take scan_data directly (instead of breaking it up and passing (almost) all its fields as individual args). Then we could directly probe the remove_dvs while visiting adds, CDFScanFile could hold dv_info for both add and remove, and the next PR would have a much easier time resolving DVs (no need to pass the hashmap to it).
Is it attempting to preserve similarity to the analogous visit_scan_files, or something?
Good point, I was preserving the similarity with visit_scan_files. iirc this part of the code has ffi-related reasons for being the way that it is 🤔 I'll look into it, but I'll ping @nicklan if he has input.
I ended up switching it and it simplified things nicely. I guess we can deal with engine support later, given that we aren't targeting that right now.
@zachschuermann @scovich I updated the test to use the log replay. PTAL
let deletion_vector = StructType::new([
    StructField::new("storageType", DataType::STRING, true),
    StructField::new("pathOrInlineDv", DataType::STRING, true),
    StructField::new("offset", DataType::INTEGER, true),
    StructField::new("sizeInBytes", DataType::INTEGER, true),
    StructField::new("cardinality", DataType::LONG, true),
]);
attn @nick
LGTM couple questions/nits!
use crate::{DeltaResult, Error, RowVisitor};

// The type of action associated with a [`CdfScanFile`].
#[allow(unused)]
let's not forget to remove all the #[allow(unused)] when we actually use it :)
Yeah, I think I'll leave this to the final execute PR.
/// Transforms an iterator of TableChangesScanData into an iterator of
/// `UnresolvedCdfScanFile` by visiting the engine data.
-/// Transforms an iterator of TableChangesScanData into an iterator of
-/// `UnresolvedCdfScanFile` by visiting the engine data.
+/// Transforms an iterator of [`TableChangesScanData`] into an iterator of
+/// [`CdfScanFile`] by visiting the engine data.
struct CdfScanFileVisitor<'a, T> {
    callback: CdfScanCallback<T>,
    selection_vector: &'a [bool],
    remove_dvs: &'a Arc<HashMap<String, DvInfo>>,
why ref arc? should this just be arc?
Didn't see a reason to do an entire arc clone in the loop. I changed this to instead have &'a HashMap<String, DvInfo>, and we use the arc's as_ref.
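A minimal std-only sketch of that borrow pattern, with stand-in types (the field set is simplified from the real visitor): the visitor holds a plain `&HashMap` obtained from the `Arc` via `as_ref`, so no refcount bump happens per batch.

```rust
use std::collections::HashMap;
use std::sync::Arc;

struct DvInfo;

// The visitor borrows the map for its lifetime instead of cloning the Arc
// (and bumping the refcount) on every iteration of the loop.
struct CdfScanFileVisitor<'a> {
    remove_dvs: &'a HashMap<String, DvInfo>,
}

fn main() {
    let remove_dvs: Arc<HashMap<String, DvInfo>> = Arc::new(HashMap::new());
    // `Arc::as_ref` (or auto-deref) yields a plain `&HashMap` borrow.
    let visitor = CdfScanFileVisitor { remove_dvs: remove_dvs.as_ref() };
    assert_eq!(visitor.remove_dvs.len(), 0);
    assert_eq!(Arc::strong_count(&remove_dvs), 1); // no extra Arc clones taken
}
```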
What changes are proposed in this pull request?
This PR introduces four concepts:
- cdf_scan_row_schema: This is the schema that engine data will be transformed into at the end of the log replay phase. This schema prunes the log schema down to only the fields necessary to produce CDF columns.
- cdf_scan_row_expression: This is a function that generates an expression to transform engine data into the cdf_scan_row_schema. The function takes timestamp and commit number as arguments because it inserts these as columns into the output engine data.
- CDFScanFile: This is a type that holds all the information needed to read a data file and generate its CDF rows. It holds the path, deletion vector, the type of action, and the paired remove deletion vector. The action type is encoded as an enum CDFScanFileType.
- CDFScanFileVisitor: This is a visitor that reads engine data with the cdf_scan_row_schema and constructs CDFScanFiles.

This PR is only for internal use, and is only expected to be used by TableChangesScan::execute when it is implemented. Engines must not use the visitor nor CDFScanFile.

This PR affects the following public APIs
No public APIs are changed.

How was this change tested?
I generate a table with add, remove, and cdc actions. Then:
- I perform log replay with table_changes_action_iter, which in turn transforms the engine data into the cdf_scan_row_schema using the cdf_scan_row_expression.
- I read the engine data with the CDFScanFileVisitor and assert that the CDFScanFiles are as expected.

This test checks the following cases:
- None partition values. An empty hashmap for partition values should be used.