chore: refactor JsonWriter to utilize the DeltaTable configuration more
The JsonWriter was created before a lot of other code and was in need of
a little refactor. The writer does not commit to the Delta table on its
own, which can be a benefit for some performance-specific use cases.
This change does, however, enforce that the writer be initialized with a
valid Delta table path, which ensures it can use the table configuration
properly.
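
Illustrative only: a minimal sketch of the new constructor, assuming the
`deltalake` crate re-exports (`deltalake::writer::{DeltaWriter, JsonWriter}`
and `deltalake::arrow`) and a pre-existing table at `table_uri`; the column
and partition names are made up for the example.

    use std::sync::Arc;

    use deltalake::arrow::datatypes::Schema as ArrowSchema;
    use deltalake::writer::{DeltaWriter, JsonWriter};
    use deltalake::DeltaTableError;

    async fn write_example(
        table_uri: String,
        schema: Arc<ArrowSchema>,
    ) -> Result<(), DeltaTableError> {
        // try_new is now async: it loads the table at `table_uri`, which must
        // already be a valid Delta table, so the writer can read its config.
        let mut writer = JsonWriter::try_new(
            table_uri,
            schema,
            Some(vec!["modified".to_string()]), // partition columns
            None,                               // storage options
        )
        .await?;

        // Buffer a row; `write` takes Vec<serde_json::Value>.
        writer
            .write(vec![serde_json::json!({
                "id": "A",
                "value": 42,
                "modified": "2024-11-23",
            })])
            .await?;

        // flush() uploads the parquet files and returns the Add actions, but
        // it does not commit them; the commit is left to the caller.
        let adds = writer.flush().await?;
        println!("flushed {} file(s)", adds.len());
        Ok(())
    }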

Signed-off-by: R. Tyler Croy <[email protected]>
rtyler committed Nov 23, 2024
1 parent 9014e6a commit d82b9b3
Showing 3 changed files with 84 additions and 96 deletions.
1 change: 1 addition & 0 deletions crates/core/src/table/config.rs
@@ -207,6 +207,7 @@ macro_rules! table_config {
}

/// Well known delta table configuration
#[derive(Debug)]
pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);

/// Default num index cols
171 changes: 79 additions & 92 deletions crates/core/src/writer/json.rs
@@ -9,7 +9,6 @@ use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::{
arrow::ArrowWriter, basic::Compression, errors::ParquetError,
file::properties::WriterProperties,
@@ -26,26 +25,26 @@ use super::utils::{
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{scalars::ScalarExt, Add, PartitionsExt, StructType};
use crate::storage::ObjectStoreRetryExt;
use crate::storage::retry_ext::ObjectStoreRetryExt;
use crate::table::builder::DeltaTableBuilder;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::writer::utils::ShareableBuffer;
use crate::DeltaTable;

type BadValue = (Value, ParquetError);

/// Writes messages to a delta lake table.
#[derive(Debug)]
pub struct JsonWriter {
storage: Arc<dyn ObjectStore>,
arrow_schema_ref: Arc<arrow_schema::Schema>,
table: DeltaTable,
/// Optional schema to use, otherwise try to rely on the schema from the [DeltaTable]
schema_ref: Option<ArrowSchemaRef>,
writer_properties: WriterProperties,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, DataArrowWriter>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}

/// Writes messages to an underlying arrow buffer.
#[derive(Debug)]
pub(crate) struct DataArrowWriter {
arrow_schema: Arc<ArrowSchema>,
writer_properties: WriterProperties,
@@ -183,78 +182,49 @@ impl DataArrowWriter {

impl JsonWriter {
/// Create a new JsonWriter instance
pub fn try_new(
pub async fn try_new(
table_uri: String,
schema: ArrowSchemaRef,
schema_ref: ArrowSchemaRef,
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
let delta_table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build()?;
.load()
.await?;
// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();

// if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
|_| HashMap::new(),
|metadata| metadata.configuration.clone(),
);

Ok(Self {
storage: delta_table.object_store(),
arrow_schema_ref: schema,
table,
schema_ref: Some(schema_ref),
writer_properties,
partition_columns: partition_columns.unwrap_or_default(),
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

/// Creates a JsonWriter to write to the given table
pub fn for_table(table: &DeltaTable) -> Result<JsonWriter, DeltaTableError> {
// Initialize an arrow schema ref from the delta table schema
let metadata = table.metadata()?;
let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&metadata.schema()?)?;
let arrow_schema_ref = Arc::new(arrow_schema);
let partition_columns = metadata.partition_columns.clone();

// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();
let configuration: HashMap<String, Option<String>> =
table.metadata()?.configuration.clone();

Ok(Self {
storage: table.object_store(),
arrow_schema_ref,
table: table.clone(),
writer_properties,
partition_columns,
schema_ref: None,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

@@ -277,10 +247,20 @@ impl JsonWriter {
self.arrow_writers.clear();
}

/// Returns the arrow schema representation of the delta table schema defined for the wrapped
/// Returns the user-defined arrow schema representation or the schema defined for the wrapped
/// table.
///
pub fn arrow_schema(&self) -> Arc<arrow::datatypes::Schema> {
self.arrow_schema_ref.clone()
if let Some(schema_ref) = self.schema_ref.as_ref() {
return schema_ref.clone();
}
let schema = self
.table
.schema()
.expect("Failed to unwrap schema for table");
<ArrowSchema as TryFrom<&StructType>>::try_from(schema)
.expect("Failed to coerce delta schema to arrow")
.into()
}

fn divide_by_partition_values(
Expand Down Expand Up @@ -378,7 +358,11 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
Ok(())
}

/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
/// Writes the existing parquet bytes to storage and resets internal state to handle another
/// file.
///
/// This function returns the [Add] actions which should be committed to the [DeltaTable] for
/// the written data files
async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError> {
let writers = std::mem::take(&mut self.arrow_writers);
let mut actions = Vec::new();
@@ -392,17 +376,20 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage
self.table
.object_store()
.put_with_retries(&path, obj_bytes.into(), 15)
.await?;

let table_config = self.table.snapshot()?.table_config();

actions.push(create_add(
&writer.partition_values,
path.to_string(),
file_size,
&metadata,
self.num_indexed_cols,
&self.stats_columns,
table_config.num_indexed_cols(),
&table_config.stats_columns(),
)?);
}
Ok(actions)
@@ -464,35 +451,49 @@ fn extract_partition_values(

#[cfg(test)]
mod tests {
use super::*;

use arrow_schema::ArrowError;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use std::fs::File;
use std::sync::Arc;

use super::*;
use crate::arrow::array::Int32Array;
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
use crate::kernel::DataType;
use crate::operations::create::CreateBuilder;
use crate::writer::test_utils::get_delta_schema;
use crate::writer::DeltaWriter;
use crate::writer::JsonWriter;

#[tokio::test]
async fn test_partition_not_written_to_parquet() {
let table_dir = tempfile::tempdir().unwrap();
/// Generate a simple test table which has been pre-created at version 0
async fn get_test_table(table_dir: &tempfile::TempDir) -> DeltaTable {
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().cloned())
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);
table
}

#[tokio::test]
async fn test_partition_not_written_to_parquet() {
let table_dir = tempfile::tempdir().unwrap();
let table = get_test_table(&table_dir).await;
let schema = table.schema().unwrap();
let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
table.table_uri(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.await
.unwrap();

let data = serde_json::json!(
@@ -564,16 +565,17 @@ mod tests {
#[tokio::test]
async fn test_parsing_error() {
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
let table = get_test_table(&table_dir).await;

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let arrow_schema =
<ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
table.table_uri(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.await
.unwrap();

let data = serde_json::json!(
@@ -601,16 +603,17 @@ mod tests {
#[tokio::test]
async fn test_json_write_mismatched_values() {
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
let table = get_test_table(&table_dir).await;

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let arrow_schema =
<ArrowSchema as TryFrom<&StructType>>::try_from(table.schema().unwrap()).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
table.table_uri(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.await
.unwrap();

let data = serde_json::json!(
Expand Down Expand Up @@ -639,28 +642,18 @@ mod tests {

#[tokio::test]
async fn test_json_write_mismatched_schema() {
use crate::operations::create::CreateBuilder;
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().cloned())
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);
let mut table = get_test_table(&table_dir).await;

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let schema = table.schema().unwrap();
let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
table.table_uri(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.await
.unwrap();

let data = serde_json::json!(
@@ -691,7 +684,6 @@ mod tests {

#[tokio::test]
async fn test_json_write_checkpoint() {
use crate::operations::create::CreateBuilder;
use std::fs;

let table_dir = tempfile::tempdir().unwrap();
@@ -740,8 +732,6 @@ mod tests {

#[tokio::test]
async fn test_json_write_data_skipping_stats_columns() {
use crate::operations::create::CreateBuilder;

let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
@@ -790,8 +780,6 @@ mod tests {

#[tokio::test]
async fn test_json_write_data_skipping_num_indexed_cols() {
use crate::operations::create::CreateBuilder;

let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
@@ -837,5 +825,4 @@ mod tests {
.unwrap()
);
}

}
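
As the new doc comment on `flush` notes, the returned Add actions still have
to be committed by the caller. A sketch of that hand-off, assuming the
`DeltaWriter` trait's `flush_and_commit` helper and the
`deltalake::open_table` re-export; it flushes any buffered files and commits
the resulting Add actions, returning the new table version:

    use deltalake::writer::{DeltaWriter, JsonWriter};
    use deltalake::{open_table, DeltaTableError};

    async fn commit_example(
        writer: &mut JsonWriter,
        table_uri: &str,
    ) -> Result<i64, DeltaTableError> {
        // Open a handle to the same table and let the trait helper flush the
        // buffered files and commit their Add actions to the Delta log.
        let mut table = open_table(table_uri).await?;
        writer.flush_and_commit(&mut table).await
    }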