feat: arrow backed log replay and table state #2037

Merged: 32 commits into main from log-replay, Jan 23, 2024

Commits (32)

6e5a898  feat: read LogSegment's (roeap, Jan 4, 2024)
8218634  chore: update kernel types (roeap, Jan 5, 2024)
49c7ff2  feat: read log commit files (roeap, Jan 5, 2024)
978c468  feat: basic log replay (roeap, Jan 5, 2024)
2cdbe54  feat: basic checkpoint file reading (roeap, Jan 5, 2024)
e01ec53  feat: protocol & metadata query (roeap, Jan 5, 2024)
14dfc48  feat: add eager snapshot (roeap, Jan 6, 2024)
51ca3ac  test: reconcile snapshot tests (roeap, Jan 6, 2024)
e721531  feat: update log segment and eager snapshot (roeap, Jan 6, 2024)
56272fe  chore: clippy (roeap, Jan 6, 2024)
75ea658  feat: read commit info actions (roeap, Jan 6, 2024)
2d14a63  refactor: update state.merge call sites (roeap, Jan 6, 2024)
54f49ba  refactor: always use metadata action (roeap, Jan 6, 2024)
7cc1f04  feat: read tombstones from log (roeap, Jan 6, 2024)
a71e047  refactor: consume async tombstones (roeap, Jan 6, 2024)
cfa647a  feat: make snapshots serializable (roeap, Jan 7, 2024)
5e86161  refactor: always use Metadata action (roeap, Jan 7, 2024)
a2fc20f  feat: use EagerSnapshot (roeap, Jan 7, 2024)
34153f0  chore: remove parquet and arrow feature (roeap, Jan 7, 2024)
7d00aaf  chore: cleanup (roeap, Jan 8, 2024)
ae7d162  feat: add stats_parsed in file data (roeap, Jan 10, 2024)
2836a1e  refactor: simplify module structure (roeap, Jan 12, 2024)
5415073  feat: arrow-backed file statistics (roeap, Jan 13, 2024)
f072a00  feat: stats and partition value handling (roeap, Jan 15, 2024)
2d9a5ea  feat: improve logical file handling (roeap, Jan 15, 2024)
9061f4f  fix: test expectations (roeap, Jan 15, 2024)
6755707  Merge branch 'main' into log-replay (roeap, Jan 16, 2024)
729a0d1  fix: minor s3 test fixes (roeap, Jan 16, 2024)
69db52d  refactor: simplify kernel module structure (roeap, Jan 16, 2024)
2a296c7  Merge branch 'main' into log-replay (roeap, Jan 16, 2024)
3cba9f4  Merge branch 'main' into log-replay (rtyler, Jan 23, 2024)
72ea9b1  Correct an error from conflict resolution (rtyler, Jan 23, 2024)

Changes from all commits:

Cargo.toml (2 additions, 0 deletions)

@@ -24,6 +24,8 @@ arrow-arith = { version = "49" }
 arrow-array = { version = "49" }
 arrow-buffer = { version = "49" }
 arrow-cast = { version = "49" }
+arrow-ipc = { version = "49" }
+arrow-json = { version = "49" }
 arrow-ord = { version = "49" }
 arrow-row = { version = "49" }
 arrow-schema = { version = "49" }

crates/benchmarks/src/bin/merge.rs (2 additions, 2 deletions)

@@ -193,10 +193,10 @@ async fn benchmark_merge_tpcds(
     merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
 ) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
     let table = DeltaTableBuilder::from_uri(path).load().await?;
-    let file_count = table.state.files().len();
+    let file_count = table.snapshot()?.files_count();

     let provider = DeltaTableProvider::try_new(
-        table.state.clone(),
+        table.snapshot()?.clone(),
         table.log_store(),
         DeltaScanConfig {
             file_column_name: Some("file_path".to_string()),
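
This hunk shows the PR's central API shift: callers no longer reach into `table.state` directly, but go through the fallible `snapshot()` accessor. A minimal sketch of the new pattern, using only the calls visible in the diff above; the helper name and error plumbing are illustrative, not from the PR:

```rust
use deltalake_core::{DeltaTableBuilder, DeltaTableError};

// Hypothetical helper illustrating the snapshot-based access pattern.
// `snapshot()` returns a Result because the table may not have loaded
// any state yet, hence the `?` instead of direct field access.
async fn count_files(path: &str) -> Result<usize, DeltaTableError> {
    let table = DeltaTableBuilder::from_uri(path).load().await?;
    Ok(table.snapshot()?.files_count())
}
```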

crates/deltalake-aws/Cargo.toml (1 addition, 0 deletions)

@@ -26,6 +26,7 @@ url = { workspace = true }
 backoff = { version = "0.4", features = [ "tokio" ] }

 [dev-dependencies]
+deltalake-core = { path = "../deltalake-core", features = ["datafusion"] }
 chrono = { workspace = true }
 serial_test = "3"
 deltalake-test = { path = "../deltalake-test" }

crates/deltalake-aws/tests/integration_s3_dynamodb.rs (5 additions, 5 deletions)

@@ -267,12 +267,11 @@ fn add_action(name: &str) -> Action {
     let ts = (SystemTime::now() - Duration::from_secs(1800))
         .duration_since(UNIX_EPOCH)
         .unwrap()
-        .as_secs();
-    Action::Add(Add {
+        .as_millis();
+    Add {
         path: format!("{}.parquet", name),
         size: 396,
         partition_values: HashMap::new(),
-        partition_values_parsed: None,
         modification_time: ts as i64,
         data_change: true,
         stats: None,
@@ -282,7 +281,8 @@ fn add_action(name: &str) -> Action {
         base_row_id: None,
         default_row_commit_version: None,
         clustering_provider: None,
-    })
+    }
+    .into()
 }

 async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
@@ -322,7 +322,7 @@ async fn append_to_table(
         table.log_store().as_ref(),
         &actions,
         operation,
-        &table.state,
+        Some(table.snapshot()?),
         metadata,
     )
     .await
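
Two details here are easy to miss: `modification_time` holds an epoch timestamp in milliseconds, so `.as_secs()` was a unit bug that `.as_millis()` fixes, and the explicit `Action::Add(...)` wrapper is replaced by a conversion. A short sketch of that idiom, assuming the `From<Add> for Action` impl the diff's `.into()` relies on; the `kernel` module path is an assumption based on the commit list:

```rust
use deltalake_core::kernel::{Action, Add}; // module path assumed

// Both forms produce the same value; the second relies on the
// `From<Add> for Action` conversion and reads better when the struct
// literal is built inline, as in the test above.
fn wrap_explicit(add: Add) -> Action {
    Action::Add(add)
}

fn wrap_into(add: Add) -> Action {
    add.into()
}
```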

crates/deltalake-core/Cargo.toml (15 additions, 28 deletions)

@@ -19,19 +19,22 @@ features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

 [dependencies]
 # arrow
-arrow = { workspace = true, optional = true }
-arrow-arith = { workspace = true, optional = true }
-arrow-array = { workspace = true, optional = true }
-arrow-buffer = { workspace = true, optional = true }
-arrow-cast = { workspace = true, optional = true }
-arrow-ord = { workspace = true, optional = true }
-arrow-row = { workspace = true, optional = true }
-arrow-schema = { workspace = true, optional = true, features = ["serde"] }
-arrow-select = { workspace = true, optional = true }
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
 parquet = { workspace = true, features = [
     "async",
     "object_store",
-], optional = true }
+] }
+pin-project-lite = "^0.2.7"

 # datafusion
 datafusion = { workspace = true, optional = true }
@@ -48,6 +51,7 @@ serde_json = { workspace = true }
 # "stdlib"
 bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "*"
 regex = { workspace = true }
 thiserror = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
@@ -111,18 +115,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"

 [features]
-arrow = [
-    "dep:arrow",
-    "arrow-arith",
-    "arrow-array",
-    "arrow-cast",
-    "arrow-ord",
-    "arrow-row",
-    "arrow-schema",
-    "arrow-select",
-    "arrow-buffer",
-]
-default = ["arrow", "parquet"]
+default = []
 datafusion = [
     "dep:datafusion",
     "datafusion-expr",
@@ -131,14 +124,8 @@ datafusion = [
     "datafusion-physical-expr",
     "datafusion-sql",
     "sqlparser",
-    "arrow",
-    "parquet",
 ]
 datafusion-ext = ["datafusion"]
 json = ["parquet/json"]
-python = ["arrow/pyarrow"]
 unity-experimental = ["reqwest", "hyper"]

-[[bench]]
-name = "read_checkpoint"
-harness = false
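
The feature matrix shrinks to match: the `arrow` feature is deleted, `parquet` stops being optional, and `default = []`. Downstream crates now get arrow and parquet unconditionally and only opt into `datafusion`, which is exactly what the `deltalake-aws` dev-dependency earlier in this diff does. A minimal sketch of a consumer manifest under the new layout; the path is a placeholder:

```toml
[dependencies]
# arrow and parquet support is always compiled in after this PR;
# only the query-engine integration remains behind a feature flag.
deltalake-core = { path = "../deltalake-core", features = ["datafusion"] }
```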

crates/deltalake-core/benches/read_checkpoint.rs (0 additions, 29 deletions)

This file was deleted.

crates/deltalake-core/src/delta_datafusion/expr.rs (4 additions, 2 deletions)

@@ -575,7 +575,8 @@ mod test {
             .cast_to::<DFSchema>(
                 &arrow_schema::DataType::Utf8,
                 &table
-                    .state
+                    .snapshot()
+                    .unwrap()
                     .input_schema()
                     .unwrap()
                     .as_ref()
@@ -612,7 +613,8 @@
         assert_eq!(test.expected, actual);

         let actual_expr = table
-            .state
+            .snapshot()
+            .unwrap()
             .parse_predicate_expression(actual, &session.state())
             .unwrap();
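
The test changes follow the same rule as the production call sites: schema access and predicate parsing both route through the snapshot. Condensed, the pattern looks like the sketch below, where `table`, `actual`, and `session` stand in for values from the surrounding test:

```rust
// Condensed from the test diff above (not self-contained);
// unwrap() is acceptable here because this runs inside a test.
let snapshot = table.snapshot().unwrap();
let schema = snapshot.input_schema().unwrap();
let expr = snapshot
    .parse_predicate_expression(actual, &session.state())
    .unwrap();
```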