[write] Transaction append data API #393

Merged (24 commits) on Nov 8, 2024
2 changes: 2 additions & 0 deletions kernel/Cargo.toml
@@ -90,6 +90,8 @@ default-engine = [
"parquet/object_store",
"reqwest",
"tokio",
"uuid/v4",
"uuid/fast-rng",
]

developer-visibility = []
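The two new `uuid` features presumably back the default engine's need to name freshly written data files: `v4` enables random (version 4) UUID generation and `fast-rng` swaps in a faster random source for it. A minimal sketch of what that enables; the helper name and `.parquet` suffix are illustrative, not taken from this PR:

```rust
// Needs the uuid crate with the "v4" feature ("fast-rng" only changes the RNG used).
use uuid::Uuid;

// Hypothetical helper: produce a collision-resistant name for a new data file.
fn unique_parquet_name() -> String {
    format!("{}.parquet", Uuid::new_v4())
}
```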
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_utils.rs
@@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
- data: impl Iterator<Item = Box<dyn EngineData>> + Send,
+ data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
- let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
+ let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
@@ -1436,7 +1436,7 @@ mod tests {
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
- let json = to_json_bytes(Box::new(std::iter::once(data)))?;
+ let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
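With the item type now `DeltaResult<Box<dyn EngineData>>`, the producer of the chunks is allowed to fail lazily, and the `chunk?` above short-circuits the whole serialization. A small sketch of that error path as a possible test (not part of this PR), assuming the kernel's `Error::generic` constructor; the error message is illustrative:

```rust
#[test]
fn to_json_bytes_propagates_chunk_errors() {
    // A single failing chunk makes to_json_bytes return Err (via `chunk?`).
    let chunks: Vec<DeltaResult<Box<dyn EngineData>>> =
        vec![Err(Error::generic("upstream scan failed"))];
    assert!(to_json_bytes(chunks.into_iter()).is_err());
}
```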
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
@@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn write_json_file(
&self,
path: &Url,
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+ data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
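On the caller side, the new signature just means wrapping each batch in a `DeltaResult` (usually `Ok`) before boxing the iterator; a batch that already failed upstream surfaces as the error returned by `write_json_file`. A rough sketch of such a caller, assuming the crate-root re-exports shown in this PR; `write_commit`, `commit_url`, and `actions` are illustrative names:

```rust
use delta_kernel::{DeltaResult, EngineData, JsonHandler};
use url::Url;

// Sketch only: hand a fallible stream of action batches to any JsonHandler.
fn write_commit(
    handler: &dyn JsonHandler,
    commit_url: &Url,
    actions: Vec<DeltaResult<Box<dyn EngineData>>>,
) -> DeltaResult<()> {
    // `false` for the overwrite flag (the default handler currently ignores it,
    // per the `_overwrite` parameter above).
    handler.write_json_file(commit_url, Box::new(actions.into_iter()), false)
}
```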
33 changes: 32 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -6,6 +6,7 @@
//! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in
//! the [executor] module.

use std::collections::HashMap;
use std::sync::Arc;

use self::storage::parse_url_opts;
@@ -16,9 +17,13 @@ use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowExpressionHandler;
use crate::schema::Schema;
use crate::transaction::WriteContext;
use crate::{
- DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler,
+ DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler,
+ ParquetHandler,
};

pub mod executor;
@@ -108,6 +113,32 @@ impl<E: TaskExecutor> DefaultEngine<E> {
pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
Some(self.store.clone())
}

pub async fn write_parquet(
&self,
data: &ArrowEngineData,
write_context: &WriteContext,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema: Schema = data.record_batch().schema().try_into()?;
let output_schema = write_context.schema();
let logical_to_physical_expr = self.get_expression_handler().get_evaluator(
[Review thread on this line]

Collaborator: Ideally we should be creating the expression evaluator just once? Or do we worry the record batch schemas will somehow vary in e.g. field order?

Collaborator (author): Well, for now, writing single parquet files, aren't we bound to the input schema? That is, for each input batch we have a (possibly) different input schema, which then means a different logical_to_physical_expr.

Collaborator: Partition values could be one reason, I guess. Normally a partition-aware engine would want to pass us the partition value literals and let us decide whether to embed them in the outgoing data? And I guess some future table feature could conceivably require per-file awareness?

input_schema.into(),
transform.clone(),
output_schema.clone().into(),
);
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
data_change,
)
.await
}
}

impl<E: TaskExecutor> Engine for DefaultEngine<E> {
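Tying the pieces together: given an Arrow batch and the `WriteContext` for a transaction (how the transaction hands that out, e.g. something like `txn.get_write_context()`, is not part of this hunk, so treat that as an assumption), a caller would drive the new `write_parquet` roughly as below, and the returned `EngineData` is the file metadata the engine is expected to hand back to the transaction when committing. A minimal sketch; it assumes the types used above (`DefaultEngine`, `TaskExecutor`, `ArrowEngineData`, `WriteContext`) are importable as in this module's own `use` statements:

```rust
use std::collections::HashMap;

// Sketch only: append one in-memory batch as a single, unpartitioned file.
async fn append_one_batch<E: TaskExecutor>(
    engine: &DefaultEngine<E>,
    batch: &ArrowEngineData,
    write_context: &WriteContext,
) -> DeltaResult<Box<dyn EngineData>> {
    // Empty partition values; data_change = true marks a regular append
    // rather than a rewrite of existing data.
    engine
        .write_parquet(batch, write_context, HashMap::new(), true)
        .await
}
```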