make object-store file layout configurable
JanKaul committed Oct 3, 2024
1 parent d72e557 commit 1e7417d
Showing 3 changed files with 51 additions and 21 deletions.
6 changes: 6 additions & 0 deletions iceberg-rust-spec/src/spec/table_metadata.rs
@@ -33,6 +33,12 @@ pub static MAIN_BRANCH: &str = "main";
static DEFAULT_SORT_ORDER_ID: i32 = 0;
static DEFAULT_SPEC_ID: i32 = 0;

// Properties

pub const WRITE_PARQUET_COMPRESSION_CODEC: &str = "write.parquet.compression-codec";
pub const WRITE_PARQUET_COMPRESSION_LEVEL: &str = "write.parquet.compression-level";
pub const WRITE_OBJECT_STORAGE_ENABLED: &str = "write.object-storage.enabled";

pub use _serde::{TableMetadataV1, TableMetadataV2};

use _serde::TableMetadataEnum;
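
These constants replace string literals that were previously written out inline (see create.rs below). As a minimal sketch of how the new flag is consumed (assuming a TableMetadata value named metadata, exactly as in the write path below):

    let object_storage_enabled = metadata
        .properties
        .get(WRITE_OBJECT_STORAGE_ENABLED)
        .is_some_and(|x| x == "true");
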
57 changes: 38 additions & 19 deletions iceberg-rust/src/arrow/write.rs
@@ -21,6 +21,7 @@ use iceberg_rust_spec::{
manifest::DataFile, partition::PartitionSpec, schema::Schema,
table_metadata::TableMetadata, values::Value,
},
table_metadata::WRITE_OBJECT_STORAGE_ENABLED,
util::strip_prefix,
};
use parquet::{
@@ -53,12 +54,21 @@ pub async fn write_parquet_partitioned(
let (mut sender, reciever) = unbounded();

if partition_spec.fields().is_empty() {
let partition_path = if metadata
.properties
.get(WRITE_OBJECT_STORAGE_ENABLED)
.is_some_and(|x| x == "true")
{
Some("".to_owned())
} else {
None
};
let files = write_parquet_files(
data_location,
schema,
&arrow_schema,
partition_spec,
&Vec::new(),
partition_path,
batches,
object_store.clone(),
)
@@ -74,12 +84,21 @@
let object_store = object_store.clone();
let mut sender = sender.clone();
async move {
let partition_path = if metadata
.properties
.get(WRITE_OBJECT_STORAGE_ENABLED)
.is_some_and(|x| x == "true")
{
Some(generate_partition_path(partition_spec, &partition_values)?)
} else {
None
};
let files = write_parquet_files(
data_location,
schema,
&arrow_schema,
partition_spec,
&partition_values,
partition_path,
batches,
object_store.clone(),
)
@@ -111,15 +130,15 @@ async fn write_parquet_files(
schema: &Schema,
arrow_schema: &ArrowSchema,
partition_spec: &PartitionSpec,
partiton_values: &[Value],
partition_path: Option<String>,
batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<DataFile>, ArrowError> {
let bucket = Bucket::from_path(data_location)?;
let current_writer = Arc::new(Mutex::new(
create_arrow_writer(
data_location,
&generate_partition_path(partition_spec, partiton_values)?,
partition_path.clone(),
arrow_schema,
object_store.clone(),
)
@@ -136,6 +155,7 @@
let mut writer_sender = writer_sender.clone();
let num_bytes = num_bytes.clone();
let object_store = object_store.clone();
let partition_path = partition_path.clone();
async move {
let mut current_writer = current_writer.lock().await;
let current =
@@ -151,7 +171,7 @@
&mut *current_writer,
create_arrow_writer(
data_location,
&generate_partition_path(partition_spec, partiton_values)?,
partition_path,
arrow_schema,
object_store,
)
@@ -218,7 +238,7 @@ fn generate_partition_path(

async fn create_arrow_writer(
data_location: &str,
partition_path: &str,
partition_path: Option<String>,
schema: &arrow::datatypes::Schema,
object_store: Arc<dyn ObjectStore>,
) -> Result<(String, AsyncArrowWriter<BufWriter>), ArrowError> {
@@ -227,19 +247,18 @@ async fn create_arrow_writer(
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.unwrap();

let prefix = rand[0..3]
.iter()
.fold(String::with_capacity(8), |mut acc, x| {
write!(&mut acc, "{:02x}", x).unwrap();
acc
});

let parquet_path = strip_prefix(data_location)
+ &prefix
+ "/"
+ partition_path
+ &Uuid::now_v1(&rand).to_string()
+ ".parquet";
let path = partition_path.unwrap_or_else(|| {
rand[0..3]
.iter()
.fold(String::with_capacity(8), |mut acc, x| {
write!(&mut acc, "{:x}", x).unwrap();
acc
})
+ "/"
});

let parquet_path =
strip_prefix(data_location) + &path + &Uuid::now_v1(&rand).to_string() + ".parquet";

let writer = BufWriter::new(object_store.clone(), parquet_path.clone().into());

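Net effect of the write.rs changes: the file layout now depends on whether a partition path was supplied. With write.object-storage.enabled set to "true", files are written under their (possibly empty) partition path; otherwise create_arrow_writer falls back to a short random hex prefix that spreads files across key prefixes. A sketch of that fallback, lifted from the code above (note that "{:x}" yields one or two characters per byte, unlike the fixed-width "{:02x}" of the removed code; the key shapes in the comments are illustrative):

    use std::fmt::Write;

    // Fallback prefix: the first three random bytes rendered as
    // lowercase hex, plus a trailing slash.
    fn random_prefix(rand: &[u8]) -> String {
        rand[0..3]
            .iter()
            .fold(String::with_capacity(8), |mut acc, x| {
                write!(&mut acc, "{:x}", x).unwrap();
                acc
            })
            + "/"
    }

    // Resulting keys (illustrative):
    //   property "true":     <data_location><partition_path><uuid>.parquet
    //   absent or "false":   <data_location><hex_prefix><uuid>.parquet
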
9 changes: 7 additions & 2 deletions iceberg-rust/src/catalog/create.rs
@@ -17,6 +17,10 @@ use iceberg_rust_spec::{
table_metadata::TableMetadata,
view_metadata::{Version, ViewMetadata, DEFAULT_VERSION_ID},
},
table_metadata::{
WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
WRITE_PARQUET_COMPRESSION_LEVEL,
},
view_metadata::Materialization,
};
use serde::{Deserialize, Serialize};
@@ -78,10 +82,11 @@ impl CreateTableBuilder {

let create = self
.with_property((
"write.parquet.compression-codec".to_owned(),
WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
"zstd".to_owned(),
))
.with_property(("write.parquet.compression-level".to_owned(), 1.to_string()))
.with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 1.to_string()))
.with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "true".to_owned()))
.create()?;

// Register table in catalog
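
Because the defaults are now expressed through the shared constants, a caller can still override them per table through the same builder API. A hypothetical call site (builder setup elided; only with_property and create appear in this commit):

    // Opt a table out of the object-storage layout at creation time.
    let create = builder
        .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()))
        .create()?;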
