[write] add Transaction with commit info and commit implementation #370
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##             main     #370      +/-   ##
==========================================
+ Coverage   78.94%   79.71%   +0.76%
==========================================
  Files          52       53       +1
  Lines       10756    11316     +560
  Branches    10756    11316     +560
==========================================
+ Hits         8491     9020     +529
  Misses       1796     1796
- Partials      469      500      +31
```
Just some style nits
Flushing comments at EOD... will try to finish the review ASAP.
kernel/src/engine/default/json.rs (outdated)
```rust
.put_opts(
    &Path::from(path.path()),
    buffer.into(),
    object_store::PutMode::Create.into(),
```
aside: Does object_store support the new S3 put-if-absent capability?
Good question! There's a recent issue, but I haven't followed up much on it yet: apache/arrow-rs#6285
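For context, here is a minimal sketch of the put-if-absent pattern the `PutMode::Create` call above relies on, for stores that implement it (the helper name and error handling are illustrative, not kernel code): the put fails with `AlreadyExists` when the object is already present, which is the behavior an atomic commit-file write needs.

```rust
use object_store::{path::Path, ObjectStore, PutMode, PutOptions};

/// Hypothetical helper: write a commit file only if it does not already
/// exist. Returns Ok(true) if we created it, Ok(false) if another writer
/// got there first.
async fn put_if_absent(
    store: &dyn ObjectStore,
    path: &Path,
    bytes: Vec<u8>,
) -> object_store::Result<bool> {
    match store
        .put_opts(path, bytes.into(), PutOptions::from(PutMode::Create))
        .await
    {
        Ok(_) => Ok(true),                                            // we won the race
        Err(object_store::Error::AlreadyExists { .. }) => Ok(false),  // someone else committed first
        Err(e) => Err(e),
    }
}
```

Whether S3 honors this (as opposed to local/Azure/GCS backends) is exactly what the arrow-rs issue above is about.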
kernel/src/engine/default/json.rs (outdated)
```rust
fn write_json_file(
    &self,
    path: &url::Url,
    data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
```
Out of curiosity, why does this need a lifetime specification?
It doesn't outlive the method call and isn't ever passed to any async code?
also: can we get away with this as a workaround?

```diff
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
+ data: impl Iterator<Item = Box<dyn EngineData>> + Send,
```
The lifetime specification is there because the trait object defaults to `'static`, and we don't want to impose that requirement. It seemed odd to me that it would default to `'static`, but I haven't taken the time to dive into it yet. Secondarily, I think using `impl Iterator` causes the trait not to be object-safe.
Common `'static` misconception strikes again :) I don't think it's much of a burden; it just means it has to be a "real owned" type. Given we're just going to iterate the data here, what's the use case for object safety?
I'm seeing a compiler error whenever we have `impl Iterator`: it breaks `JsonHandler` from being object-safe (which it must be, since we use it as a trait object in `Arc<dyn JsonHandler>`). And you're right - we didn't need the `'_` at all :)
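To illustrate the trade-off being discussed, here is a minimal, self-contained sketch with simplified stand-ins for the kernel's `EngineData`/`JsonHandler` types: an `impl Iterator` argument desugars to a generic method, and a trait with generic methods cannot be used as a trait object, while the boxed form stays object-safe (at the cost of the `'static` default discussed above).

```rust
use std::sync::Arc;

// Simplified stand-in for the real kernel type.
struct EngineData;

trait BoxedHandler {
    // Boxed iterator: the trait stays object-safe. Note the inner trait
    // object defaults to `+ 'static` unless another lifetime is written.
    fn write_json_file(&self, data: Box<dyn Iterator<Item = Box<EngineData>> + Send>);
}

trait GenericHandler {
    // `data: impl Iterator<...> + Send` desugars to a generic method like
    // this one, and traits with generic methods cannot be made into
    // trait objects.
    fn write_json_file<I: Iterator<Item = Box<EngineData>> + Send>(&self, data: I);
}

// Fine: BoxedHandler can be used as `Arc<dyn BoxedHandler>`.
fn use_boxed(_handler: Arc<dyn BoxedHandler>) {}

// Does not compile: error[E0038], `GenericHandler` cannot be made into an object.
// fn use_generic(_handler: Arc<dyn GenericHandler>) {}
```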
flushing comments
```diff
 struct CommitInfo {
     /// The time this logical file was created, as milliseconds since the epoch.
     /// TODO should this be a Timestamp?
-    pub(crate) timestamp: i64,
+    pub(crate) timestamp: Option<i64>,
```
Should this be a `u64`? There is no chance of this being negative?
We don't impl `Schema` on `u64` - the `LONG` type is `i64`.
Yeah, this is a long-standing issue in that Java doesn't have unsigned types. Somewhere back in our issue list is something about figuring this out :p
We may or may not need `u64` support in general, but Parquet timestamps are signed by definition:

> In data annotated with the TIMESTAMP logical type, each value is a single int64 number
```diff
     /// An arbitrary string that identifies the operation associated with this commit. This is
     /// specified by the engine.
-    pub(crate) operation: String,
+    pub(crate) operation: Option<String>,
```
is this actually optional? Would a commit ever not have an associated action?
it is indeed optional according to the protocol. that is, we should never require it to be present when reading commit info
Technically, the protocol has nothing to say about any of this. It just so happens that Delta-spark requires this field, along with our favorite `operationParameters`. It's NOT optional AFAICT?

(Put another way: it's optional from the kernel-as-reader perspective, but if we want compat with Delta-spark then it's required from the kernel-as-writer perspective.)
Right, to be clear: I mean that it is optional because the protocol says commit info can be anything. Since we don't really have a way of communicating reader/writer optionality, I can just add a comment saying that this field is optional from a read/enforcement perspective, but that kernel always writes it.
```diff
@@ -454,12 +461,12 @@ mod tests {
     let expected = Arc::new(StructType::new(vec![StructField::new(
         "commitInfo",
         StructType::new(vec![
-            StructField::new("timestamp", DataType::LONG, false),
-            StructField::new("operation", DataType::STRING, false),
+            StructField::new("timestamp", DataType::LONG, true),
```
So, to my questions above: are you using these as nullable fields rather than truly optional ones?
`Option` in schema-land is just a nullable field, so I think the answer is yes?
flushing some comments.
lgtm! just one thing to fix.
Overall approach looking good. Several questions, nits, and potential panic sites, tho.
```diff
@@ -662,6 +662,21 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
     Ok(concat_batches(&schema, output.iter())?)
 }
 
+/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
+// TODO (zach): this should stream data to the JSON writer and output an iterator.
+pub(crate) fn to_json_bytes(
```
I think there was a comment thread before recommending to rename this from `write_json_bytes` to `to_json_bytes`. But that seems to be based on current behavior rather than the final plan?

(Tho I guess as `pub(crate)` we can change it easily enough when the time comes?)
Yup, I'd probably advocate to just keep this as `to_json_bytes`, since that's what it does, and we can rename/evolve it in the future?
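For reference, a rough sketch of what a `to_json_bytes`-style helper can look like when built directly on `arrow-json`'s line-delimited writer. This is an assumed shape for illustration only; the kernel's actual function operates on `EngineData` rather than raw `RecordBatch`es.

```rust
use arrow_array::RecordBatch;
use arrow_json::LineDelimitedWriter;
use arrow_schema::ArrowError;

/// Serialize a sequence of Arrow RecordBatches into newline-delimited JSON
/// bytes by appending to an in-memory buffer.
fn to_json_bytes(
    batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<Vec<u8>, ArrowError> {
    let mut writer = LineDelimitedWriter::new(Vec::new());
    for batch in batches {
        writer.write(&batch)?; // append one newline-delimited JSON row per record
    }
    writer.finish()?;
    Ok(writer.into_inner())
}
```

The streaming TODO in the diff above would presumably swap the in-memory `Vec<u8>` for an incremental writer that yields chunks instead of one buffer.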
```rust
const UNKNOWN_OPERATION: &str = "UNKNOWN";

/// A transaction represents an in-progress write to a table.
pub struct Transaction {
```
How is commit info a required field, if it's passed as `Option`?
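A hypothetical sketch of the pattern this question is about, with stub types so it stands alone (not the kernel's actual definitions): the value is stored as an `Option` on the transaction, and `commit()` rejects the transaction if it was never set, so it is "required" at commit time rather than at the type level.

```rust
// Stub types so the sketch is self-contained.
trait EngineData {}

#[derive(Debug)]
enum Error {
    MissingCommitInfo,
}

struct Transaction {
    commit_info: Option<Box<dyn EngineData>>,
}

impl Transaction {
    /// Attach single-row commit info; required before committing.
    fn with_commit_info(mut self, engine_commit_info: Box<dyn EngineData>) -> Self {
        self.commit_info = Some(engine_commit_info);
        self
    }

    /// Consumes the transaction; fails if commit info was never provided.
    fn commit(self) -> Result<(), Error> {
        let _commit_info = self.commit_info.ok_or(Error::MissingCommitInfo)?;
        // ... build the commitInfo action and write the commit file here ...
        Ok(())
    }
}
```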
```rust
let timestamp: i64 = SystemTime::now()
    .duration_since(UNIX_EPOCH)
```
Timestamp manipulation code is starting to proliferate... and it's very manual/duplicated right now. Maybe it's time to start thinking about an internal API we can use for working with timestamp values?
Yup, +1, I can create a follow-up. It also makes testing hard, since we have places that actually use local machine time and we need deterministic tests.
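Not part of this PR, but as a sketch of the kind of internal timestamp API the follow-up could introduce (names made up here): a small clock trait keeps the `SystemTime` plumbing in one place and lets tests substitute a fixed value for determinism.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Source of "now" as milliseconds since the Unix epoch.
trait Clock {
    fn now_millis(&self) -> i64;
}

/// Production clock backed by the local machine time.
struct SystemClock;

impl Clock for SystemClock {
    fn now_millis(&self) -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time before the Unix epoch")
            .as_millis() as i64
    }
}

/// Fixed clock for deterministic tests.
struct FixedClock(i64);

impl Clock for FixedClock {
    fn now_millis(&self) -> i64 {
        self.0
    }
}
```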
This PR is the second (of two) major pieces for supporting simple blind appends. It implements:

1. **new `Transaction` APIs** for appending data to delta tables:
   a. `get_write_context()` to get a `WriteContext` to pass to the data path, which includes all information needed to write: `target directory`, `snapshot schema`, `transformation expression`, and (future: columns to collect stats on)
   b. `add_write_metadata(impl EngineData)` to add metadata about a write to the transaction, along with a new static method `transaction::get_write_metadata_schema` to provide the expected schema of this engine data
   c. new machinery in the 'commit' method to commit new `Add` actions for each row of write_metadata from the API above
2. **new default engine capabilities** for using the default engine to write parquet data (to append to tables):
   a. parquet handler can now `write_parquet_file(EngineData)`
   b. usage example in `write.rs` tests for now
3. **new append tests** in the `write.rs` integration test suite

Details and some follow-ups:

- the parquet writing (similar to JSON) currently just buffers everything into memory before issuing one big PUT. we should make this smarter: single PUT for small data and MultipartUpload for larger data. tracking in #418
- schema enforcement is done at the data layer. this means it is up to the engine to call the expression evaluation and we expect this to fail if the output schema is incorrect (see `test_append_invalid_schema` in the `write.rs` integration test). we may want to change this in the future to eagerly error based on the engine providing a schema up front at metadata time (transaction creation time), based on #370

resolves #390
This PR does 4 main things:

1. reorganize `transaction.rs` so that the transaction action is now moved to the actions module. EDIT: now in move `transaction` module into `actions/` and rename to `set_transaction` #386
2. `Transaction` API which includes:
   a. `Table.new_transaction()` to create a new transaction from the latest snapshot of the table
   b. `Transaction.with_commit_info(engine_commit_info: Box<dyn EngineData>)` to add single-row commit info in the form of a `map<string, string>`. required to commit.
   c. `Transaction.with_operation(operation: String)` to set the operation name of the transaction (persisted in commit info)
   d. `Transaction.commit() // consumes transaction` to commit the transaction to the log (currently only supporting committing the commit info)
3. `write_json_file(impl Iterator<Item = Box<dyn EngineData>>)` (and a default engine implementation for this)
4. `write.rs` to house many of our write tests as it's implemented

resolves #378