Fix JsonWriter and RecordBatchWriter to respect stats skipping
This updates JsonWriter and RecordBatchWriter so that they write commit log
stats in accordance with delta.dataSkippingNumIndexedCols and
delta.dataSkippingStatsColumns when those properties are present on the
table. If the properties are unset, the default behavior of collecting stats
for the first 32 columns is preserved.

Signed-off-by: Justin Jossick <[email protected]>
jjossick authored and rtyler committed Nov 23, 2024
1 parent 68d57ef commit 9014e6a
Showing 3 changed files with 268 additions and 14 deletions.
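For context, the consumer-facing behavior this change enables is sketched below. It is a condensed, untested summary of what the new tests in this commit exercise; the crate paths (deltalake::kernel, deltalake::operations::create, deltalake::writer), the tokio/serde_json setup, and the table location and schema are illustrative assumptions, not taken from this diff.

use std::collections::HashMap;

use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::operations::create::CreateBuilder;
use deltalake::writer::{DeltaWriter, JsonWriter};

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Only `id` and `value` will receive min/max/nullCount stats in the commit log;
    // if neither property is set, the first 32 columns are indexed as before.
    let config: HashMap<String, Option<String>> = HashMap::from([(
        "delta.dataSkippingStatsColumns".to_string(),
        Some("id,value".to_string()),
    )]);

    let mut table = CreateBuilder::new()
        .with_location("/tmp/stats-skipping-demo") // illustrative location
        .with_columns(vec![
            StructField::new("id", DataType::Primitive(PrimitiveType::String), true),
            StructField::new("value", DataType::Primitive(PrimitiveType::Integer), true),
            StructField::new("modified", DataType::Primitive(PrimitiveType::String), true),
        ])
        .with_configuration(config)
        .await?;

    let mut writer = JsonWriter::for_table(&table)?;
    writer
        .write(vec![serde_json::json!({
            "id": "A",
            "value": 42,
            "modified": "2021-02-01"
        })])
        .await?;
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}

With delta.dataSkippingStatsColumns set to "id,value", the Add action produced by flush_and_commit carries stats for only those two columns, matching the expected_stats strings asserted in the tests below.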
142 changes: 136 additions & 6 deletions crates/core/src/writer/json.rs
@@ -41,6 +41,8 @@ pub struct JsonWriter {
writer_properties: WriterProperties,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, DataArrowWriter>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}

/// Writes messages to an underlying arrow buffer.
@@ -187,22 +189,37 @@ impl JsonWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
-let storage = DeltaTableBuilder::from_uri(table_uri)
+let delta_table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
-.build_storage()?;
+.build()?;
// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();

// if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
|_| HashMap::new(),
|metadata| metadata.configuration.clone(),
);

Ok(Self {
-storage: storage.object_store(),
+storage: delta_table.object_store(),
arrow_schema_ref: schema,
writer_properties,
partition_columns: partition_columns.unwrap_or_default(),
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

@@ -219,13 +236,25 @@ impl JsonWriter {
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();
let configuration: HashMap<String, Option<String>> =
table.metadata()?.configuration.clone();

Ok(Self {
storage: table.object_store(),
arrow_schema_ref,
writer_properties,
partition_columns,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

@@ -372,8 +401,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
path.to_string(),
file_size,
&metadata,
-DEFAULT_NUM_INDEX_COLS,
-&None,
+self.num_indexed_cols,
+&self.stats_columns,
)?);
}
Ok(actions)
@@ -708,4 +737,105 @@ mod tests {
.collect();
assert_eq!(entries.len(), 1);
}

#[tokio::test]
async fn test_json_write_data_skipping_stats_columns() {
use crate::operations::create::CreateBuilder;

let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
let config: HashMap<String, Option<String>> = vec![(
"delta.dataSkippingStatsColumns".to_string(),
Some("id,value".to_string()),
)]
.into_iter()
.collect();
let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().cloned())
.with_configuration(config)
.await
.unwrap();
assert_eq!(table.version(), 0);
let mut writer = JsonWriter::for_table(&table).unwrap();
let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
let add_actions = table.state.unwrap().file_actions().unwrap();
assert_eq!(add_actions.len(), 1);
let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\",\"value\":42},\"maxValues\":{\"id\":\"A\",\"value\":42},\"nullCount\":{\"id\":0,\"value\":0}}";
assert_eq!(
expected_stats.parse::<serde_json::Value>().unwrap(),
add_actions
.into_iter()
.nth(0)
.unwrap()
.stats
.unwrap()
.parse::<serde_json::Value>()
.unwrap()
);
}

#[tokio::test]
async fn test_json_write_data_skipping_num_indexed_cols() {
use crate::operations::create::CreateBuilder;

let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();
let config: HashMap<String, Option<String>> = vec![(
"delta.dataSkippingNumIndexedCols".to_string(),
Some("1".to_string()),
)]
.into_iter()
.collect();
let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().cloned())
.with_configuration(config)
.await
.unwrap();
assert_eq!(table.version(), 0);
let mut writer = JsonWriter::for_table(&table).unwrap();
let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
let add_actions = table.state.unwrap().file_actions().unwrap();
assert_eq!(add_actions.len(), 1);
let expected_stats = "{\"numRecords\":1,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"A\"},\"nullCount\":{\"id\":0}}";
assert_eq!(
expected_stats.parse::<serde_json::Value>().unwrap(),
add_actions
.into_iter()
.nth(0)
.unwrap()
.stats
.unwrap()
.parse::<serde_json::Value>()
.unwrap()
);
}

}
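The constructors above populate the new num_indexed_cols and stats_columns fields with the same configuration lookup, and record_batch.rs below repeats the pattern. A standalone distillation of that lookup is sketched here for reference; the helper function is hypothetical (the real code is inlined in each constructor), and the sketch parses with .ok() where the commit unwraps the integer parse.

use std::collections::HashMap;

// 32 matches the default described in the commit message when
// delta.dataSkippingNumIndexedCols is not set on the table.
const DEFAULT_NUM_INDEX_COLS: i32 = 32;

// Hypothetical helper mirroring the lookup both writer constructors perform.
fn stats_skipping_config(
    configuration: &HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
    let num_indexed_cols = configuration
        .get("delta.dataSkippingNumIndexedCols")
        .and_then(|v| v.as_ref().and_then(|v| v.parse::<i32>().ok()))
        .unwrap_or(DEFAULT_NUM_INDEX_COLS);
    let stats_columns = configuration
        .get("delta.dataSkippingStatsColumns")
        .and_then(|v| v.as_ref().map(|v| v.split(',').map(|s| s.to_string()).collect()));
    (num_indexed_cols, stats_columns)
}

fn main() {
    let config = HashMap::from([(
        "delta.dataSkippingStatsColumns".to_string(),
        Some("id,value".to_string()),
    )]);
    let (num_indexed_cols, stats_columns) = stats_skipping_config(&config);
    // No num-indexed-cols override, so the default of 32 is used.
    assert_eq!(num_indexed_cols, 32);
    assert_eq!(
        stats_columns,
        Some(vec!["id".to_string(), "value".to_string()])
    );
}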
138 changes: 131 additions & 7 deletions crates/core/src/writer/record_batch.rs
@@ -44,6 +44,8 @@ pub struct RecordBatchWriter {
should_evolve: bool,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, PartitionWriter>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}

impl std::fmt::Debug for RecordBatchWriter {
@@ -60,25 +62,39 @@ impl RecordBatchWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
-let storage = DeltaTableBuilder::from_uri(table_uri)
+let delta_table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
-.build_storage()?
-.object_store();
+.build()?;
// Initialize writer properties for the underlying arrow writer
let writer_properties = WriterProperties::builder()
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();

// if metadata fails to load, use an empty hashmap and default values for num_indexed_cols and stats_columns
let configuration: HashMap<String, Option<String>> = delta_table.metadata().map_or_else(
|_| HashMap::new(),
|metadata| metadata.configuration.clone(),
);

Ok(Self {
-storage,
+storage: delta_table.object_store(),
arrow_schema_ref: schema.clone(),
original_schema_ref: schema,
writer_properties,
partition_columns: partition_columns.unwrap_or_default(),
should_evolve: false,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

@@ -96,6 +112,8 @@ impl RecordBatchWriter {
// NOTE: Consider extracting config for writer properties and setting more than just compression
.set_compression(Compression::SNAPPY)
.build();
let configuration: HashMap<String, Option<String>> =
table.metadata()?.configuration.clone();

Ok(Self {
storage: table.object_store(),
@@ -105,6 +123,16 @@
partition_columns,
should_evolve: false,
arrow_writers: HashMap::new(),
num_indexed_cols: configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| {
v.as_ref()
.map(|v| v.split(',').map(|s| s.to_string()).collect())
}),
})
}

@@ -233,8 +261,8 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
path.to_string(),
file_size,
&metadata,
-DEFAULT_NUM_INDEX_COLS,
-&None,
+self.num_indexed_cols,
+&self.stats_columns,
)?);
}
Ok(actions)
@@ -985,4 +1013,100 @@ mod tests {
);
}
}

#[tokio::test]
async fn test_write_data_skipping_stats_columns() {
let batch = get_record_batch(None, false);
let partition_cols: &[String] = &vec![];
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();
let config: HashMap<String, Option<String>> = vec![(
"delta.dataSkippingStatsColumns".to_string(),
Some("id,value".to_string()),
)]
.into_iter()
.collect();

let mut table = CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().cloned())
.with_configuration(config)
.with_partition_columns(partition_cols)
.await
.unwrap();

let mut writer = RecordBatchWriter::for_table(&table).unwrap();
let partitions = writer.divide_by_partition_values(&batch).unwrap();

assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].record_batch, batch);
writer.write(batch).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
let add_actions = table.state.unwrap().file_actions().unwrap();
assert_eq!(add_actions.len(), 1);
let expected_stats ="{\"numRecords\":11,\"minValues\":{\"value\":1,\"id\":\"A\"},\"maxValues\":{\"id\":\"B\",\"value\":11},\"nullCount\":{\"id\":0,\"value\":0}}";
assert_eq!(
expected_stats.parse::<serde_json::Value>().unwrap(),
add_actions
.into_iter()
.nth(0)
.unwrap()
.stats
.unwrap()
.parse::<serde_json::Value>()
.unwrap()
);
}

#[tokio::test]
async fn test_write_data_skipping_num_indexed_cols() {
let batch = get_record_batch(None, false);
let partition_cols: &[String] = &vec![];
let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();
let config: HashMap<String, Option<String>> = vec![(
"delta.dataSkippingNumIndexedCols".to_string(),
Some("1".to_string()),
)]
.into_iter()
.collect();

let mut table = CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().cloned())
.with_configuration(config)
.with_partition_columns(partition_cols)
.await
.unwrap();

let mut writer = RecordBatchWriter::for_table(&table).unwrap();
let partitions = writer.divide_by_partition_values(&batch).unwrap();

assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].record_batch, batch);
writer.write(batch).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
let add_actions = table.state.unwrap().file_actions().unwrap();
assert_eq!(add_actions.len(), 1);
let expected_stats = "{\"numRecords\":11,\"minValues\":{\"id\":\"A\"},\"maxValues\":{\"id\":\"B\"},\"nullCount\":{\"id\":0}}";
assert_eq!(
expected_stats.parse::<serde_json::Value>().unwrap(),
add_actions
.into_iter()
.nth(0)
.unwrap()
.stats
.unwrap()
.parse::<serde_json::Value>()
.unwrap()
);
}
}
2 changes: 1 addition & 1 deletion crates/core/src/writer/test_utils.rs
@@ -291,7 +291,7 @@ pub fn create_bare_table() -> DeltaTable {
}

pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
-let table_schema = get_delta_schema();
+let table_schema: StructType = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

