
[write] add Transaction with commit info and commit implementation #370

Merged: 72 commits (Oct 25, 2024)
Commits (72):
f40221a
new Transaction API, write_json. empty commit for now
zachschuermann Oct 1, 2024
b16491c
commit info working
zachschuermann Oct 2, 2024
432a339
fix commit info
zachschuermann Oct 2, 2024
93fab4d
well that was a mess
zachschuermann Oct 3, 2024
64d7eaf
better
zachschuermann Oct 4, 2024
05e9488
cleanup
zachschuermann Oct 7, 2024
2282cd8
fmt
zachschuermann Oct 7, 2024
21928b8
test cleanup
zachschuermann Oct 7, 2024
532ea8c
appease clippy
zachschuermann Oct 8, 2024
215ed4e
fmt
zachschuermann Oct 8, 2024
78c8464
lil cleanup
zachschuermann Oct 8, 2024
0f1f955
add a test
zachschuermann Oct 8, 2024
8cc9cc9
better assert
zachschuermann Oct 8, 2024
114c16f
address feedback
zachschuermann Oct 8, 2024
b7c351f
address feedback, cleanup
zachschuermann Oct 10, 2024
9a9e9d3
fmt
zachschuermann Oct 10, 2024
6b0c2d4
Update kernel/src/engine/sync/json.rs
zachschuermann Oct 10, 2024
d1af098
more feedback
zachschuermann Oct 10, 2024
0ba047d
nits
zachschuermann Oct 10, 2024
667a8e2
add empty commit test
zachschuermann Oct 10, 2024
52bd5f2
add empty commit info tests, debugging expr
zachschuermann Oct 10, 2024
7696d7d
just make my test fail
zachschuermann Oct 10, 2024
fa6c81d
try to leverage ParsedLogPath?
zachschuermann Oct 11, 2024
a3abbfa
fmt
zachschuermann Oct 11, 2024
d7ea4c4
enforce single-row commit info
zachschuermann Oct 11, 2024
bac1d09
error FFI
zachschuermann Oct 11, 2024
bc541dd
better path api
zachschuermann Oct 11, 2024
fa1caf4
comment
zachschuermann Oct 11, 2024
9d875cd
clean
zachschuermann Oct 11, 2024
023b85a
fix all the schema mess
zachschuermann Oct 11, 2024
c1c6e2a
remove lifetime
zachschuermann Oct 11, 2024
da43cf2
fix executor
zachschuermann Oct 11, 2024
26b8dbd
docs and i forgot a test
zachschuermann Oct 11, 2024
858f3fb
add commit info schema test
zachschuermann Oct 13, 2024
1ef5ffc
add sync json writer, add FileAlreadyExists error
zachschuermann Oct 13, 2024
6ee69e7
fix rebase
zachschuermann Oct 14, 2024
0b2b1ed
remove old file
zachschuermann Oct 14, 2024
2258549
revert arrow_expression and default/mod.rs
zachschuermann Oct 14, 2024
f463e22
revert little spelling fix (in separate pr)
zachschuermann Oct 14, 2024
1149a17
clean up some crate:: with use
zachschuermann Oct 14, 2024
3877ccc
cleanup
zachschuermann Oct 14, 2024
0b5b301
Merge remote-tracking branch 'upstream/main' into transaction
zachschuermann Oct 17, 2024
3daed9b
it's getting close
zachschuermann Oct 18, 2024
327bbde
have i done it?
zachschuermann Oct 18, 2024
6d2b41a
wip
zachschuermann Oct 21, 2024
0abd291
remove my wrong null_literal for map lol rip
zachschuermann Oct 21, 2024
b793523
back to using empty struct for operationParameters
zachschuermann Oct 22, 2024
2f4e4d0
comment
zachschuermann Oct 22, 2024
68edef2
Merge remote-tracking branch 'upstream/main' into transaction
zachschuermann Oct 22, 2024
673af96
wip need to fix commit info operationParameters
zachschuermann Oct 22, 2024
559bbea
fix commit info
zachschuermann Oct 22, 2024
5afe8db
fix error ffi
zachschuermann Oct 22, 2024
7f87591
fmt
zachschuermann Oct 22, 2024
76cdfaa
remove my debugging
zachschuermann Oct 22, 2024
cc7598c
docs, cleanup, better tests
zachschuermann Oct 23, 2024
a1ba008
clippy
zachschuermann Oct 23, 2024
525b8ff
rename + docs
zachschuermann Oct 23, 2024
a86495a
make CommitInfo have correct schema and isolate the hack inside gener…
zachschuermann Oct 23, 2024
0a2ecfc
Merge remote-tracking branch 'upstream/main' into transaction
zachschuermann Oct 23, 2024
c22f625
fix tests to match on Backtraced { .. }
zachschuermann Oct 23, 2024
630c694
appease clippy
zachschuermann Oct 23, 2024
f5530f9
fmt
zachschuermann Oct 23, 2024
37db615
Merge remote-tracking branch 'upstream/main' into transaction
zachschuermann Oct 23, 2024
d7ad2e6
use column_* macros
zachschuermann Oct 23, 2024
2141ecf
Update kernel/src/engine/arrow_utils.rs
zachschuermann Oct 23, 2024
75c976c
rename
zachschuermann Oct 23, 2024
81866c9
Merge remote-tracking branch 'refs/remotes/origin/transaction' into t…
zachschuermann Oct 23, 2024
4908174
make generate_commit_info take & not Arc
zachschuermann Oct 24, 2024
20ffd33
fix unwrap
zachschuermann Oct 24, 2024
4aba873
address comments
zachschuermann Oct 24, 2024
b4feb4f
Merge remote-tracking branch 'upstream/main' into transaction
zachschuermann Oct 24, 2024
1fc535e
make it with_operation and with_commit_info
zachschuermann Oct 25, 2024
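
Taken together, the final commits shape a builder-style Transaction API (with_operation, with_commit_info, and a commit implementation). A hypothetical usage sketch follows; the method names come from the commit messages above, but the constructor and exact signatures are assumed, not shown in these excerpts:

// Hypothetical sketch only; signatures assumed from the commit messages, not
// confirmed by the diffs below. `engine` implements the kernel Engine trait
// and `engine_commit_info` is EngineData holding a map<string, string>.
let txn = Transaction::try_new(snapshot)?
    .with_operation("INSERT".to_string())
    .with_commit_info(engine_commit_info);
txn.commit(&engine)?;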
6 changes: 6 additions & 0 deletions ffi/src/lib.rs
@@ -329,6 +329,9 @@ pub enum KernelError {
InternalError,
InvalidExpression,
InvalidLogPath,
InvalidCommitInfo,
FileAlreadyExists,
MissingCommitInfo,
}

impl From<Error> for KernelError {
@@ -376,6 +379,9 @@ impl From<Error> for KernelError {
} => Self::from(*source),
Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression,
Error::InvalidLogPath(_) => KernelError::InvalidLogPath,
Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
}
}
}
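A hypothetical consumer-side sketch of acting on the new FFI error variants (not part of this diff; the variant meanings follow the error.rs messages added below):

// Hypothetical sketch: decide whether a failed commit is worth retrying.
// FileAlreadyExists means another writer created the commit file first, so
// the engine can rebase and retry; bad/missing commit info is an engine bug.
fn is_retryable_commit_error(err: &KernelError) -> bool {
    match err {
        KernelError::FileAlreadyExists => true,
        KernelError::InvalidCommitInfo | KernelError::MissingCommitInfo => false,
        _ => false,
    }
}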
3 changes: 3 additions & 0 deletions kernel/Cargo.toml
@@ -36,6 +36,8 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.4.0" }
# used for developer-visibility
visibility = "0.1.1"

# Used in the sync engine
tempfile = { version = "3", optional = true }
# Used in default engine
arrow-buffer = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
@@ -99,6 +101,7 @@ sync-engine = [
"arrow-json",
"arrow-select",
"parquet",
"tempfile",
]
integration-test = [
"hdfs-native-object-store/integration-test",
58 changes: 56 additions & 2 deletions kernel/src/actions/mod.rs
@@ -46,6 +46,10 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
.into()
});

static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into()
});

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn get_log_schema() -> &'static SchemaRef {
@@ -58,6 +62,10 @@ fn get_log_add_schema() -> &'static SchemaRef {
&LOG_ADD_SCHEMA
}

pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
&LOG_COMMIT_INFO_SCHEMA
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Format {
/// Name of the encoding for files in this table
@@ -147,8 +155,26 @@ impl Protocol {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct CommitInfo {
pub kernel_version: Option<String>,
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct CommitInfo {
/// The time this commit was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
/// If in-commit timestamps are enabled, this is always required.
pub(crate) timestamp: Option<i64>,
[Review thread]

Collaborator: Should this be a u64? There is no chance of this being negative?

Collaborator (author): we don't impl Schema on u64 - the LONG type is i64.

Collaborator: Yeah, this is a long-standing issue in that Java doesn't have unsigned types. Somewhere back in our issue list is something about figuring this out :p

Collaborator: We may or may not need u64 support in general, but parquet timestamps are signed by definition: "In data annotated with the TIMESTAMP logical type, each value is a single int64 number."

/// An arbitrary string that identifies the operation associated with this commit. This is
/// specified by the engine. Read: optional, write: required (that is, kernel always writes).
pub(crate) operation: Option<String>,
[Review thread]

Collaborator: is this actually optional? Would a commit ever not have an associated action?

Collaborator (author): it is indeed optional according to the protocol. That is, we should never require it to be present when reading commit info.

Collaborator: Technically, the protocol has nothing to say about any of this. It just so happens that Delta-spark requires this field, along with our favorite operationParameters. It's NOT optional AFAICT? (Put another way: it's optional from the kernel-as-reader perspective, but if we want compat with Delta-spark then it's required from the kernel-as-writer perspective.)

Collaborator (author): right, to be clear I mean that it is optional because the protocol says commit info can be anything. Since we don't really have a way of communicating reader/writer optionality, I can just add a comment saying that this is optional since it is actually optional from a read/enforcement perspective, but that kernel always writes it.

/// Map of arbitrary string key-value pairs that provide additional information about the
/// operation. This is specified by the engine. For now this is always empty on write.
pub(crate) operation_parameters: Option<HashMap<String, String>>,
/// The version of the delta_kernel crate used to write this commit. The kernel will always
/// write this field, but it is optional since many tables will not have this field (i.e. any
/// tables not written by kernel).
pub(crate) kernel_version: Option<String>,
/// A place for the engine to store additional metadata associated with this commit encoded as
/// a map of strings.
pub(crate) engine_commit_info: Option<HashMap<String, String>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
@@ -427,4 +453,32 @@ mod tests {
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_commit_info_schema() {
let schema = get_log_schema()
.project(&["commitInfo"])
.expect("Couldn't get commitInfo field");

let expected = Arc::new(StructType::new(vec![StructField::new(
"commitInfo",
StructType::new(vec![
StructField::new("timestamp", DataType::LONG, true),
[Review thread]

Collaborator: So to my questions above, are you using these options as nullable fields rather than truly optional ones?

Collaborator (author): Option in schema-land is just a nullable field, so I think the answer is yes?

StructField::new("operation", DataType::STRING, true),
StructField::new(
"operationParameters",
MapType::new(DataType::STRING, DataType::STRING, false),
true,
),
StructField::new("kernelVersion", DataType::STRING, true),
StructField::new(
"engineCommitInfo",
MapType::new(DataType::STRING, DataType::STRING, false),
true,
),
]),
true,
)]));
assert_eq!(schema, expected);
}
}
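For illustration, a commitInfo action conforming to the schema tested above serializes as one JSON line at the head of a commit file. A sketch with invented values (the real write path builds this via EngineData and to_json_bytes below):

// Illustrative only: values are made up; the shape matches the schema above.
let commit_info = serde_json::json!({
    "commitInfo": {
        "timestamp": 1729800000000_i64,
        "operation": "UNKNOWN",
        "operationParameters": {},
        "kernelVersion": "v0.4.0",
        "engineCommitInfo": { "engineInfo": "default engine" }
    }
});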
37 changes: 36 additions & 1 deletion kernel/src/engine/arrow_utils.rs
@@ -16,7 +16,7 @@ use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
RecordBatch, StringArray, StructArray,
};
use arrow_json::ReaderBuilder;
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
@@ -662,6 +662,21 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
Ok(concat_batches(&schema, output.iter())?)
}

/// Serialize an iterator of EngineData batches to newline-delimited JSON bytes in a single buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
[Review thread]

Collaborator: I think there was a comment thread before recommending to rename this from write_json_bytes to to_json_bytes. But that seems to be based on current behavior rather than the final plan? (Though I guess as pub(crate) we can change it easily enough when the time comes?)

Collaborator (author): yup, I'd advocate to just keep this to_json_bytes since that's what it does, and we can rename/evolve in the future.

data: impl Iterator<Item = Box<dyn EngineData>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
writer.finish()?;
Ok(writer.into_inner())
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -1408,4 +1423,24 @@ mod tests {
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_write_json() -> DeltaResult<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"string",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let json = to_json_bytes(Box::new(std::iter::once(data)))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
);
Ok(())
}
}
27 changes: 27 additions & 0 deletions kernel/src/engine/default/json.rs
@@ -11,10 +11,12 @@ use bytes::{Buf, Bytes};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};
use url::Url;

use super::executor::TaskExecutor;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::engine::arrow_utils::to_json_bytes;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
@@ -89,6 +91,31 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
self.readahead,
)
}

// note: for now we just buffer all the data and write it out all at once
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
_overwrite: bool,
) -> DeltaResult<()> {
[Review comment on lines +95 to +101]

Collaborator (author): todo: zach needs to make an issue for follow up

let buffer = to_json_bytes(data)?;
// Put if absent
let store = self.store.clone(); // cheap Arc
let path = Path::from(path.path());
let path_str = path.to_string();
self.task_executor
.block_on(async move {
store
.put_opts(&path, buffer.into(), object_store::PutMode::Create.into())
.await
})
.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str),
e => e.into(),
})?;
Ok(())
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
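The put-if-absent behavior above is exactly object_store's PutMode::Create. A minimal standalone sketch of that primitive (assuming a recent object_store with PutPayload, plus tokio; the path and contents are invented):

use object_store::{memory::InMemory, path::Path, ObjectStore, PutMode};

// Sketch: the second PutMode::Create to the same path fails with
// AlreadyExists, which write_json_file maps to Error::FileAlreadyExists.
#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("_delta_log/00000000000000000001.json");
    store
        .put_opts(&path, b"{}".to_vec().into(), PutMode::Create.into())
        .await?;
    let err = store
        .put_opts(&path, b"{}".to_vec().into(), PutMode::Create.into())
        .await
        .unwrap_err();
    assert!(matches!(err, object_store::Error::AlreadyExists { .. }));
    Ok(())
}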
102 changes: 100 additions & 2 deletions kernel/src/engine/sync/json.rs
@@ -1,13 +1,17 @@
use std::{fs::File, io::BufReader};
use std::{fs::File, io::BufReader, io::Write};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use tempfile::NamedTempFile;
use url::Url;

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::engine::arrow_utils::to_json_bytes;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler,
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
JsonHandler,
};

pub(crate) struct SyncJsonHandler;
@@ -41,4 +45,98 @@ impl JsonHandler for SyncJsonHandler {
) -> DeltaResult<Box<dyn EngineData>> {
arrow_parse_json(json_strings, output_schema)
}

// For sync writer we write data to a tmp file then atomically rename it to the final path.
// This is highly OS-dependent and for now relies on the atomicity of tempfile's
// `persist_noclobber`.
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
_overwrite: bool,
) -> DeltaResult<()> {
let path = path
.to_file_path()
.map_err(|_| crate::Error::generic("sync client can only read local files"))?;
let Some(parent) = path.parent() else {
return Err(crate::Error::generic(format!(
"no parent found for {:?}",
path
)));
};

// write data to tmp file
let mut tmp_file = NamedTempFile::new_in(parent)?;
let buf = to_json_bytes(data)?;
tmp_file.write_all(&buf)?;
tmp_file.flush()?;

// use 'persist_noclobber' to atomically rename tmp file to final path
tmp_file
.persist_noclobber(path.clone())
.map_err(|e| match e {
tempfile::PersistError { error, .. }
if error.kind() == std::io::ErrorKind::AlreadyExists =>
{
Error::FileAlreadyExists(path.to_string_lossy().to_string())
}
e => Error::IOError(e.into()),
})?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field;
use arrow_schema::Schema as ArrowSchema;
use serde_json::json;
use url::Url;

#[test]
fn test_write_json_file() -> DeltaResult<()> {
let test_dir = tempfile::tempdir().unwrap();
let path = test_dir.path().join("00000000000000000001.json");
let handler = SyncJsonHandler;

let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"dog",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let empty: Box<dyn EngineData> =
Box::new(ArrowEngineData::new(RecordBatch::new_empty(schema)));

let url = Url::from_file_path(path.clone()).unwrap();
handler
.write_json_file(&url, Box::new(std::iter::once(data)), false)
.expect("write json file");
assert!(matches!(
handler.write_json_file(&url, Box::new(std::iter::once(empty)), false),
Err(Error::FileAlreadyExists(_))
));

let file = std::fs::read_to_string(path)?;
let json: Vec<_> = serde_json::Deserializer::from_str(&file)
.into_iter::<serde_json::Value>()
.flatten()
.collect();
assert_eq!(
json,
vec![json!({"dog": "remi"}), json!({"dog": "wilson"}),]
);

Ok(())
}
}
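The sync writer's atomicity rests on tempfile's persist_noclobber. A minimal standalone sketch of just that primitive (assuming the tempfile crate; the function name is invented):

use std::io::Write;
use std::path::Path;
use tempfile::NamedTempFile;

// Sketch: atomically create `target` with `bytes`, failing if it exists.
// Creating the temp file in the same directory keeps the rename on one
// filesystem, which is what makes persist_noclobber atomic.
fn atomic_create(target: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let parent = target.parent().expect("target must have a parent directory");
    let mut tmp = NamedTempFile::new_in(parent)?;
    tmp.write_all(bytes)?;
    tmp.flush()?;
    tmp.persist_noclobber(target).map_err(|e| e.error)?;
    Ok(())
}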
12 changes: 12 additions & 0 deletions kernel/src/error.rs
@@ -159,6 +159,18 @@ pub enum Error {
/// Unable to parse the name of a log path
#[error("Invalid log path: {0}")]
InvalidLogPath(String),

/// Invalid commit info passed to the transaction
#[error("Invalid commit info: {0}")]
InvalidCommitInfo(String),

/// Commit info was not passed to the transaction
#[error("Missing commit info")]
MissingCommitInfo,

/// The file already exists at the path, prohibiting a non-overwrite write
#[error("File already exists: {0}")]
FileAlreadyExists(String),
}

// Convenience constructors for Error types that take a String argument