-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[write] add Transaction with commit info and commit implementation #370
Changes from 56 commits
f40221a
b16491c
432a339
93fab4d
64d7eaf
05e9488
2282cd8
21928b8
532ea8c
215ed4e
78c8464
0f1f955
8cc9cc9
114c16f
b7c351f
9a9e9d3
6b0c2d4
d1af098
0ba047d
667a8e2
52bd5f2
7696d7d
fa6c81d
a3abbfa
d7ea4c4
bac1d09
bc541dd
fa1caf4
9d875cd
023b85a
c1c6e2a
da43cf2
26b8dbd
858f3fb
1ef5ffc
6ee69e7
0b2b1ed
2258549
f463e22
1149a17
3877ccc
0b5b301
3daed9b
327bbde
6d2b41a
0abd291
b793523
2f4e4d0
68edef2
673af96
559bbea
5afe8db
7f87591
76cdfaa
cc7598c
a1ba008
525b8ff
a86495a
0a2ecfc
c22f625
630c694
f5530f9
37db615
d7ad2e6
2141ecf
75c976c
81866c9
4908174
20ffd33
4aba873
b4feb4f
1fc535e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; | |
use self::deletion_vector::DeletionVectorDescriptor; | ||
use crate::actions::schemas::GetStructField; | ||
use crate::features::{ReaderFeatures, WriterFeatures}; | ||
use crate::SchemaRef; | ||
use crate::{schema::StructType, DeltaResult, EngineData}; | ||
|
||
pub mod deletion_vector; | ||
|
@@ -42,12 +43,20 @@ static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| { | |
]) | ||
}); | ||
|
||
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { | ||
StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into() | ||
}); | ||
|
||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
fn get_log_schema() -> &'static StructType { | ||
&LOG_SCHEMA | ||
} | ||
|
||
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef { | ||
&LOG_COMMIT_INFO_SCHEMA | ||
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq, Schema)] | ||
pub struct Format { | ||
/// Name of the encoding for files in this table | ||
|
@@ -137,8 +146,33 @@ impl Protocol { | |
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq, Schema)] | ||
pub struct CommitInfo { | ||
pub kernel_version: Option<String>, | ||
struct OperationParameters { | ||
operation_parameters: Option<i64>, | ||
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq, Schema)] | ||
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] | ||
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] | ||
struct CommitInfo { | ||
/// The time this logical file was created, as milliseconds since the epoch. | ||
pub(crate) timestamp: Option<i64>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't impl Schema on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is a long time issue in that java doesn't have unsigned types. Somewhere back in our issue list is something about figuring this out :p There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may or may not need
|
||
/// An arbitrary string that identifies the operation associated with this commit. This is | ||
/// specified by the engine. | ||
pub(crate) operation: Option<String>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this actually optional? Would a commit ever not have an associated action? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is indeed optional according to the protocol. that is, we should never require it to be present when reading commit info There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically, the protocol has nothing to say about any of this. It just so happens that Delta-spark requires this field, along with our favorite (put another way -- it's optional from kernel-as-reader perspective, but if we want compat with Delta-spark then it's required from kernel-as-writer perspective) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, to be clear I mean that it is optional because the protocol says commit info can be anything. since we don't really have a way of communicating reader/writer optionality I can just add a comment saying that this is optional since it is actually optional from a read/enforcement perspective but that kernel always writes it |
||
/// Map of arbitrary string key-value pairs that provide additional information about the | ||
/// operation. This is specified by the engine. | ||
/// | ||
/// For now this is always empty; and since we don't have the ability to construct an empty | ||
/// string-string map, we spoof the operation_parameters with an empty struct so it serializes | ||
/// the same as an empty map (as `{}`). | ||
operation_parameters: Option<OperationParameters>, | ||
/// The version of the delta_kernel crate used to write this commit. The kernel will always | ||
/// write this field, but it is optional since many tables will not have this field (i.e. any | ||
/// tables not written by kernel). | ||
pub(crate) kernel_version: Option<String>, | ||
/// A place for the engine to store additional metadata associated with this commit encoded as | ||
/// a map of strings. | ||
pub(crate) engine_commit_info: Option<HashMap<String, String>>, | ||
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq, Schema)] | ||
|
@@ -417,4 +451,36 @@ mod tests { | |
)])); | ||
assert_eq!(schema, expected); | ||
} | ||
|
||
#[test] | ||
fn test_commit_info_schema() { | ||
let schema = get_log_schema() | ||
.project(&["commitInfo"]) | ||
.expect("Couldn't get commitInfo field"); | ||
|
||
let expected = Arc::new(StructType::new(vec![StructField::new( | ||
"commitInfo", | ||
StructType::new(vec![ | ||
StructField::new("timestamp", DataType::LONG, true), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So to my questions above, are you using these options as nullable fields than truly optional ones? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
StructField::new("operation", DataType::STRING, true), | ||
StructField::new( | ||
"operationParameters", | ||
StructType::new([StructField::new( | ||
"operationParameters", | ||
DataType::LONG, | ||
true, | ||
)]), | ||
true, | ||
), | ||
StructField::new("kernelVersion", DataType::STRING, true), | ||
StructField::new( | ||
"engineCommitInfo", | ||
MapType::new(DataType::STRING, DataType::STRING, false), | ||
true, | ||
), | ||
]), | ||
true, | ||
)])); | ||
assert_eq!(schema, expected); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ use arrow_array::{ | |
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait, | ||
RecordBatch, StringArray, StructArray, | ||
}; | ||
use arrow_json::ReaderBuilder; | ||
use arrow_json::{LineDelimitedWriter, ReaderBuilder}; | ||
use arrow_schema::{ | ||
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, | ||
SchemaRef as ArrowSchemaRef, | ||
|
@@ -662,6 +662,22 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR | |
Ok(concat_batches(&schema, output.iter())?) | ||
} | ||
|
||
/// write an arrow RecordBatch to a JSON string by appending to a buffer. | ||
/// | ||
/// TODO (zach): this should stream data to the JSON writer and output an iterator. | ||
zachschuermann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub(crate) fn write_json( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. naming is hard, but maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh yea sgtm thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or rather how about |
||
data: impl Iterator<Item = Box<dyn EngineData>> + Send, | ||
) -> DeltaResult<Vec<u8>> { | ||
let mut writer = LineDelimitedWriter::new(Vec::new()); | ||
for chunk in data.into_iter() { | ||
let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?; | ||
let record_batch = arrow_data.record_batch(); | ||
writer.write(record_batch)?; | ||
} | ||
writer.finish()?; | ||
Ok(writer.into_inner()) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::sync::Arc; | ||
|
@@ -1408,4 +1424,24 @@ mod tests { | |
assert_eq!(mask_indices, expect_mask); | ||
assert_eq!(reorder_indices, expect_reorder); | ||
} | ||
|
||
#[test] | ||
fn test_write_json() -> DeltaResult<()> { | ||
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( | ||
"string", | ||
ArrowDataType::Utf8, | ||
true, | ||
)])); | ||
let data = RecordBatch::try_new( | ||
schema.clone(), | ||
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))], | ||
)?; | ||
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data)); | ||
let json = write_json(Box::new(std::iter::once(data)))?; | ||
assert_eq!( | ||
json, | ||
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes() | ||
); | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,10 +11,12 @@ use bytes::{Buf, Bytes}; | |
use futures::{StreamExt, TryStreamExt}; | ||
use object_store::path::Path; | ||
use object_store::{DynObjectStore, GetResultPayload}; | ||
use url::Url; | ||
|
||
use super::executor::TaskExecutor; | ||
use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; | ||
use crate::engine::arrow_utils::parse_json as arrow_parse_json; | ||
use crate::engine::arrow_utils::write_json; | ||
use crate::schema::SchemaRef; | ||
use crate::{ | ||
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, | ||
|
@@ -89,6 +91,31 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> { | |
self.readahead, | ||
) | ||
} | ||
|
||
// note: for now we just buffer all the data and write it out all at once | ||
fn write_json_file( | ||
&self, | ||
path: &Url, | ||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>, | ||
_overwrite: bool, | ||
) -> DeltaResult<()> { | ||
Comment on lines
+95
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: zach needs to make an issue for follow up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
let buffer = write_json(data)?; | ||
// Put if absent | ||
let store = self.store.clone(); // cheap Arc | ||
let path = Path::from(path.path()); | ||
let path_str = path.to_string(); | ||
self.task_executor | ||
.block_on(async move { | ||
store | ||
.put_opts(&path, buffer.into(), object_store::PutMode::Create.into()) | ||
.await | ||
}) | ||
.map_err(|e| match e { | ||
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str), | ||
e => e.into(), | ||
})?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow for some reason doesn't like empty fields. will need to revisit this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also follow up for this whole hack in general: #419
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean you can't do just:
It's unfortunate because the schema gets weird this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea.. i had tried this and our expressions will end up constructing an empty StructArray in arrow and it doesn't play nicely when there are no fields. I get the error
Error: Arrow(InvalidArgumentError("Incorrect array length for StructArray field \"operationParameters\", expected 1 got 0"))
let me take another stab at it