Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve append #46

Merged
merged 33 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
95e59d4
find manifest
JanKaul Oct 21, 2024
79a2b92
fix compiler errors
JanKaul Oct 23, 2024
145c5bc
fix merge issues
JanKaul Oct 23, 2024
78f588f
improve rewrite operation
JanKaul Oct 24, 2024
24ab564
fix clippy warnings
JanKaul Oct 24, 2024
2f8b7d2
fix out of bound bug in sub
JanKaul Oct 24, 2024
ec1aefc
fix_cmp _dist bug
JanKaul Oct 24, 2024
9a57a75
add tests for contains
JanKaul Oct 24, 2024
71c3e93
tests for cmp_with_priority
JanKaul Oct 24, 2024
3b14ef9
expand with node test
JanKaul Oct 24, 2024
b09e813
test expand rectangle
JanKaul Oct 24, 2024
db15ad8
remove generic from rectangle
JanKaul Oct 24, 2024
7725fde
fix partition column type
JanKaul Oct 25, 2024
037b982
fix public function
JanKaul Oct 25, 2024
98d497d
fix identity
JanKaul Oct 25, 2024
5f07a9a
rename struct_to_smallvec
JanKaul Oct 25, 2024
023d547
add documentation
JanKaul Oct 25, 2024
e050686
implement sub for string
JanKaul Oct 25, 2024
dc7a29d
clippy fixes
JanKaul Oct 25, 2024
e715e2c
add documentation
JanKaul Oct 27, 2024
49cb036
expand documentation
JanKaul Oct 28, 2024
12af6d7
remove public from split_datafiles_once
JanKaul Oct 29, 2024
76de2f8
use normal vec for split_datafiles
JanKaul Oct 29, 2024
93b768d
remove tryadd
JanKaul Oct 29, 2024
e677ec6
implement trysub for uuid and fixed
JanKaul Oct 29, 2024
97927a5
add assert for splitting manifest
JanKaul Oct 29, 2024
b5ca169
select manifest
JanKaul Oct 29, 2024
7b52090
create new manifest-writer
JanKaul Oct 30, 2024
78596fa
refactor manifest-writer
JanKaul Oct 30, 2024
4cc2bff
fix clippy warnings
JanKaul Oct 30, 2024
f3532f9
Refactor and fix select_manifest
rdettai Oct 31, 2024
12de20a
Refactor n_split computation
rdettai Oct 31, 2024
7861b35
Merge pull request #47 from rdettai/improve-append
JanKaul Oct 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions iceberg-file-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Catalog for FileCatalog {
.map_err(IcebergError::from)
.map_ok(|x| {
let path = x.location.as_ref();
self.identifier(&path)
self.identifier(path)
})
.try_collect()
.await
Expand Down Expand Up @@ -501,15 +501,15 @@ impl Catalog for FileCatalog {

impl FileCatalog {
fn namespace_path(&self, namespace: &str) -> String {
self.path.as_str().trim_end_matches('/').to_owned() + "/" + &self.name + "/" + &namespace
self.path.as_str().trim_end_matches('/').to_owned() + "/" + &self.name + "/" + namespace
}

fn tabular_path(&self, namespace: &str, name: &str) -> String {
self.path.as_str().trim_end_matches('/').to_owned()
+ "/"
+ &self.name
+ "/"
+ &namespace
+ namespace
+ "/"
+ name
}
Expand All @@ -523,7 +523,7 @@ impl FileCatalog {
.try_filter(|x| {
future::ready(
x.ends_with("metadata.json")
&& x.starts_with(&(path.clone() + "/v").trim_start_matches('/')),
&& x.starts_with((path.clone() + "/v").trim_start_matches('/')),
)
})
.try_collect()
Expand Down Expand Up @@ -551,9 +551,7 @@ impl FileCatalog {
let parts = path
.trim_start_matches(self.path.trim_start_matches('/'))
.trim_start_matches('/')
.split('/')
.skip(1)
.next()
.split('/').nth(1)
.ok_or(IcebergError::InvalidFormat("Namespace in path".to_owned()))?
.to_owned();
Namespace::try_new(&[parts]).map_err(IcebergError::from)
Expand Down
2 changes: 2 additions & 0 deletions iceberg-rust-spec/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ pub struct ManifestEntry {
status: Status,
/// Snapshot id where the file was added, or deleted if status is 2.
/// Inherited when null.
#[builder(setter(strip_option), default)]
snapshot_id: Option<i64>,
/// Sequence number when the file was added. Inherited when null.
#[builder(setter(strip_option), default)]
sequence_number: Option<i64>,
/// File path, partition tuple, metrics, …
data_file: DataFile,
Expand Down
6 changes: 3 additions & 3 deletions iceberg-rust-spec/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<'a, 'metadata, R: Read> ManifestListReader<'a, 'metadata, R> {
FormatVersion::V2 => manifest_list_schema_v2(),
};
Ok(Self {
reader: AvroReader::with_schema(&schema, reader)?
reader: AvroReader::with_schema(schema, reader)?
.zip(repeat(table_metadata))
.map(avro_value_to_manifest_file),
})
Expand Down Expand Up @@ -781,7 +781,7 @@ mod tests {

let schema = manifest_list_schema_v2();

let mut writer = apache_avro::Writer::new(&schema, Vec::new());
let mut writer = apache_avro::Writer::new(schema, Vec::new());

writer.append_ser(manifest_file.clone()).unwrap();

Expand Down Expand Up @@ -861,7 +861,7 @@ mod tests {

let schema = manifest_list_schema_v1();

let mut writer = apache_avro::Writer::new(&schema, Vec::new());
let mut writer = apache_avro::Writer::new(schema, Vec::new());

writer.append_ser(manifest_file.clone()).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion iceberg-rust-spec/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ pub struct SnapshotLog {
pub timestamp_ms: i64,
}

#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
/// Iceberg format version
#[derive(Default)]
Expand Down
62 changes: 61 additions & 1 deletion iceberg-rust-spec/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use std::{
any::Any,
collections::{btree_map::Keys, BTreeMap, HashMap},
fmt,
hash::Hash,
io::Cursor,
slice::Iter,
};

use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use itertools::Itertools;
use ordered_float::OrderedFloat;
use rust_decimal::Decimal;
use serde::{
Expand Down Expand Up @@ -128,7 +130,7 @@ impl fmt::Display for Value {
/// The partition struct stores the tuple of partition values for each file.
/// Its type is derived from the partition fields of the partition spec used to write the manifest file.
/// In v2, the partition struct’s field ids must match the ids from the partition spec.
#[derive(Debug, Clone, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone, Eq, PartialOrd, Ord)]
pub struct Struct {
/// Vector to store the field values
pub fields: Vec<Option<Value>>,
Expand Down Expand Up @@ -273,6 +275,15 @@ impl PartialEq for Struct {
}
}

impl Hash for Struct {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
for key in self.keys().sorted() {
key.hash(state);
self.get(&key).hash(state);
}
}
}

impl Value {
/// Perform a partition transformation for the given value
pub fn tranform(&self, transform: &Transform) -> Result<Value, Error> {
Expand Down Expand Up @@ -817,6 +828,55 @@ mod datetime {
}
}

pub trait TryAdd: Sized {
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
fn try_add(&self, other: &Self) -> Result<Self, Error>;
}
pub trait TrySub: Sized {
fn try_sub(&self, other: &Self) -> Result<Self, Error>;
}

impl TryAdd for Value {
fn try_add(&self, other: &Self) -> Result<Self, Error> {
match (self, other) {
(Value::Int(own), Value::Int(other)) => Ok(Value::Int(own + other)),
(Value::LongInt(own), Value::LongInt(other)) => Ok(Value::LongInt(own + other)),
(Value::Float(own), Value::Float(other)) => Ok(Value::Float(*own + *other)),
(Value::Double(own), Value::Double(other)) => Ok(Value::Double(*own + *other)),
(Value::Date(own), Value::Date(other)) => Ok(Value::Date(own + other)),
(Value::Time(own), Value::Time(other)) => Ok(Value::Time(own + other)),
(Value::Timestamp(own), Value::Timestamp(other)) => Ok(Value::Timestamp(own + other)),
(Value::TimestampTZ(own), Value::TimestampTZ(other)) => {
Ok(Value::TimestampTZ(own + other))
}
(x, y) => Err(Error::Type(
x.datatype().to_string(),
y.datatype().to_string(),
)),
}
}
}

impl TrySub for Value {
fn try_sub(&self, other: &Self) -> Result<Self, Error> {
match (self, other) {
(Value::Int(own), Value::Int(other)) => Ok(Value::Int(own - other)),
(Value::LongInt(own), Value::LongInt(other)) => Ok(Value::LongInt(own - other)),
(Value::Float(own), Value::Float(other)) => Ok(Value::Float(*own - *other)),
(Value::Double(own), Value::Double(other)) => Ok(Value::Double(*own - *other)),
(Value::Date(own), Value::Date(other)) => Ok(Value::Date(own - other)),
(Value::Time(own), Value::Time(other)) => Ok(Value::Time(own - other)),
(Value::Timestamp(own), Value::Timestamp(other)) => Ok(Value::Timestamp(own - other)),
(Value::TimestampTZ(own), Value::TimestampTZ(other)) => {
Ok(Value::TimestampTZ(own - other))
}
(x, y) => Err(Error::Type(
x.datatype().to_string(),
y.datatype().to_string(),
)),
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

#[cfg(test)]
mod tests {

Expand Down
1 change: 1 addition & 0 deletions iceberg-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ thrift = { version = "0.17.0", default-features = false }
thiserror = { workspace = true }
derive-getters = { workspace = true }
iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.5.8" }
smallvec = { version = "1.13.2", features = ["const_generics"] }
JanKaul marked this conversation as resolved.
Show resolved Hide resolved

1 change: 1 addition & 0 deletions iceberg-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pub mod materialized_view;
pub mod spec;
pub mod sql;
pub mod table;
pub(crate) mod util;
pub mod view;
84 changes: 84 additions & 0 deletions iceberg-rust/src/table/transaction/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::cmp::Ordering;

use iceberg_rust_spec::{manifest::ManifestEntry, values::Value};
use smallvec::{smallvec, SmallVec};

use crate::{
error::Error,
util::{cmp_dist, struct_to_smallvec, sub, Rectangle},
};

/// Split sets of datafiles depending on their partition_values
#[allow(clippy::type_complexity)]
pub(crate) fn split_datafiles_once(
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
files: impl Iterator<Item = Result<ManifestEntry, Error>>,
rect: Rectangle<Value>,
names: &SmallVec<[&str; 4]>,
) -> Result<[(Vec<ManifestEntry>, Rectangle<Value>); 2], Error> {
let mut smaller = Vec::new();
let mut larger = Vec::new();
let mut smaller_rect = None;
let mut larger_rect = None;

for manifest_entry in files {
let manifest_entry = manifest_entry?;
let position = struct_to_smallvec(manifest_entry.data_file().partition(), names)?;
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
// Check distance to upper and lower bound
if let Ordering::Greater =
cmp_dist(&sub(&position, &rect.min)?, &sub(&rect.max, &position)?)?
{
// if closer to upper bound
larger.push(manifest_entry);

if larger_rect.is_none() {
larger_rect = Some(Rectangle::new(position.clone(), position));
} else if let Some(larger_rect) = larger_rect.as_mut() {
larger_rect.expand_with_node(position);
}
} else {
// if closer to lower bound
smaller.push(manifest_entry);

if smaller_rect.is_none() {
smaller_rect = Some(Rectangle::new(position.clone(), position));
} else if let Some(smaller_rect) = smaller_rect.as_mut() {
smaller_rect.expand_with_node(position);
}
}
}
Ok([
(
smaller,
smaller_rect.ok_or(Error::NotFound("Lower".to_owned(), "rectangle".to_owned()))?,
),
(
larger,
larger_rect.ok_or(Error::NotFound("Upper".to_owned(), "rectangle".to_owned()))?,
),
])
}

pub(crate) fn split_datafiles(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, this should be unit tested as well.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating mocked data for a unit test is quite difficult here. I expanded a datafusion test to cover the manifest splitting case.

files: impl Iterator<Item = Result<ManifestEntry, Error>>,
rect: Rectangle<Value>,
names: &SmallVec<[&str; 4]>,
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
n_split: u32,
) -> Result<SmallVec<[Vec<ManifestEntry>; 2]>, Error> {
JanKaul marked this conversation as resolved.
Show resolved Hide resolved
let [(smaller, smaller_rect), (larger, larger_rect)] =
split_datafiles_once(files, rect, names)?;
if n_split == 1 {
Ok(smallvec![smaller, larger])
} else {
let mut smaller = split_datafiles(
smaller.into_iter().map(Ok),
smaller_rect,
names,
n_split - 1,
)?;
let mut larger =
split_datafiles(larger.into_iter().map(Ok), larger_rect, names, n_split - 1)?;

smaller.append(&mut larger);
Ok(smaller)
}
}
5 changes: 3 additions & 2 deletions iceberg-rust/src/table/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use self::operation::Operation;

use super::delete_files;

pub(crate) mod append;
pub(crate) mod operation;

pub(crate) static APPEND_KEY: &str = "append";
Expand Down Expand Up @@ -55,7 +56,7 @@ impl<'table> TableTransaction<'table> {
self.operations
.entry(APPEND_KEY.to_owned())
.and_modify(|mut x| {
if let Operation::NewAppend {
if let Operation::Append {
branch: _,
files: old,
additional_summary: None,
Expand All @@ -64,7 +65,7 @@ impl<'table> TableTransaction<'table> {
old.extend_from_slice(&files)
}
})
.or_insert(Operation::NewAppend {
.or_insert(Operation::Append {
branch: self.branch.clone(),
files,
additional_summary: None,
Expand Down
Loading
Loading