[write] Transaction append data API #393

Merged: 24 commits, Nov 8, 2024
Changes from 15 commits
2 changes: 2 additions & 0 deletions kernel/Cargo.toml
@@ -90,6 +90,8 @@ default-engine = [
"parquet/object_store",
"reqwest",
"tokio",
"uuid/v4",
"uuid/fast-rng",
]

developer-visibility = []
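For context, the two new `uuid` features back the random parquet file names generated by the default engine later in this PR. A minimal sketch of what they enable (illustrative, not part of the diff):

```rust
use uuid::Uuid;

fn new_parquet_file_name() -> String {
    // `uuid/v4` provides `Uuid::new_v4()`; `uuid/fast-rng` swaps in a faster
    // random-number generator behind it.
    format!("{}.parquet", Uuid::new_v4())
}
```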
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_utils.rs
@@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
- data: impl Iterator<Item = Box<dyn EngineData>> + Send,
+ data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
- let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
+ let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
@@ -1436,7 +1436,7 @@ mod tests {
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
- let json = to_json_bytes(Box::new(std::iter::once(data)))?;
+ let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
@@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn write_json_file(
&self,
path: &Url,
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+ data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
33 changes: 32 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -6,6 +6,7 @@
//! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in
//! the [executor] module.

use std::collections::HashMap;
use std::sync::Arc;

use self::storage::parse_url_opts;
@@ -16,9 +17,13 @@ use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowExpressionHandler;
use crate::schema::Schema;
use crate::transaction::WriteContext;
use crate::{
- DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler,
+ DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler,
+ ParquetHandler,
};

pub mod executor;
@@ -108,6 +113,32 @@ impl<E: TaskExecutor> DefaultEngine<E> {
pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
Some(self.store.clone())
}

pub async fn write_parquet(
&self,
data: &ArrowEngineData,
write_context: &WriteContext,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema: Schema = data.record_batch().schema().try_into()?;
let output_schema = write_context.schema();
let logical_to_physical_expr = self.get_expression_handler().get_evaluator(

> Collaborator: Ideally we should be creating the expression evaluator just once? Or do we worry the record batch schemas will somehow vary in e.g. field order?
>
> Author: Well, for now, writing single parquet files, aren't we bound to the input schema? That is, for each input data we have a (possibly) different input schema, which then means a different logical_to_physical_expr?
>
> Collaborator: Partition values could be one, I guess. Normally a partition-aware engine would want to pass us the partition value literals and let us decide whether to embed them in the outgoing data? And I guess some future table feature could conceivably require per-file awareness?

input_schema.into(),
transform.clone(),
output_schema.clone().into(),
);
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
data_change,
)
.await
}
}

impl<E: TaskExecutor> Engine for DefaultEngine<E> {
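To see how the new `DefaultEngine::write_parquet` helper is meant to be driven, here is a rough end-to-end sketch. The transaction methods (`new_transaction`, `get_write_context`, `add_write_metadata`, `commit`) come from the transaction side of this PR, which is not shown in this excerpt, so treat those names as assumptions; the table URL and engine construction details are illustrative.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::{DeltaResult, Table};
use url::Url;

async fn append_one_batch(batch: ArrowEngineData) -> DeltaResult<()> {
    let table_url = Url::parse("file:///tmp/my-table/").unwrap(); // illustrative location
    // Engine construction roughly as in the default engine's examples; details may vary.
    let engine = DefaultEngine::try_new(
        &table_url,
        HashMap::<String, String>::new(),
        Arc::new(TokioBackgroundExecutor::new()),
    )?;

    let table = Table::new(table_url);
    let mut txn = table.new_transaction(&engine)?; // assumed transaction API
    let write_context = txn.get_write_context(); // assumed transaction API

    // Transform the logical data to physical form and write it as parquet,
    // getting back one row of write metadata for the written file.
    let write_metadata = engine
        .write_parquet(&batch, &write_context, HashMap::new(), true)
        .await?;

    txn.add_write_metadata(write_metadata); // assumed transaction API
    txn.commit(&engine)?; // assumed transaction API
    Ok(())
}
```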
132 changes: 131 additions & 1 deletion kernel/src/engine/default/parquet.rs
@@ -1,23 +1,30 @@
//! Default Parquet handler implementation

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray};
use futures::StreamExt;
use object_store::path::Path;
use object_store::DynObjectStore;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{
- DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler,
+ DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
+ ParquetHandler,
};

#[derive(Debug)]
@@ -27,6 +34,66 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {
readahead: usize,
}

/// Metadata of a parquet file, currently just includes the file metadata but will expand to
/// include file statistics and other metadata in the future.
#[derive(Debug)]
pub struct ParquetWriteMetadata {
file_meta: FileMeta,
}

impl ParquetWriteMetadata {
pub fn new(file_meta: FileMeta) -> Self {
Self { file_meta }
}

// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema
fn as_record_batch(

> Collaborator: Do we really want a single-row version of this? Or is it better to take a slice of some kind? Also, why are partition values etc. provided externally, rather than part of self? I could totally buy the argument that partition values and data_change are a "bulk" concept... but in that case we should definitely have the record batch generator take a slice of Self, along with those other bits of information.
>
> Collaborator: Note: I don't see any call sites for write_parquet_file, so it's hard to guess all the implications of my suggestion. But I would think any reasonable engine implementation will work with iterators or sequences of files, and that we could inject the as_record_batch calls there. The fact that write_parquet_file is async is a strong indication that we expect it to be part of a lazy iterator of some sort, no? If nothing else, some default engine code somewhere will have to wait on that async call to complete, since the public kernel API surface is all sync.
>
> Collaborator: write_parquet_file is called from write_parquet here. I think this design is a consequence of write_parquet only taking one file at a time, and that kernel has to go into EngineData at that point. If you could pass multiple files to write_parquet then you'd write them all and build one record batch with all the metadata. But since we do it one at a time, and kernel has to "save" each metadata so it can loop over it in the commit, we have to do this one-row approach. What we really want here (I think) is just an "opaque" type that allows being turned into EngineData, potentially in a batch. So ParquetMetadata would become a type at the kernel level, not just in the engine, but it's just a marker. And then kernel saves up all the ParquetMetadatas (whatever the engine wants to return for them) and then calls into_engine_data(metadatas) (another new kernel-level API), where the engine can turn all the metadata at once into its EngineData. Given the scope of that, if we like it, maybe we stick with this and tackle it as a follow-up?
>
> Author: Yea, and for context on the parquet file writing: I think we likely do want to support writing multiple files at once; java kernel does something similar, taking an Iterator of column batches to write. I've aimed to derisk this whole thing by implementing it just within our default engine (no write-parquet-file engine API) and we can expand the API later?
>
> Author: And yea @nicklan, I haven't thought about that sort of trait-based API (that would be a trait that needs into_engine_data(..), right?). That sounds good; probably try to do the simple thing here and I'll make an issue to follow up?
>
> Collaborator: I'm okay with that. Note that the trait probably wouldn't have into_engine_data, but rather something in the engine would need an into_engine_data(Vec<TheNewTrait>) (or maybe iter). You need a collection of them all at once.
>
> Author: Just opened #466 since I'm on a roll and love making new issues.

&self,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let ParquetWriteMetadata { file_meta } = self;
let FileMeta {
location,
last_modified,
size,
} = file_meta;
let write_metadata_schema = crate::transaction::get_write_metadata_schema();

// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
let key_builder = StringBuilder::new();
let val_builder = StringBuilder::new();
let names = MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
};
let mut builder = MapBuilder::new(Some(names), key_builder, val_builder);
if partition_values.is_empty() {
builder.append(true).unwrap();
} else {
for (k, v) in partition_values {
builder.keys().append_value(&k);
builder.values().append_value(&v);
builder.append(true).unwrap();
}
}
let partitions = Arc::new(builder.finish());
// this means max size we can write is i64::MAX (~8EB)
let size: i64 = (*size)
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;

> Collaborator (on lines +82 to +84): Aside: I almost wonder if FileMeta should just use size: i64 instead of size: usize? At least that way the burden of casting is paid only once, by whoever creates it, rather than (potentially) on every read scattered through the code base. We're anyway nowhere near hitting any meaningful object size limits, so the cast should never fail.
>
> Author: How about u64? A little safer?
>
> Collaborator: But then we're back to not having a schema type for it...
>
> Author: Oh right, haha. Doing i64.

let size = Arc::new(Int64Array::from(vec![size]));
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(write_metadata_schema.as_ref().try_into()?),
vec![path, partitions, size, modification_time, data_change],
)?)))
}
}
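Relating to the thread above about taking a slice instead of a single row (the follow-up filed as #466): a rough crate-internal sketch of what a bulk conversion could look like, producing one output row per written file. The function name and the `(metadata, partition_values, data_change)` tuple shape are hypothetical, and the sketch leans on the imports already at the top of this file.

```rust
// Hypothetical bulk conversion: one output row per written file. This mirrors
// the single-row `as_record_batch` above, but builds each column across all
// files before assembling the RecordBatch.
fn write_metadata_batch(
    files: &[(ParquetWriteMetadata, HashMap<String, String>, bool)],
) -> DeltaResult<Box<dyn EngineData>> {
    let write_metadata_schema = crate::transaction::get_write_metadata_schema();

    let mut paths = Vec::with_capacity(files.len());
    let mut sizes = Vec::with_capacity(files.len());
    let mut mod_times = Vec::with_capacity(files.len());
    let mut data_changes = Vec::with_capacity(files.len());
    let names = MapFieldNames {
        entry: "key_value".to_string(),
        key: "key".to_string(),
        value: "value".to_string(),
    };
    let mut partitions = MapBuilder::new(Some(names), StringBuilder::new(), StringBuilder::new());

    for (meta, partition_values, data_change) in files {
        let FileMeta { location, last_modified, size } = &meta.file_meta;
        paths.push(location.to_string());
        sizes.push(
            i64::try_from(*size).map_err(|_| Error::generic("file size overflows i64"))?,
        );
        mod_times.push(*last_modified);
        data_changes.push(*data_change);
        for (k, v) in partition_values {
            partitions.keys().append_value(k);
            partitions.values().append_value(v);
        }
        partitions.append(true)?; // close this row's map, even if it is empty
    }

    Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
        Arc::new(write_metadata_schema.as_ref().try_into()?),
        vec![
            Arc::new(StringArray::from(paths)),
            Arc::new(partitions.finish()),
            Arc::new(Int64Array::from(sizes)),
            Arc::new(Int64Array::from(mod_times)),
            Arc::new(BooleanArray::from(data_changes)),
        ],
    )?)))
}
```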

impl<E: TaskExecutor> DefaultParquetHandler<E> {
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
Self {
@@ -43,6 +110,63 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
self.readahead = readahead;
self
}

// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
// metadata (where <uuid> is a generated UUIDv4).
//
// Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in
// order to obtain metadata about the object just written.
async fn write_parquet(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
) -> DeltaResult<ParquetWriteMetadata> {
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();

let mut buffer = vec![];

> Collaborator: It sucks that object_store doesn't have any notion of taking a "stream of bytes" and instead wants everything as a single chunk. We should probably at least file a tracking issue to see if we can improve this somehow.
>
> Author: I think a streaming write to an object store would have to take the form of a multipart upload, which object_store does support with the MultipartUpload trait; perhaps we can implement that in some async way to stream our data? Also ran across WriteMultipart, but I'll do some more digging. I've already got #418 open for doing streaming JSON puts, and I've just expanded that issue to cover streaming JSON/parquet since I think it will generally be the same.
>
> Collaborator (@scovich, Oct 31, 2024): That WriteMultipart does indeed seem relevant. Looks like it's up to us to sort out simple PUT vs. MPU tho (similar to how arrow parquet reads are super low-level, I guess). Parquet will be trickier because somebody (engine or default engine) has to encode+compress the bytes that will eventually be written. Parquet is designed to be streamed (row groups, and footer comes last), but I don't know how well arrow-parquet supports it for the default client? There's e.g. SerializedFileWriter that uses Write, so I guess we'd have to manually tie that in with the object store machinery (WriteMultipart doesn't seem to impl Write). Our Write impl would have to decide PUT vs. MPU dynamically based on the data size being written, I suppose. Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.
>
> Author: Yep, gonna punt on this for now and optimize in a follow-up. On writing arrow data as parquet: we are just using parquet::arrow::arrow_writer::ArrowWriter here (rather straightforward to use for writing record batches; not sure if that's what you were asking).
>
> Collaborator: Without disagreeing with anything above, I also meant that object_store should be able to just do a "normal" put on anything that implements std::io::Read. It's going to write it all as the body of the request out to the network/disk, so it should be able to just stream the bytes rather than needing them in one continuous chunk.
>
> Author: Ah yea, got it! I can create an issue for this since it seems different from the full "stream to multipart" one?
>
> Collaborator: Sorry, too much time invested in my aside here. It would be an issue on object_store which we could look at, but probably not now, and probably not something to make an issue for in this repo.

let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
writer.write(record_batch)?;
writer.close()?; // writer must be closed to write footer

let size = buffer.len();
let name: String = Uuid::new_v4().to_string() + ".parquet";
// FIXME test with trailing '/' and omitting?
let path = path.join(&name)?;

self.store
.put(&Path::from(path.path()), buffer.into())
.await?;

let metadata = self.store.head(&Path::from(path.path())).await?;

> Collaborator: Do we want any kind of "todo" here? We could avoid this and just assume size will be correct and use the current time as modification time. That would be an optimization people might want, so maybe we could have an option for it?
>
> Author: Yea, good call! I'll make an issue for follow-up.

let modification_time = metadata.last_modified.timestamp();
if size != metadata.size {
return Err(Error::generic(format!(
"Size mismatch after writing parquet file: expected {}, got {}",
size, metadata.size
)));
}

let file_meta = FileMeta::new(path, modification_time, size);
Ok(ParquetWriteMetadata::new(file_meta))
}

/// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
/// metadata as an EngineData batch which matches the [write metadata] schema (where <uuid> is
/// a generated UUIDv4).
///
/// [write metadata]: crate::transaction::get_write_metadata_schema
pub async fn write_parquet_file(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let parquet_metadata = self.write_parquet(path, data).await?;
let write_metadata = parquet_metadata.as_record_batch(partition_values, data_change)?;
Ok(write_metadata)
}
}
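On the review thread above about the PUT-then-HEAD sequence: a minimal sketch of the discussed optimization, trusting the local buffer length and clock instead of making a second round trip. The function name is hypothetical, and the seconds-precision timestamp simply mirrors the `.timestamp()` call in `write_parquet` above.

```rust
use object_store::{path::Path, ObjectStore};
use url::Url;

use crate::{DeltaResult, Error, FileMeta};

// Hypothetical alternative to the PUT + HEAD in `write_parquet`: derive FileMeta
// from what we already know locally instead of asking the store again.
async fn put_trusting_local_metadata(
    store: &dyn ObjectStore,
    path: Url,
    buffer: Vec<u8>,
) -> DeltaResult<FileMeta> {
    let size = buffer.len();
    store.put(&Path::from(path.path()), buffer.into()).await?;
    // Assume the object landed intact: the local length is the object size and
    // the local clock approximates the server-side modification time.
    let last_modified = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|_| Error::generic("system clock is before the unix epoch"))?
        .as_secs() as i64;
    Ok(FileMeta::new(path, last_modified, size))
}
```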

impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
@@ -297,4 +421,10 @@ mod tests {
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
}

#[test]
fn test_into_write_metadata() {}

#[tokio::test]
async fn test_write_parquet() {}
}
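Both test stubs above are still empty at this commit. A rough sketch of how `test_write_parquet` might be filled in against an in-memory store; it assumes the test module already has the file's imports (plus `TokioBackgroundExecutor`) in scope, and the asserted values are illustrative.

```rust
#[tokio::test]
async fn test_write_parquet_sketch() {
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use object_store::memory::InMemory;

    let handler = DefaultParquetHandler::new(
        Arc::new(InMemory::new()),
        Arc::new(TokioBackgroundExecutor::new()),
    );

    // One non-nullable i64 column, two rows.
    let schema = Arc::new(ArrowSchema::new(vec![Field::new("x", DataType::Int64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2]))]).unwrap();
    let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(batch));

    let write_metadata = handler
        .write_parquet_file(
            &url::Url::parse("memory:///my_table/").unwrap(),
            data,
            HashMap::new(),
            true,
        )
        .await
        .unwrap();

    // One written file should yield exactly one row of write metadata.
    let write_metadata = ArrowEngineData::try_from_engine_data(write_metadata).unwrap();
    assert_eq!(write_metadata.record_batch().num_rows(), 1);
}
```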
6 changes: 3 additions & 3 deletions kernel/src/engine/sync/json.rs
@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+ data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,

> Author: Curious what people think of this API change. If we don't propagate Results to here, then we end up in the weird situation of either having to traverse everything to filter out Err cases or just unwrap stuff. Is there a better way to handle this?
>
> Collaborator: What are we trying to achieve? Seems like it'll still be an issue that we might write some data and then fail. Without checking the whole thing for Err cases I'm not sure how you can avoid that?
>
> Author: We have things like expression evaluation happening at every chunk of the iterator (e.g. before we get here we do the transformation from 'write_metadata' to actions), which might fail. Thus we have to "hold on" to those Results and propagate them through to this API. And yea, not sure if there's a way around that other than something that scans the whole thing ahead of time?
>
> Collaborator: But are you going to somehow recover from those errors? Why not just propagate earlier? To make sure I'm getting it: this would, for example, propagate any errors in generate_commit_info down here, rather than early-returning from commit() when generate_commit_info failed, right? Early fail in commit actually seems better, so I'm not seeing the benefit here.
>
> Author: Probably not going to recover, but I'm unsure how to propagate early without traversing the whole iterator. Since generate_commit_info just always appends the first batch and we could fail early there, I'll use generate_adds as an example: we lazily generate the adds to produce an impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>. That is, generate_adds takes an iterator of write_metadata and produces an iterator of Results (as EngineData). The Result is injected here since we do some fallible work like expression evaluation to transform into the actions. Then we are left with handing this iterator of Results to the write_json API, unless we want to traverse the whole iterator before passing it off to check that none of the batches failed.
>
> Collaborator: Ahh, finally I get it. Yeah, we want to let the errors propagate until we need their result. This makes sense. Thanks for bearing with me :)

_overwrite: bool,
) -> DeltaResult<()> {
let path = path
@@ -120,10 +120,10 @@ mod tests {

let url = Url::from_file_path(path.clone()).unwrap();
handler
- .write_json_file(&url, Box::new(std::iter::once(data)), false)
+ .write_json_file(&url, Box::new(std::iter::once(Ok(data))), false)
.expect("write json file");
assert!(matches!(
- handler.write_json_file(&url, Box::new(std::iter::once(empty)), false),
+ handler.write_json_file(&url, Box::new(std::iter::once(Ok(empty))), false),
Err(Error::FileAlreadyExists(_))
));

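To make the discussion above concrete: with the new signature, a caller can hand `write_json_file` a lazily generated, fallible stream of action batches, and the first error surfaces only when the writer actually pulls that batch. A small sketch of the calling pattern; `generate_adds` stands in for the kernel-side transformation described in the thread and is illustrative.

```rust
use delta_kernel::{DeltaResult, EngineData, JsonHandler};
use url::Url;

// Hypothetical driver: each write-metadata batch is turned into action rows by a
// fallible per-batch step (e.g. an expression evaluation), so the iterator item
// type is DeltaResult<Box<dyn EngineData>>.
fn commit_actions(
    handler: &dyn JsonHandler,
    commit_path: &Url,
    write_metadata: Vec<Box<dyn EngineData>>,
    generate_adds: impl Fn(Box<dyn EngineData>) -> DeltaResult<Box<dyn EngineData>> + Send,
) -> DeltaResult<()> {
    let actions = write_metadata.into_iter().map(generate_adds);
    // No pre-scan for errors: if some batch fails to transform, the Err is
    // propagated from inside write_json_file when the writer reaches it.
    handler.write_json_file(commit_path, Box::new(actions), false)
}
```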
13 changes: 12 additions & 1 deletion kernel/src/lib.rs
@@ -124,6 +124,17 @@ impl PartialOrd for FileMeta {
}
}

impl FileMeta {
/// Create a new instance of `FileMeta`
pub fn new(location: Url, last_modified: i64, size: usize) -> Self {
Self {
location,
last_modified,
size,
}
}
}
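A trivial illustration of the new constructor; the URL and numbers are made up.

```rust
use delta_kernel::FileMeta;
use url::Url;

fn example_file_meta() -> FileMeta {
    let location = Url::parse("s3://bucket/my-table/part-00000.parquet").unwrap();
    // last-modified timestamp and size in bytes, both illustrative
    FileMeta::new(location, 1_700_000_000_000, 4096)
}
```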

/// Trait for implementing an Expression evaluator.
///
/// It contains one Expression which can be evaluated on multiple ColumnarBatches.
@@ -233,7 +244,7 @@ pub trait JsonHandler: Send + Sync {
fn write_json_file(
&self,
path: &Url,
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+ data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
overwrite: bool,
) -> DeltaResult<()>;
}