Skip to content

Commit

Permalink
better
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 4, 2024
1 parent 740d112 commit f1dcda9
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 133 deletions.
1 change: 0 additions & 1 deletion kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
match meta {
Ok(meta) => {
let mut location = url.clone();
println!("listed location: {:?}", meta.location);
location.set_path(&format!("/{}", meta.location.as_ref()));
sender
.send(Ok(FileMeta {
Expand Down
13 changes: 1 addition & 12 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + 'a>,
_overwrite: bool,
) -> DeltaResult<()> {
// Initialize schema and batches
let mut schema: Option<ArrowSchemaRef> = None;
let mut batches: Vec<RecordBatch> = Vec::new();

Expand All @@ -155,29 +154,19 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
schema = Some(record_batch.schema());
}

println!("[Engine put_json] schema: {:#?}", record_batch.schema());
batches.push(record_batch.clone());
}

for batch in batches.iter() {
println!("===============================================================");
println!("[Engine put_json] batch: {:#?}", batch);
println!("[Engine put_json] schema: {:#?}", batch.schema());
println!("===============================================================");
}

// collect all batches
let batches: Vec<&RecordBatch> = batches.iter().collect();

// Write the concatenated batch to JSON
// write the batches to JSON
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
writer.write_batches(&batches)?;
writer.finish()?;

let buffer = writer.into_inner();

println!("[Engine put_json] commit path: {:?}", path.path());

// Put if absent
futures::executor::block_on(async {
self.store
Expand Down
2 changes: 0 additions & 2 deletions kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
{
// table root is the path of the table in the ObjectStore
let (store, table_root) = parse_url_opts(table_root, options)?;
println!("DEFAULT ENGINE INIT try_new table root: {:?}", table_root);
let store = Arc::new(store);
Ok(Self {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
Expand All @@ -79,7 +78,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
/// - `table_root_path`: The root path of the table within storage.
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
pub fn new(store: Arc<DynObjectStore>, table_root_path: Path, task_executor: Arc<E>) -> Self {
println!("DEFAULT ENGINE INIT new table root: {:?}", table_root_path);
Self {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Table {
Snapshot::try_new(self.location.clone(), engine, version)
}

/// Create a new write transaction builder for the table.
/// Create a new write transaction for this table.
pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult<Transaction> {
let latest_snapshot = Snapshot::try_new(self.location.clone(), engine, None)?;
Ok(Transaction::new(latest_snapshot))
Expand Down
226 changes: 109 additions & 117 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ use std::sync::Arc;

use crate::actions::get_log_schema;
use crate::expressions::Scalar;
use crate::schema::{Schema, StructType};
use crate::schema::{Schema, SchemaRef, StructType};
use crate::snapshot::Snapshot;
use crate::{DataType, Expression};
use crate::{DeltaResult, Engine, EngineData};

pub struct Transaction {
read_snapshot: Arc<Snapshot>,
commit_info: Option<CommitInfoData>,
commit_info: Option<EngineCommitInfo>,
}

pub struct CommitInfoData {
struct EngineCommitInfo {
data: Box<dyn EngineData>,
schema: Schema,
}
Expand All @@ -36,120 +36,11 @@ impl Transaction {
}

pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
use crate::actions::{
ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME,
};

// step one: construct the iterator of actions we want to commit
let action_schema = get_log_schema();

let actions = self.commit_info.into_iter().map(|commit_info| {
// expression to select all the columns
let mut commit_info_expr = vec![Expression::literal("v0.3.1")];
commit_info_expr.extend(
commit_info
.schema
.fields()
.map(|f| Expression::column(f.name()))
.collect::<Vec<_>>(),
);
let commit_info_expr = Expression::Struct(vec![
Expression::Literal(Scalar::Null(
action_schema.field(ADD_NAME).unwrap().data_type().clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(REMOVE_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(METADATA_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(PROTOCOL_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(TRANSACTION_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Struct(commit_info_expr),
]);

// add the commit info fields to the action schema.
// e.g. if engine's commit info is {engineInfo: string, operation: string}
// then the 'commit_info' field in the actions will be:
// {kernelVersion: string, engineInfo: string, operation: string}
// let action_fields = action_schema
// .project_as_struct(&[
// ADD_NAME,
// REMOVE_NAME,
// METADATA_NAME,
// PROTOCOL_NAME,
// TRANSACTION_NAME,
// ])
// .unwrap()
// .fields();
// let kernel_commit_info_fields = action_schema
// .project_as_struct(&[COMMIT_INFO_NAME])
// .unwrap()
// .fields();
// let engine_commit_info_fields = commit_info.schema.fields();
// let commit_info_fields = kernel_commit_info_fields.chain(engine_commit_info_fields);
// let action_schema = StructType::new(
// std::iter::once(commit_info_fields)
// .chain(action_fields)
// .collect::Vec<_>()
// );

let mut action_fields = action_schema.fields().collect::<Vec<_>>();
let commit_info_field = action_fields.pop().unwrap();
let mut commit_info_fields =
if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() {
commit_info_schema.fields().collect::<Vec<_>>()
} else {
unreachable!()
};
commit_info_fields.extend(commit_info.schema.fields());
let commit_info_schema =
StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect());
let mut action_fields = action_fields
.into_iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
action_fields.push(crate::schema::StructField::new(
COMMIT_INFO_NAME,
commit_info_schema,
true,
));
let action_schema = StructType::new(action_fields);

println!("commit_info schema: {:#?}", commit_info.schema);
println!("action_schema: {:#?}", action_schema);

// commit info has arbitrary schema ex: {engineInfo: string, operation: string}
// we want to bundle it up and put it in the commit_info field of the actions.
let commit_info_evaluator = engine.get_expression_handler().get_evaluator(
commit_info.schema.into(),
commit_info_expr,
action_schema.into(),
);
commit_info_evaluator
.evaluate(commit_info.data.as_ref())
.unwrap()
});
//
// TODO: enforce single row commit info
// TODO: for now we always require commit info
let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info)?;

// step two: figure out the commit version and path to write
let commit_version = &self.read_snapshot.version() + 1;
Expand All @@ -171,7 +62,7 @@ impl Transaction {
/// first action in the commit. Note it is required in order to commit. If the engine does not
/// require any commit info, pass an empty `EngineData`.
pub fn commit_info(&mut self, commit_info: Box<dyn EngineData>, schema: Schema) {
self.commit_info = Some(CommitInfoData {
self.commit_info = Some(EngineCommitInfo {
data: commit_info,
schema,
});
Expand All @@ -181,3 +72,104 @@ impl Transaction {
/// Outcome of committing a transaction; this is the success type returned by
/// `Transaction::commit`.
pub enum CommitResult {
/// The commit went through. NOTE(review): the payload is presumably the table version that
/// was written by this commit — confirm against `Transaction::commit`.
Committed(crate::Version),
}

fn generate_commit_info<'a>(
engine: &'a dyn Engine,
engine_commit_info: Option<EngineCommitInfo>,
) -> DeltaResult<(
Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + 'a>,
SchemaRef,
)> {
use crate::actions::{
ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME,
};

let action_schema =
Arc::new(engine_commit_info
.as_ref()
.map_or(get_log_schema().clone(), |commit_info| {
let mut action_fields = get_log_schema().fields().collect::<Vec<_>>();
let commit_info_field = action_fields.pop().unwrap();
let mut commit_info_fields =
if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() {
commit_info_schema.fields().collect::<Vec<_>>()
} else {
unreachable!()
};
commit_info_fields.extend(commit_info.schema.fields());
let commit_info_schema =
StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect());
let mut action_fields = action_fields
.into_iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
action_fields.push(crate::schema::StructField::new(
COMMIT_INFO_NAME,
commit_info_schema,
true,
));
StructType::new(action_fields)
}));

let action_schema_ref = Arc::clone(&action_schema);
let actions = engine_commit_info.into_iter().map(move |commit_info| {
// TODO RENAME
let engine_commit_info_data = commit_info.data;
let engine_commit_info_schema = commit_info.schema;
// expression to select all the columns
let mut commit_info_expr = vec![Expression::literal("v0.3.1")];
commit_info_expr.extend(
engine_commit_info_schema
.fields()
.map(|f| Expression::column(f.name()))
.collect::<Vec<_>>(),
);
let commit_info_expr = Expression::Struct(vec![
Expression::Literal(Scalar::Null(
action_schema_ref.field(ADD_NAME).unwrap().data_type().clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(REMOVE_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(METADATA_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(PROTOCOL_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(TRANSACTION_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Struct(commit_info_expr),
]);

// commit info has arbitrary schema ex: {engineInfo: string, operation: string}
// we want to bundle it up and put it in the commit_info field of the actions.
let commit_info_evaluator = engine.get_expression_handler().get_evaluator(
engine_commit_info_schema.into(),
commit_info_expr,
action_schema_ref.clone().into(),
);

commit_info_evaluator
.evaluate(engine_commit_info_data.as_ref())
.unwrap()
});
Ok((Box::new(actions), action_schema))
}

0 comments on commit f1dcda9

Please sign in to comment.