-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[write] Transaction append data API #393
[write] Transaction append data API #393
Conversation
cc33640
to
f4eabf0
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #393 +/- ##
==========================================
+ Coverage 78.41% 78.88% +0.46%
==========================================
Files 55 55
Lines 11806 12157 +351
Branches 11806 12157 +351
==========================================
+ Hits 9258 9590 +332
- Misses 2041 2047 +6
- Partials 507 520 +13 ☔ View full report in Codecov by Sentry. |
note: did a sanity check and we can read partitioned table with delta-spark (v3.2.0)
|
kernel/src/engine/default/mod.rs
Outdated
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> { | ||
self.parquet.clone() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't particularly like this but we need a way to get back the actual DefaultParquetHandler
instead of the type-erased Arc<dyn ParquetHandler>
. I think we can do it a couple ways: either like this, or we implement as_any to allow us to just cast the types. thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this because we want to be able to call write_parquet_files
? Surely that should be part of the ParquetHandler
api spec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup! we originally didn't want to impose that requirement on engines since they should elect to just write parquet files however they want. if we added it to ParquetHandler then we would be requiring an API that we (kernel) don't actually use (only the default engine). That being said, we will likely need a write_parquet_files
API for writing checkpoints so we could just introduce the requirement here and use it in the default engine but only really require it in the future once we are writing checkpoints
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm okay. yeah the other options are a bit... messy, downcasting is not trivial.
can we make it pub(crate)
or does that not work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately doesn't work - could use developer-visibility but that's the best i could come up with for hiding it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like other engines will face a similar issue, and even our own default engine already hit it previously with trait EngineData
(see its as_any
and into_any
methods). We also know it will be a Big Deal for FFI, where we need the ability to downcast not merely to a concrete rust struct, but to a native engine's C/C++ struct.
I suspect it's time to generalize any-casting capability and bake it into all these engine traits, i.e. #450?
@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { | |||
fn write_json_file( | |||
&self, | |||
path: &Url, | |||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>, | |||
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious of what people think of this API change. if we don't propagate results to here then we end up in a weird situation of either having to traverse everything to filter out Err
cases or just unwap stuff? is there a better way to handle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we trying to achieve? Seems like it'll still be an issue that we might write some data and then fail. Without checking the whole thing for Err
cases I'm not sure how you can avoid that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have things like expression evaluation happening at every chunk of the iterator (e.g. before we get here we do the transformation from 'write_metadata' to actions) which might fail. Thus we have to 'hold on' to those Result
s and propagate them through to this API. And yea not sure if there's a way around that other than something that scans the whole thing ahead of time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But are you going to somehow recover from those errors? Why not just propagate earlier?
To make sure I'm getting it. This would for example propagate any errors in generate_commit_info
down here, rather than early returning from commit()
when generate_commit_info
failed, right? Early fail in commit actually seems better, so I'm not seeing the benefit here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But are you going to somehow recover from those errors? Why not just propagate earlier?
probably not? but unsure how to propagate early without traversing the whole iterator.
To make sure I'm getting it. This would for example propagate any errors in generate_commit_info down here, rather than early returning from commit() when generate_commit_info failed, right? Early fail in commit actually seems better, so I'm not seeing the benefit here.
since generate_commit_info
just always appends the first batch and we could fail early I'll use generate_adds
as an example: we lazily generate the adds to produce an impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>
. that is, generate_adds
takes an iterator of write_metadata
and produces an iterator of result (as engine data). The result is injected here since we do some fallible work like expression evaluation to transform into the actions. Then, we are left with handing this iterator of results to the write_json API unless we want to traverse the whole iterator prior to passing it off to check that one of the batches didn't fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh, finally I get it. Yeah, we want to let the errors propagate until we need their result. This makes sense. Thanks for bearing with me :)
@@ -56,14 +81,14 @@ impl Transaction { | |||
read_snapshot: snapshot.into(), | |||
operation: None, | |||
commit_info: None, | |||
write_metadata: vec![], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note we make write_metadata
a vec so that we can keep all the batches to traverse multiple times in the case we need to retry the commit. one alternative here is to just require the engine to do the concatenation and hand us one big batch of write metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java kernel requires engine to hand over an iterator, potentially multiple times if there are retries. That way it's engine's job to deal with memory management and spilling on big commits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea I think we could have them pass something that's impl IntoIterator
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opened #465 to have some more discussion and take this as a follow-up :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushing some initial comments.
kernel/src/engine/default/mod.rs
Outdated
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> { | ||
self.parquet.clone() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this because we want to be able to call write_parquet_files
? Surely that should be part of the ParquetHandler
api spec?
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; | ||
let record_batch = batch.record_batch(); | ||
|
||
let mut buffer = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sucks that object_store doesn't have any notion of taking a "stream of bytes" and instead wants everything as a single chunk.
We should probably at least file a tracking issue to see if we can improve this somehow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a streaming write to an object store would have to take the form of a multipart upload which object_store
does support with the MultipartUpload
trait and perhaps we can just implement that in some async way to stream our data? also ran across WriteMultipart
but I'll do some more digging.
I've already got #418 open for doing streaming JSON puts, and i've just gone ahead and expanded that issue to be streaming JSON/parquet since I think it will generally be the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That WriteMultipart
does indeed seem relevant. Looks like it's up to us to sort out simple PUT vs. MPU tho (similar to how arrow parquet reads are super low-level, I guess).
Parquet will be trickier because somebody (engine or default engine) has to encode+compress the bytes that will eventually be written. Parquet is designed to be streamed (row groups, and footer comes last), but I don't know how well arrow-parquet supports it for default client? There's e.g. SerializedFileWriter that uses Write, so I guess we'd have to manually tie that in with the object store machinery (WriteMultipart
doesn't seem to impl Write
). Our Write
impl would have to decide PUT vs. MPU dynamically based on data size being written, I suppose.
Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep gonna punt on this for now and optimize in a follow-up.
Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.
we are just using parquet::arrow::arrow_writer::ArrowWriter
here! (rather straightforward to use to write record batches - not sure if that's what you were asking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without disagreeing with anything above, I also meant that object_store
should be able to just do a "normal" put
on anything that implements std::io::Read
.
It's going to write it all as the body of the request out to the network/disk, so it should be able to just stream the bytes rather than needing them in one continuous chunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yea got it! I can create an issue for this since it's seeming different than the full 'stream to multipart' one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, too much time invested in my aside here. It would be an issue on object_store
which we could look at, but probably not now, and probably not something to make an issue for in this repo.
@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { | |||
fn write_json_file( | |||
&self, | |||
path: &Url, | |||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>, | |||
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we trying to achieve? Seems like it'll still be an issue that we might write some data and then fail. Without checking the whole thing for Err
cases I'm not sure how you can avoid that?
kernel/src/engine/default/mod.rs
Outdated
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> { | ||
self.parquet.clone() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm okay. yeah the other options are a bit... messy, downcasting is not trivial.
can we make it pub(crate)
or does that not work?
kernel/src/engine/default/parquet.rs
Outdated
/// Metadata of a parquet file, currently just includes the file metadata but will expand to | ||
/// include file statistics and other metadata in the future. | ||
#[derive(Debug)] | ||
pub struct ParquetMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest giving this a different name to avoid confusion with the parquet crate's metadata
I wanted to suggest just using their types, since there is overlap, but this feels more like maybe ParquetWriteMetadata
or something, i.e. metadata relating to the write rather than the parquet file itself.
Also, all the stuff that goes into WRITE_METADATA_SCHEMA
is buried in the FileMeta
. I'd prefer to have a struct
that could #[derive(Schema)]
for the metadata than have the schema manually declared. AFAICT you're not using the fact that this is a FileMeta
anywhere, and are just using it as a container inside a container.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup - good point! originally, I just had ParquetMetadata
with path, size, modification_time and just realized that was the same as FileMeta so tried to unify there. It sounds like instead you're thinking of just flattening those fields under a ParquetWriteMetadata
struct that will live in the kernel (instead of just in default engine) and that uses #[derive(Schema)]
to get the 'write metadata schema' instead of the separate schema definition.
That seems to be more elegant thanks! :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, thinking about this more, we have two steps to this data flow:
- engine collecting the data returned from each parquet file written:
- path
- size
- last_modified
- engine communicating to kernel the data required for each write:
- path
- size
- last_modified
- partition_values
- data_change
In (2) we add partition_values and data_change, which wouldn't necessarily be inherent to the parquet write but layers on more information about that file (e.g. engine just wrote this parquet file corresponding to partition_col = part1
)
I had introduced ParquetMetadata
to house (1) which is purely an engine concept. Then, in kernel I had WRITE_METADATA_SCHEMA
which specified the schema for (2).
Thinking more about the suggestion, maybe we need a rename of (1) and then maybe change (2) to be a struct that does #[derive(Schema)]
? I'm thinking now about reasons for struct vs. just the schema definition like I have but wanted to post this comment to add some more context and then we can discuss more :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flushing comments... hopefully can finish soon
kernel/src/engine/default/mod.rs
Outdated
pub fn get_parquet_handler(&self) -> Arc<DefaultParquetHandler<E>> { | ||
self.parquet.clone() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like other engines will face a similar issue, and even our own default engine already hit it previously with trait EngineData
(see its as_any
and into_any
methods). We also know it will be a Big Deal for FFI, where we need the ability to downcast not merely to a concrete rust struct, but to a native engine's C/C++ struct.
I suspect it's time to generalize any-casting capability and bake it into all these engine traits, i.e. #450?
} | ||
|
||
// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema | ||
fn as_record_batch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really want a single-row version of this? Or is it better to take a slice of some kind?
Also, why are partition values etc. provided externally, rather than part of self
?
I could totally buy the argument that partition values and data_change are a "bulk" concept... but in that case we should definitely have the record batch generator take a slice of Self
, along with those other bits of information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I don't see any call sites for write_parquet_file
, so it's hard to guess all the implications of my suggestion. But I would think any reasonable engine implementation will work with iterators or sequences of files, and that we could inject the as_record_batch
calls there?
The fact that write_parquet_file
is async
is a strong indication that we expect it to be part of a lazy iterator of some sort, no? If nothing else, some default engine code somewhere will have to wait on that async call to complete, since the public kernel API surface is all sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
write_parquet_file
is called from write_parquet
here
I think this design is a consequence of write_parquet
only taking one file at a time, and that kernel has to go into EngineData
at that point. If you could pass multiple files to write_parquet
then you'd write them all and build one record batch with all the metadata. But since we do it one at a time, and kernel has to "save" each metadata so it can loop over it in the commit, we have to do this one row approach.
What we really want here (i think) is just an "opaque" type that allows being turned into EngineData
, potentially in a batch.
So ParquetMetadata
would become a type at the kernel level, not just in the engine, but it's just a marker. And then kernel saves up all the ParquetMetadata
s (whatever the engine wants to return for them) and then calls into_engine_data(metadatas)
(another new kernel level api), where the engine can turn all the metadata at once into its EngineData
.
Given the scope of that, if we like it, maybe we stick with this and tackle that as a follow-up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea and for context on the parquet file writing - I think we likely do want to support writing multiple files at once - java kernel does something similar taking an Iterator of column batches to write. I've aimed to derisk this whole thing by implementing this just within our default engine (no write parquet file engine API) and we can expand the API later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and yea @nicklan I haven't thought about that sort of trait-based API (that would be a trait that needs into_engine_data(..)
right?) that sounds good and yea probably try to do simple here and i'll make an issue to follow up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm okay with that. Note that the trait probably wouldn't have into_engine_data
, but rather something in the engine would need an into_engine_data(Vec<TheNewTrait>)
(or maybe iter). You need a collection of them all at once
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just opened #466 since I'm on a roll and love making new issues
let size: i64 = (*size) | ||
.try_into() | ||
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: I almost wonder if FileMeta
should just use size: i64
instead of size: usize
? At least that way the burden of casting is paid only once, by whoever creates it, rather than (potentially) on every read scattered through the code base?
We're anyway nowhere near hitting any meaningful object size limits, so the cast should never fail:
- S3 - 5TB - https://aws.amazon.com/s3/faqs/
- GCS - 5TB - https://support.google.com/cloud/answer/6250993?hl=en
- Azure Blob - 8TB (page blob) or 190TB (block blob) - https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage
- ADLS - 4TB - https://learn.microsoft.com/en-us/azure/storage/files/storage-files-scale-targets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about u64
? a little safer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then we're back to not having a schema type for it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right haha. doing i64
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getting close, just a couple more things I think :)
} | ||
|
||
// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema | ||
fn as_record_batch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm okay with that. Note that the trait probably wouldn't have into_engine_data
, but rather something in the engine would need an into_engine_data(Vec<TheNewTrait>)
(or maybe iter). You need a collection of them all at once
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; | ||
let record_batch = batch.record_batch(); | ||
|
||
let mut buffer = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, too much time invested in my aside here. It would be an issue on object_store
which we could look at, but probably not now, and probably not something to make an issue for in this repo.
@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { | |||
fn write_json_file( | |||
&self, | |||
path: &Url, | |||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>, | |||
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh, finally I get it. Yeah, we want to let the errors propagate until we need their result. This makes sense. Thanks for bearing with me :)
.put(&Path::from(path.path()), buffer.into()) | ||
.await?; | ||
|
||
let metadata = self.store.head(&Path::from(path.path())).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want any kind of "todo" here? We could avoid this and just assume size will be correct and use current time as modification time. That would be an optimization people might want, so maybe we could have an option for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea good call! i'll make an issue for follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
path: &str, | ||
new_value: serde_json::Value, | ||
) -> Result<(), Box<dyn std::error::Error>> { | ||
let mut path_string = path.replace(".", "/"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious, why not just take a json pointer string as the path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eh just used to the field1.field2
syntax so was trying to make it easier on future usage (instead of having to write /field1/field2
happy to change if the latter seems better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool thanks! exciting to get write support! I'll review a bit more, but approving now as I think this is okay to merge and we can tackle improvements as follow-ups.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, other than a bunch of nits to fix before merge.
kernel/src/transaction.rs
Outdated
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { | ||
if self | ||
.read_snapshot | ||
.metadata() | ||
.partition_columns | ||
.contains(f.name()) | ||
{ | ||
None | ||
} else { | ||
let col_name = ColumnName::new([f.name()]); | ||
Some(col_name.into()) | ||
} | ||
})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { | |
if self | |
.read_snapshot | |
.metadata() | |
.partition_columns | |
.contains(f.name()) | |
{ | |
None | |
} else { | |
let col_name = ColumnName::new([f.name()]); | |
Some(col_name.into()) | |
} | |
})) | |
let partition_columns = self.read_snapshot.metadata().partition_columns; | |
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { | |
(!partition_columns.contains(f.name()).then(|| Expression::column([f.name()]) | |
})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it might be cleaner to just do a filter+map pair?
Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { | |
if self | |
.read_snapshot | |
.metadata() | |
.partition_columns | |
.contains(f.name()) | |
{ | |
None | |
} else { | |
let col_name = ColumnName::new([f.name()]); | |
Some(col_name.into()) | |
} | |
})) | |
let partition_columns = &self.read_snapshot.metadata().partition_columns; | |
let fields = self.read_snapshot.schema().fields(); | |
let fields = fields | |
.filter(|f| !partition_columns.contains(f.name())) | |
.map(|f| Expression::column([f.name()])); | |
Expression::struct_from(fields) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think I have to clone partition columns and I just did a filter_map version of your second suggestion - lmk what you think :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think I have to clone partition columns
Why does the borrowed reference not work, sorry? It was accessing self.read_snapshot.metadata().partition_columns
inside the closure before, so borrow checker shouldn't be a problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I'm curious why not use Expression::column
for these cases, instead of ColumnName::new(...).into()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right on both! for the first I needed to prepend &
to make sure it is capturing the ref and for (2) I got the column function after rebase :)
included in #468
let logical_to_physical = self.generate_logical_to_physical(); | ||
WriteContext::new( | ||
target_dir.clone(), | ||
Arc::new(snapshot_schema.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: Should the snapshot's schema be an arc? Wide tables could lead to expensive clone
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea I think that makes sense. opened #463 for more discussion :)
/// multiple times to add multiple batches. | ||
/// | ||
/// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`]. | ||
pub fn add_write_metadata(&mut self, write_metadata: Box<dyn EngineData>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally thought that most of our API used Arc<dyn EngineData>
instead of box... but a quick search says we only have one Arc -- the transaction's commit_info
. Maybe we should fix that other one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting - I'll have that as another little follow-up: #464
@@ -56,14 +81,14 @@ impl Transaction { | |||
read_snapshot: snapshot.into(), | |||
operation: None, | |||
commit_info: None, | |||
write_metadata: vec![], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java kernel requires engine to hand over an iterator, potentially multiple times if there are retries. That way it's engine's job to deal with memory management and spilling on big commits.
kernel/src/engine/default/parquet.rs
Outdated
let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?; | ||
Ok(write_metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?; | |
Ok(write_metadata) | |
Ok(parquet_metadata.as_record_batch(&partition_values, data_change)?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea actually just did
parquet_metadata.as_record_batch(&partition_values, data_change)
fn generate_adds<'a>( | ||
engine: &dyn Engine, | ||
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a, | ||
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting... this says the items returned by the iterator do not outlive the iterator itself. I would have expected something like
fn generate_adds<'a>( | |
engine: &dyn Engine, | |
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a, | |
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a { | |
fn generate_adds<'i, 'a: 'i>( | |
engine: &dyn Engine, | |
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'i, | |
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'i { |
(but I guess if it's not causing any lifetime issues as-is, then there's no need to change it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i suppose the existing semantics imply that it's okay to have the 'items returned by the iterator not outliving the iterator itself'?
@@ -251,3 +270,382 @@ async fn test_invalid_commit_info() -> Result<(), Box<dyn std::error::Error>> { | |||
)); | |||
Ok(()) | |||
} | |||
|
|||
// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we care, sorry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was just a rough sanity check that we aren't wayy off. (otherwise we don't check the time field at all)
@nicklan actually suggested this and it caught a bug where one of the file's timestamps was accidentally in seconds instead of milliseconds!
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) | ||
.into_iter::<serde_json::Value>() | ||
.try_collect()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Does this work, out of curiosity?
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) | |
.into_iter::<serde_json::Value>() | |
.try_collect()?; | |
let commit1 = commit1.bytes().await?; | |
let mut parsed_commits: Vec<serde_json::Value> = Deserializer::from_slice(&commit1) | |
.into_iter() | |
.try_collect()?; |
(but actually it's harder to read that than the turbofish, so meh)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does work! but yea i agree original is more readable to me?
This PR is the second (of two) major pieces for supporting simple blind appends. It implements:
Transaction
APIs for appending data to delta tables:a.
get_write_context()
to get aWriteContext
to pass to the data path which includes all information needed to write:target directory
,snapshot schema
,transformation expression
, and (future: columns to collect stats on)b.
add_write_metadata(impl EngineData)
to add metadata about a write to the transaction along with a new static methodtransaction::get_write_metadata_schema
to provide the expected schema of this engine data.c. new machinery in 'commit' method to commit new
Add
actions for each row of write_metadata from the API above.a. parquet handler can now
write_parquet_file(EngineData)
b. usage example in
write.rs
tests for nowwrite.rs
integration test suiteDetails and some follow-ups:
test_append_invalid_schema
inwrite.rs
integration test). we may want to change this in the future to eagerly error based on the engine providing a schema up front at metadata time (transaction creation time)based on #370
resolves #390