[write] Transaction append data API #393
@@ -1,23 +1,30 @@
//! Default Parquet handler implementation

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray};
use futures::StreamExt;
use object_store::path::Path;
use object_store::DynObjectStore;
use parquet::arrow::arrow_reader::{
    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{
-    DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler,
+    DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
+    ParquetHandler,
};

#[derive(Debug)]

@@ -27,6 +34,64 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {
    readahead: usize,
}

/// Metadata of a parquet file, currently just includes the file metadata but will expand to
/// include file statistics and other metadata in the future.
#[derive(Debug)]
pub struct DataFileMetadata {
    file_meta: FileMeta,
}

impl DataFileMetadata {
    pub fn new(file_meta: FileMeta) -> Self {
        Self { file_meta }
    }

    // convert ParquetMetadata into a record batch which matches the 'write_metadata' schema
    fn as_record_batch(
Do we really want a single-row version of this? Or is it better to take a slice of some kind? I could totally buy the argument that partition values and data_change are a "bulk" concept... but in that case we should definitely have the record batch generator take a slice of [...].

Note: I don't see any call sites for [...]. The fact that [...]

I think this design is a consequence of [...]. What we really want here (I think) is just an "opaque" type that allows being turned into [...]. So [...]. Given the scope of that, if we like it, maybe we stick with this and tackle that as a follow-up?

Yea, and for context on the parquet file writing: I think we likely do want to support writing multiple files at once; java kernel does something similar, taking an Iterator of column batches to write. I've aimed to derisk this whole thing by implementing it just within our default engine (no write-parquet-file engine API) and we can expand the API later?

And yea @nicklan, I haven't thought about that sort of trait-based API (that would be a trait that needs [...]).

I'm okay with that. Note that the trait probably wouldn't have [...].

Just opened #466 since I'm on a roll and love making new issues.
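To make that follow-up concrete, here is a purely hypothetical sketch of the "opaque type that can become EngineData" idea discussed above. Nothing in this PR defines such a trait; the name `IntoEngineData` and both method signatures are invented for illustration:

```rust
use delta_kernel::schema::SchemaRef;
use delta_kernel::{DeltaResult, EngineData};

// Hypothetical sketch only: a conversion trait so callers can hand the kernel
// something that knows its own schema and how to become an EngineData batch.
// Names and signatures are illustrative, not part of this PR.
pub trait IntoEngineData {
    /// The schema of the data this value produces.
    fn schema(&self) -> SchemaRef;

    /// Convert `self` into an engine-specific `EngineData` batch.
    fn into_engine_data(self) -> DeltaResult<Box<dyn EngineData>>;
}
```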
        &self,
        partition_values: &HashMap<String, String>,
        data_change: bool,
    ) -> DeltaResult<Box<dyn EngineData>> {
        let DataFileMetadata {
            file_meta:
                FileMeta {
                    location,
                    last_modified,
                    size,
                },
        } = self;
        let write_metadata_schema = crate::transaction::get_write_metadata_schema();

        // create the record batch of the write metadata
        let path = Arc::new(StringArray::from(vec![location.to_string()]));
        let key_builder = StringBuilder::new();
        let val_builder = StringBuilder::new();
        let names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let mut builder = MapBuilder::new(Some(names), key_builder, val_builder);
        for (k, v) in partition_values {
            builder.keys().append_value(k);
            builder.values().append_value(v);
        }
        builder.append(true).unwrap();
        let partitions = Arc::new(builder.finish());
        // this means max size we can write is i64::MAX (~8EB)
        let size: i64 = (*size)
            .try_into()
            .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
Comment on lines +82 to +84:

Aside: I almost wonder if [...]. We're anyway nowhere near hitting any meaningful object size limits, so the cast should never fail.

How about [...]?

But then we're back to not having a schema type for it...

Oh right haha. Doing [...].
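As a minimal sketch of the reviewer's aside (illustrative only; the PR keeps the fallible `try_into` above), an infallible cast guarded by a debug assertion would also be defensible, since real file sizes sit nowhere near `i64::MAX` (~8 EB):

```rust
// Illustrative only: a plain cast with a debug guard, as an alternative to the
// fallible try_into above. File sizes never realistically approach i64::MAX.
fn size_as_i64(size: usize) -> i64 {
    debug_assert!(size as u64 <= i64::MAX as u64);
    size as i64
}
```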

        let size = Arc::new(Int64Array::from(vec![size]));
        let data_change = Arc::new(BooleanArray::from(vec![data_change]));
        let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
            Arc::new(write_metadata_schema.as_ref().try_into()?),
            vec![path, partitions, size, modification_time, data_change],
        )?)))
    }
}

impl<E: TaskExecutor> DefaultParquetHandler<E> {
    pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
        Self {

@@ -43,6 +108,63 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
        self.readahead = readahead;
        self
    }

    // Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
    // metadata (where <uuid> is a generated UUIDv4).
    //
    // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in
    // order to obtain metadata about the object just written.
    async fn write_parquet(
        &self,
        path: &url::Url,
        data: Box<dyn EngineData>,
    ) -> DeltaResult<DataFileMetadata> {
        let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
        let record_batch = batch.record_batch();

        let mut buffer = vec![];
It sucks that object_store doesn't have any notion of taking a "stream of bytes" and instead wants everything as a single chunk. We should probably at least file a tracking issue to see if we can improve this somehow.

I think a streaming write to an object store would have to take the form of a multipart upload [...]. I've already got #418 open for doing streaming JSON puts, and I've just gone ahead and expanded that issue to be streaming JSON/parquet since I think it will generally be the same.

Parquet will be trickier because somebody (engine or default engine) has to encode+compress the bytes that will eventually be written. Parquet is designed to be streamed (row groups, and the footer comes last), but I don't know how well arrow-parquet supports it for the default client? There's e.g. SerializedFileWriter that uses Write, so I guess we'd have to manually tie that in with the object store machinery ([...]). Also: I hope there's some easy way to write out arrow data as parquet? From my quick skim, it looks like we'd be wrangling individual row groups and column chunks.

Yep, gonna punt on this for now and optimize in a follow-up. We are just using [...].

Without disagreeing with anything above, I also meant that [...]. It's going to write it all as the body of the request out to the network/disk, so it should be able to just stream the bytes rather than needing them in one continuous chunk.

Ah yea, got it! I can create an issue for this since it's seeming different than the full 'stream to multipart' one?

Sorry, too much time invested in my aside here. It would be an issue on [...].
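For context on the streaming idea (#418), a rough sketch of what a chunked upload could look like. It assumes an object_store release whose `put_multipart` hands back an `AsyncWrite` handle; newer releases expose a `MultipartUpload`/`WriteMultipart` API instead, so treat this as a sketch of the shape rather than the API this crate uses:

```rust
use object_store::path::Path;
use object_store::{DynObjectStore, Result};
use tokio::io::AsyncWriteExt;

// Illustrative only: stream `chunks` to storage without first assembling one
// contiguous buffer. Assumes a put_multipart that returns an AsyncWrite.
async fn streaming_put(
    store: &DynObjectStore,
    location: &Path,
    chunks: impl IntoIterator<Item = Vec<u8>>,
) -> Result<()> {
    let (_multipart_id, mut writer) = store.put_multipart(location).await?;
    for chunk in chunks {
        writer.write_all(&chunk).await?;
    }
    writer.shutdown().await?; // finalizes the multipart upload
    Ok(())
}
```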
        let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
        writer.write(record_batch)?;
        writer.close()?; // writer must be closed to write footer

        let size = buffer.len();
        let name: String = Uuid::new_v4().to_string() + ".parquet";
        // FIXME test with trailing '/' and omitting?
        let path = path.join(&name)?;

        self.store
            .put(&Path::from(path.path()), buffer.into())
            .await?;

        let metadata = self.store.head(&Path::from(path.path())).await?;
Do we want any kind of "todo" here? We could avoid this and just assume size will be correct and use current time as modification time. That would be an optimization people might want, so maybe we could have an option for it?

Yea, good call! I'll make an issue for follow-up.
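A minimal sketch of that follow-up idea (names are illustrative, and this is not what the PR does): trust `buffer.len()` for the size and stamp the current wall-clock time instead of issuing the extra HEAD:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use delta_kernel::FileMeta;
use url::Url;

// Illustrative only: build FileMeta from locally known values, skipping the
// post-PUT HEAD request. `size` is the length of the encoded parquet buffer.
fn local_file_meta(location: Url, size: usize) -> FileMeta {
    let last_modified: i64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the UNIX epoch")
        .as_millis()
        .try_into()
        .expect("timestamp overflows i64");
    FileMeta::new(location, last_modified, size)
}
```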
        let modification_time = metadata.last_modified.timestamp_millis();
        if size != metadata.size {
            return Err(Error::generic(format!(
                "Size mismatch after writing parquet file: expected {}, got {}",
                size, metadata.size
            )));
        }

        let file_meta = FileMeta::new(path, modification_time, size);
        Ok(DataFileMetadata::new(file_meta))
    }

    /// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
    /// metadata as an EngineData batch which matches the [write metadata] schema (where <uuid> is
    /// a generated UUIDv4).
    ///
    /// [write metadata]: crate::transaction::get_write_metadata_schema
    pub async fn write_parquet_file(
        &self,
        path: &url::Url,
        data: Box<dyn EngineData>,
        partition_values: HashMap<String, String>,
        data_change: bool,
    ) -> DeltaResult<Box<dyn EngineData>> {
        let parquet_metadata = self.write_parquet(path, data).await?;
        let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?;
        Ok(write_metadata)
Nit (suggested change): return `parquet_metadata.as_record_batch(&partition_values, data_change)` directly instead of binding it and wrapping it in `Ok`.

Yea, actually just did `parquet_metadata.as_record_batch(&partition_values, data_change)`.
    }
}

impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {

@@ -242,9 +364,12 @@ impl FileOpener for PresignedUrlOpener {
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use arrow_array::array::Array;
    use arrow_array::RecordBatch;
-    use object_store::{local::LocalFileSystem, ObjectStore};
+    use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
    use url::Url;

    use crate::engine::arrow_data::ArrowEngineData;
    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;

@@ -297,4 +422,122 @@ mod tests {
        assert_eq!(data.len(), 1);
        assert_eq!(data[0].num_rows(), 10);
    }

    #[test]
    fn test_as_record_batch() {
        let location = Url::parse("file:///test_url").unwrap();
        let size = 1_000_000;
        let last_modified = 10000000000;
        let file_metadata = FileMeta::new(location.clone(), last_modified, size as usize);
        let data_file_metadata = DataFileMetadata::new(file_metadata);
        let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
        let data_change = true;
        let actual = data_file_metadata
            .as_record_batch(&partition_values, data_change)
            .unwrap();
        let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();

        let schema = Arc::new(
            crate::transaction::get_write_metadata_schema()
                .as_ref()
                .try_into()
                .unwrap(),
        );
        let key_builder = StringBuilder::new();
        let val_builder = StringBuilder::new();
        let mut partition_values_builder = MapBuilder::new(
            Some(MapFieldNames {
                entry: "key_value".to_string(),
                key: "key".to_string(),
                value: "value".to_string(),
            }),
            key_builder,
            val_builder,
        );
        partition_values_builder.keys().append_value("partition1");
        partition_values_builder.values().append_value("a");
        partition_values_builder.append(true).unwrap();
        let partition_values = partition_values_builder.finish();
        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![location.to_string()])),
                Arc::new(partition_values),
                Arc::new(Int64Array::from(vec![size])),
                Arc::new(Int64Array::from(vec![last_modified])),
                Arc::new(BooleanArray::from(vec![data_change])),
            ],
        )
        .unwrap();

        assert_eq!(actual.record_batch(), &expected);
    }

    #[tokio::test]
    async fn test_write_parquet() {
        let store = Arc::new(InMemory::new());
        let parquet_handler =
            DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

        let data = Box::new(ArrowEngineData::new(
            RecordBatch::try_from_iter(vec![(
                "a",
                Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
            )])
            .unwrap(),
        ));

        let write_metadata = parquet_handler
            .write_parquet(&Url::parse("memory:///data/").unwrap(), data)
            .await
            .unwrap();

        let DataFileMetadata {
            file_meta:
                ref parquet_file @ FileMeta {
                    ref location,
                    last_modified,
                    size,
                },
        } = write_metadata;
        let expected_location = Url::parse("memory:///data/").unwrap();
        let expected_size = 497;

        // check that last_modified is within 10s of now
        let now: i64 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis()
            .try_into()
            .unwrap();

        let filename = location.path().split('/').last().unwrap();
        assert_eq!(&expected_location.join(filename).unwrap(), location);
        assert_eq!(expected_size, size);
        assert!(now - last_modified < 10_000);

        // check we can read back
        let path = Path::from(location.path());
        let meta = store.head(&path).await.unwrap();
        let reader = ParquetObjectReader::new(store.clone(), meta.clone());
        let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .unwrap()
            .schema()
            .clone();

        let data: Vec<RecordBatch> = parquet_handler
            .read_parquet_files(
                &[parquet_file.clone()],
                Arc::new(physical_schema.try_into().unwrap()),
                None,
            )
            .unwrap()
            .map(into_record_batch)
            .try_collect()
            .unwrap();

        assert_eq!(data.len(), 1);
        assert_eq!(data[0].num_rows(), 3);
    }
}
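Stepping back from the diff, a hedged end-to-end sketch of how the new `write_parquet_file` API is exercised, mirroring `test_write_parquet` above; the `engine::default::parquet` module path, the in-memory store, and the column name are assumptions for illustration:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::array::Array;
use arrow_array::{Int64Array, RecordBatch};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::parquet::DefaultParquetHandler;
use delta_kernel::DeltaResult;
use object_store::memory::InMemory;
use url::Url;

// Illustrative usage only, mirroring test_write_parquet above; the module path
// for DefaultParquetHandler is assumed from the file under review.
async fn example() -> DeltaResult<()> {
    let store = Arc::new(InMemory::new());
    let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

    let data = Box::new(ArrowEngineData::new(RecordBatch::try_from_iter(vec![(
        "a",
        Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
    )])?));

    // Returns an EngineData batch matching the write-metadata schema, ready to
    // be handed back to the transaction as append metadata.
    let _write_metadata = handler
        .write_parquet_file(
            &Url::parse("memory:///data/")?,
            data,
            HashMap::new(), // partition values
            true,           // data_change
        )
        .await?;
    Ok(())
}
```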
Ideally we should be creating the expression evaluator just once? Or do we worry the record batch schemas will somehow vary in e.g. field order?

Well, for now, writing single parquet files, aren't we bound to the input schema? That is, for each input data we have a (possibly) different input schema, which then means a different logical_to_physical_expr?

Partition values could be one, I guess. Normally a partition-aware engine would want to pass us the partition value literals and let us decide whether to embed them in the outgoing data? And I guess some future table feature could conceivably require per-file awareness?