From 4eee45114835c013df139e052ab0b1c5be981512 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Thu, 21 Dec 2023 11:15:51 +0100 Subject: [PATCH 1/4] feat(test): Integration test for S3 log store with pyspark --- .../test_concurrent_write_s3_dynamo.py | 196 ++++++++++++++++++ python/tests/pyspark_integration/utils.py | 10 +- 2 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py diff --git a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py new file mode 100644 index 0000000000..185d2ed7eb --- /dev/null +++ b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py @@ -0,0 +1,196 @@ +"""Test concurrent writes between pyspark and deltalake(delta-rs).""" + +import concurrent.futures as ft +import os +import subprocess +import time +import uuid + +import pandas as pd +import pyspark +import pytest + +from deltalake import DeltaTable, write_deltalake +from tests.pyspark_integration.utils import get_spark + + +def configure_spark_session_for_s3( + spark: pyspark.sql.SparkSession, lock_table_name: str +): + spark.conf.set( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.profile.ProfileCredentialsProvider", + ) + spark.conf.set( + "spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore" + ) + spark.conf.set( + "spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "eu-central-1" + ) + spark.conf.set( + "spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", lock_table_name + ) + return spark.newSession() + + +def written_by_spark(filename: str) -> bool: + return filename.startswith("part-00000-") + + +def run_spark_insert(table_uri: str, lock_table_name: str, run_until: float) -> int: + region = os.environ.get("AWS_REGION", "us-east-1") + spark = get_spark() + spark.conf.set( + "spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", lock_table_name + ) + spark.conf.set( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.profile.ProfileCredentialsProvider", + ) + spark.conf.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", region) + row = 0 + while True: + row += 1 + spark.createDataFrame( + [(row, "created by spark")], schema=["row", "creator"] + ).coalesce(1).write.save( + table_uri, + mode="append", + format="delta", + ) + if time.time() >= run_until: + break + return row + + +def run_delta_rs_insert(dt: DeltaTable, run_until: float) -> int: + row = 0 + while True: + row += 1 + df = pd.DataFrame({"row": [row], "creator": ["created by delta-rs"]}) + write_deltalake(dt, df, schema=dt.schema(), mode="append") + if time.time() > run_until: + break + return row + + +@pytest.mark.pyspark +@pytest.mark.integration +def test_concurrent_writes_pyspark_delta(setup): + run_duration = 30 + + (table_uri, table_name) = setup + # this is to create the table, in a non-concurrent way + initial_insert = run_spark_insert(table_uri, table_name, time.time()) + assert initial_insert == 1 + + os.environ["AWS_S3_LOCKING_PROVIDER"] = "dynamodb" + os.environ["DELTA_DYNAMO_TABLE_NAME"] = table_name + dt = DeltaTable(table_uri) + with ft.ThreadPoolExecutor(max_workers=2) as executor: + spark_insert = executor.submit( + run_spark_insert, + table_uri, + table_name, + time.time() + run_duration, + ) + delta_insert = executor.submit( + run_delta_rs_insert, + dt, + time.time() + run_duration, + ) + spark_total_inserts = spark_insert.result() + delta_total_inserts = 
delta_insert.result() + dt = DeltaTable(table_uri) + assert dt.version() == spark_total_inserts + delta_total_inserts + + files_from_spark = len(list(filter(written_by_spark, dt.files()))) + assert files_from_spark == spark_total_inserts + 1 + # +1 is for the initial write that created the table + assert dt.files().__len__() == spark_total_inserts + delta_total_inserts + 1 + + +def create_bucket(bucket_url: str): + env = os.environ.copy() + ( + subprocess.run( + [ + "aws", + "s3", + "mb", + bucket_url, + ], + env=env, + ), + ) + + +def remove_bucket(bucket_url: str): + env = os.environ.copy() + ( + subprocess.run( + [ + "aws", + "s3", + "rb", + bucket_url, + "--force", + ], + env=env, + ), + ) + + +def create_dynamodb_table(table_name: str): + env = os.environ.copy() + ( + subprocess.run( + [ + "aws", + "dynamodb", + "create-table", + "--table-name", + table_name, + "--attribute-definitions", + "AttributeName=tablePath,AttributeType=S", + "AttributeName=fileName,AttributeType=S", + "--key-schema", + "AttributeName=tablePath,KeyType=HASH", + "AttributeName=fileName,KeyType=RANGE", + "--provisioned-throughput", + "ReadCapacityUnits=5,WriteCapacityUnits=5", + ], + env=env, + ), + ) + + +def delete_dynamodb_table(table_name: str): + env = os.environ.copy() + ( + subprocess.run( + [ + "aws", + "dynamodb", + "delete-table", + "--table-name", + table_name, + ], + env=env, + ), + ) + + +@pytest.fixture +def setup(): + id = uuid.uuid4() + bucket_name = f"delta-rs-integration-{id}" + bucket_url = f"s3://{bucket_name}" + + create_bucket(bucket_url) + create_dynamodb_table(bucket_name) + # spark always uses s3a://, so delta has to as well, because the dynamodb primary key is the + # table root path and it must match between all writers + yield (f"s3a://{bucket_name}/test-concurrent-writes", bucket_name) + delete_dynamodb_table(bucket_name) + remove_bucket(bucket_url) diff --git a/python/tests/pyspark_integration/utils.py b/python/tests/pyspark_integration/utils.py index 5ec23317a0..d222599018 100644 --- a/python/tests/pyspark_integration/utils.py +++ b/python/tests/pyspark_integration/utils.py @@ -26,8 +26,16 @@ def get_spark(): "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog", ) + # log store setting must be present at SparkSession creation, and can't be configured later + .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore") ) - return delta.pip_utils.configure_spark_with_delta_pip(builder).getOrCreate() + extra_packages = [ + "io.delta:delta-storage-s3-dynamodb:3.0.0", + "org.apache.hadoop:hadoop-aws:3.3.1", + ] + return delta.pip_utils.configure_spark_with_delta_pip( + builder, extra_packages + ).getOrCreate() def assert_spark_read_equal( From 32e2c28cc113758512cf9794dda3812c8fd5dc76 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Tue, 23 Jan 2024 23:58:43 -0800 Subject: [PATCH 2/4] Point pyspark test to the local dockjer-compose setup This still is not working, but it's not totally failing I guess --- .../pyspark_integration/test_concurrent_write_s3_dynamo.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py index 185d2ed7eb..64479e96c8 100644 --- a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py +++ b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py @@ -183,6 +183,11 @@ def delete_dynamodb_table(table_name: str): @pytest.fixture def setup(): + os.environ['AWS_ENDPOINT_URL'] = 'http://localhost:4566' + os.environ['AWS_REGION'] = 'us-east-1' + os.environ['AWS_ACCESS_KEY_ID'] = 'deltalake' + os.environ['AWS_SECRET_ACCESS_KEY'] = 'weloverust' + os.environ['AWS_ALLOW_HTTP'] = 'true' id = uuid.uuid4() bucket_name = f"delta-rs-integration-{id}" bucket_url = f"s3://{bucket_name}" From 893d6311307978c00cc4b4f39d1a7ece1e83c4ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philippe=20No=C3=ABl?= <21990816+philippemnoel@users.noreply.github.com> Date: Tue, 30 Jan 2024 12:12:43 -0500 Subject: [PATCH 3/4] chore: upgrade to DataFusion 35.0 (#2121) # Description This PR upgrades `delta-rs` to using DataFusion 35.0, which was recently released. In order to do this, I had to fix a few breaking changes, and also upgrade Arrow to 50 and `sqlparser` to 0.41. # Related Issue(s) N/A # Documentation See here for the list of PRs which required code change: - https://github.com/apache/arrow-datafusion/pull/8703 - https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/dev/changelog/35.0.0.md?plain=1#L227 --------- Co-authored-by: Ming Ying --- Cargo.toml | 39 ++++---- crates/azure/src/config.rs | 1 + crates/azure/src/error.rs | 1 + crates/core/Cargo.toml | 2 +- crates/core/src/delta_datafusion/mod.rs | 8 +- crates/core/src/kernel/snapshot/log_data.rs | 2 +- crates/core/src/operations/delete.rs | 1 - crates/core/src/operations/merge/barrier.rs | 4 +- crates/core/src/operations/merge/mod.rs | 6 +- crates/core/src/operations/optimize.rs | 46 +++++++--- .../core/src/operations/transaction/state.rs | 15 +++- crates/core/src/operations/update.rs | 10 +-- crates/gcp/src/error.rs | 1 + crates/sql/src/parser.rs | 88 +------------------ 14 files changed, 80 insertions(+), 144 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d03cd562d..cfcb4eaf3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,28 +19,27 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "49" } -arrow-arith = { version = "49" } -arrow-array = { version = "49" } -arrow-buffer = { version = "49" } -arrow-cast = { version = "49" } -arrow-ipc = { version = "49" } -arrow-json = { version = "49" } -arrow-ord = { version = "49" } -arrow-row = { version = "49" } -arrow-schema = { version = "49" } -arrow-select = { version = "49" } -object_store = { version = "0.8" } -parquet = { version = "49" } +arrow = { version = "50" } +arrow-arith = { version = "50" } +arrow-array = { version = "50" } +arrow-buffer = { version = "50" } +arrow-cast = { version = "50" } +arrow-ipc = { version = "50" } +arrow-json = { version = "50" } +arrow-ord = { version = "50" } +arrow-row = { version = "50" } +arrow-schema = { version = "50" } +arrow-select = { version = "50" } +object_store = { version = "0.9" } +parquet = { version = "50" } # datafusion 
-datafusion = { version = "34" } -datafusion-expr = { version = "34" } -datafusion-common = { version = "34" } -datafusion-proto = { version = "34" } -datafusion-sql = { version = "34" } -datafusion-physical-expr = { version = "34" } - +datafusion = { version = "35" } +datafusion-expr = { version = "35" } +datafusion-common = { version = "35" } +datafusion-proto = { version = "35" } +datafusion-sql = { version = "35" } +datafusion-physical-expr = { version = "35" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/azure/src/config.rs b/crates/azure/src/config.rs index ccb06f171a..d30272768e 100644 --- a/crates/azure/src/config.rs +++ b/crates/azure/src/config.rs @@ -35,6 +35,7 @@ enum AzureCredential { /// Authorizing with secret ClientSecret, /// Using a shared access signature + #[allow(dead_code)] ManagedIdentity, /// Using a shared access signature SasKey, diff --git a/crates/azure/src/error.rs b/crates/azure/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/azure/src/error.rs +++ b/crates/azure/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index f8d3778eca..9773f82c46 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -97,7 +97,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.40", optional = true } +sqlparser = { version = "0.41", optional = true } [dev-dependencies] criterion = "0.5" diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index ca64c9ef63..d7a10edc1f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -90,7 +90,7 @@ pub mod physical; impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { - DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source), + DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None), DeltaTableError::Io { source } => DataFusionError::IoError(source), DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source), DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source), @@ -102,7 +102,7 @@ impl From for DataFusionError { impl From for DeltaTableError { fn from(err: DataFusionError) -> Self { match err { - DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source }, + DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source }, DataFusionError::IoError(source) => DeltaTableError::Io { source }, DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source }, DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source }, @@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> { limit: self.limit, table_partition_cols, output_ordering: vec![], - infinite_source: false, }, logical_filter.as_ref(), ) @@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr( ) -> Arc { let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); - create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() + create_physical_expr(expr, &df_schema, &execution_props).unwrap() } pub(crate) async fn execute_plan_to_batch( @@ -1238,7 +1237,6 @@ pub(crate) async fn find_files_scan<'a>( let 
predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, - &input_schema, state.execution_props(), )?; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 525f3db64b..b874b53421 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -245,7 +245,7 @@ impl LogicalFile<'_> { self.deletion_vector.as_ref().and_then(|arr| { arr.storage_type .is_valid(self.index) - .then(|| DeletionVectorView { + .then_some(DeletionVectorView { data: arr, index: self.index, }) diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 1e0f196aa3..2e3e99bde2 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -152,7 +152,6 @@ async fn excute_non_empty_expr( let predicate_expr = create_physical_expr( &negated_expression, &input_dfschema, - &input_schema, state.execution_props(), )?; let filter: Arc = diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 6883f61253..f1df28c4a4 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -293,7 +293,9 @@ impl Stream for MergeBarrierStream { .iter() .map(|c| { arrow::compute::take(c.as_ref(), &indices, None) - .map_err(DataFusionError::ArrowError) + .map_err(|err| { + DataFusionError::ArrowError(err, None) + }) }) .collect::>>()?; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index ffe2e78e38..07f65f4cf8 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -32,7 +32,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use arrow_schema::Schema as ArrowSchema; use async_trait::async_trait; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; @@ -657,11 +656,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { if let Some(barrier) = node.as_any().downcast_ref::() { let schema = barrier.input.schema(); - let exec_schema: ArrowSchema = schema.as_ref().to_owned().into(); return Ok(Some(Arc::new(MergeBarrierExec::new( physical_inputs.first().unwrap().clone(), barrier.file_column.clone(), - planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?, + planner.create_physical_expr(&barrier.expr, schema, session_state)?, )))); } @@ -1418,9 +1416,7 @@ impl std::future::IntoFuture for MergeBuilder { PROTOCOL.can_write_to(&this.snapshot)?; let state = this.state.unwrap_or_else(|| { - //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made. 
let config: SessionConfig = DeltaSessionConfig::default().into(); - let config = config.with_target_partitions(1); let session = SessionContext::new_with_config(config); // If a user provides their own their DF state then they must register the store themselves diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 1fc0286754..c67b31a71b 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -552,7 +552,7 @@ impl MergePlan { use datafusion::prelude::{col, ParquetReadOptions}; use datafusion_common::Column; use datafusion_expr::expr::ScalarFunction; - use datafusion_expr::Expr; + use datafusion_expr::{Expr, ScalarUDF}; let locations = files .iter() @@ -578,7 +578,7 @@ impl MergePlan { .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col))) .collect_vec(); let expr = Expr::ScalarFunction(ScalarFunction::new_udf( - Arc::new(zorder::datafusion::zorder_key_udf()), + Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)), cols, )); let df = df.with_column(ZORDER_KEY_COLUMN, expr)?; @@ -1139,10 +1139,10 @@ pub(super) mod zorder { use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_expr::{ - ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - TypeSignature, Volatility, + ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use itertools::Itertools; + use std::any::Any; pub const ZORDER_UDF_NAME: &str = "zorder_key"; @@ -1166,20 +1166,38 @@ pub(super) mod zorder { use url::Url; let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); - ctx.register_udf(datafusion::zorder_key_udf()); + ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF)); Ok(Self { columns, ctx }) } } - /// Get the DataFusion UDF struct for zorder_key - pub fn zorder_key_udf() -> ScalarUDF { - let signature = Signature { - type_signature: TypeSignature::VariadicAny, - volatility: Volatility::Immutable, - }; - let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary))); - let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion); - ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun) + // DataFusion UDF impl for zorder_key + #[derive(Debug)] + pub struct ZOrderUDF; + + impl ScalarUDFImpl for ZOrderUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + ZORDER_UDF_NAME + } + + fn signature(&self) -> &Signature { + &Signature { + type_signature: TypeSignature::VariadicAny, + volatility: Volatility::Immutable, + } + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + zorder_key_datafusion(args) + } } /// Datafusion zorder UDF body diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index d3f680fcea..ab778f2cb6 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -1,6 +1,7 @@ +use std::collections::HashSet; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::{ DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; @@ -296,6 +297,12 @@ impl<'a> PruningStatistics for AddContainer<'a> { }); ScalarValue::iter_to_array(values).ok() } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // 
https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } impl PruningStatistics for DeltaTableState { @@ -333,6 +340,12 @@ impl PruningStatistics for DeltaTableState { let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.null_counts(column) } + + // This function is required since DataFusion 35.0, but is implemented as a no-op + // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 + fn contained(&self, _column: &Column, _value: &HashSet) -> Option { + None + } } #[cfg(test)] diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 582a37da28..d07f3f9fc0 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -263,12 +263,7 @@ async fn execute( let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; - let predicate_expr = create_physical_expr( - &predicate_null, - &input_dfschema, - &input_schema, - execution_props, - )?; + let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); let projection_predicate: Arc = @@ -315,8 +310,7 @@ async fn execute( let expr = case(col("__delta_rs_update_predicate")) .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; - let predicate_expr = - create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?; + let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); diff --git a/crates/gcp/src/error.rs b/crates/gcp/src/error.rs index aca1321c3d..acc18f67f9 100644 --- a/crates/gcp/src/error.rs +++ b/crates/gcp/src/error.rs @@ -4,6 +4,7 @@ pub(crate) type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub(crate) enum Error { + #[allow(dead_code)] #[error("failed to parse config: {0}")] Parse(String), diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 3287c87215..10e7252730 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::fmt; -use datafusion_sql::parser::{DFParser, DescribeTableStmt, Statement as DFStatement}; +use datafusion_sql::parser::{DFParser, Statement as DFStatement}; use datafusion_sql::sqlparser::ast::{ObjectName, Value}; use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect}; use datafusion_sql::sqlparser::parser::{Parser, ParserError}; @@ -138,10 +138,6 @@ impl<'a> DeltaParser<'a> { match self.parser.peek_token().token { Token::Word(w) => { match w.keyword { - Keyword::DESCRIBE => { - self.parser.next_token(); - self.parse_describe() - } Keyword::VACUUM => { self.parser.next_token(); self.parse_vacuum() @@ -167,50 +163,6 @@ impl<'a> DeltaParser<'a> { } } - /// Parse a SQL `DESCRIBE` statement - pub fn parse_describe(&mut self) -> Result { - match self.parser.peek_token().token { - Token::Word(w) => match w.keyword { - Keyword::DETAIL => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - 
Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Detail, - })) - } - Keyword::HISTORY => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::History, - })) - } - Keyword::FILES => { - self.parser.next_token(); - let table = self.parser.parse_object_name()?; - Ok(Statement::Describe(DescribeStatement { - table, - operation: DescribeOperation::Files, - })) - } - _ => { - let table = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name: table }, - ))) - } - }, - _ => { - let table_name = self.parser.parse_object_name()?; - Ok(Statement::Datafusion(DFStatement::DescribeTableStmt( - DescribeTableStmt { table_name }, - ))) - } - } - } - pub fn parse_vacuum(&mut self) -> Result { let table_name = self.parser.parse_object_name()?; match self.parser.peek_token().token { @@ -287,44 +239,6 @@ mod tests { Ok(()) } - #[test] - fn test_parse_describe() { - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::History, - }); - assert!(expect_parse_ok("DESCRIBE HISTORY data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Detail, - }); - assert!(expect_parse_ok("DESCRIBE DETAIL data_table", stmt).is_ok()); - - let stmt = Statement::Describe(DescribeStatement { - table: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - operation: DescribeOperation::Files, - }); - assert!(expect_parse_ok("DESCRIBE FILES data_table", stmt).is_ok()); - - let stmt = Statement::Datafusion(DFStatement::DescribeTableStmt(DescribeTableStmt { - table_name: ObjectName(vec![Ident { - value: "data_table".to_string(), - quote_style: None, - }]), - })); - assert!(expect_parse_ok("DESCRIBE data_table", stmt).is_ok()) - } - #[test] fn test_parse_vacuum() { let stmt = Statement::Vacuum(VacuumStatement { From d1b24f5e7d1f4ea152fe4a76be599c0e4b3fc603 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Tue, 30 Jan 2024 21:43:59 +0100 Subject: [PATCH 4/4] fix(s3): restore working test for DynamoDb log store repair log on read (#2120) # Description Make sure the read path for delta table commit entries passes through the log store, enabling it to ensure the invariants and potentially repair a broken commit in the context of S3 / DynamoDb log store implementation. This also adds another test in the context of S3 log store: repairing a log store on load was not implemented previously. Note that this a stopgap and not a complete solution: it comes with a performance penalty as we're triggering a redundant object store list operation just for the purpose of "triggering" the log store functionality. fixes #2109 --------- Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Co-authored-by: R. 
Tyler Croy --- crates/aws/src/logstore.rs | 14 +++++++++++ crates/aws/tests/integration_s3_dynamodb.rs | 14 ++++++++++- .../core/src/kernel/snapshot/log_segment.rs | 13 ++++++++--- crates/core/src/kernel/snapshot/mod.rs | 23 +++++++++---------- crates/core/src/logstore/mod.rs | 5 ++++ crates/core/src/table/mod.rs | 6 +---- crates/core/src/table/state.rs | 5 ++-- .../test_concurrent_write_s3_dynamo.py | 10 ++++---- 8 files changed, 62 insertions(+), 28 deletions(-) diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore.rs index 8e02659d87..123aadd2d1 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore.rs @@ -166,6 +166,20 @@ impl LogStore for S3DynamoDbLogStore { self.table_path.clone() } + async fn refresh(&self) -> DeltaResult<()> { + let entry = self + .lock_client + .get_latest_entry(&self.table_path) + .await + .map_err(|err| DeltaTableError::GenericError { + source: Box::new(err), + })?; + if let Some(entry) = entry { + self.repair_entry(&entry).await?; + } + Ok(()) + } + async fn read_commit_entry(&self, version: i64) -> DeltaResult> { let entry = self .lock_client diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index ff8f0ae7e9..179c46fc5a 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -156,7 +156,6 @@ async fn test_repair_commit_entry() -> TestResult<()> { #[tokio::test] #[serial] -#[ignore = "https://github.com/delta-io/delta-rs/issues/2109"] async fn test_repair_on_update() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let mut table = prepare_table(&context, "repair_on_update").await?; @@ -168,6 +167,19 @@ async fn test_repair_on_update() -> TestResult<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn test_repair_on_load() -> TestResult<()> { + let context = IntegrationContext::new(Box::new(S3Integration::default()))?; + let mut table = prepare_table(&context, "repair_on_update").await?; + let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?; + table.load_version(1).await?; + // table should fix the broken entry while loading a specific version + assert_eq!(table.version(), 1); + validate_lock_table_state(&table, 1).await?; + Ok(()) +} + const WORKERS: i64 = 3; const COMMITS: i64 = 15; diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 66cc428c3f..6ad1690db1 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -18,6 +18,7 @@ use tracing::debug; use super::parse; use crate::kernel::{arrow::json, Action, ActionType, Metadata, Protocol, Schema, StructType}; +use crate::logstore::LogStore; use crate::operations::transaction::get_commit_bytes; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -148,15 +149,21 @@ impl LogSegment { table_root: &Path, start_version: i64, end_version: Option, - store: &dyn ObjectStore, + log_store: &dyn LogStore, ) -> DeltaResult { debug!( "try_new_slice: start_version: {}, end_version: {:?}", start_version, end_version ); + log_store.refresh().await?; let log_url = table_root.child("_delta_log"); - let (mut commit_files, checkpoint_files) = - list_log_files(store, &log_url, end_version, Some(start_version)).await?; + let (mut commit_files, checkpoint_files) = list_log_files( + log_store.object_store().as_ref(), + &log_url, + end_version, + 
Some(start_version), + ) + .await?; // remove all files above requested version if let Some(version) = end_version { commit_files.retain(|meta| meta.location.commit_version() <= Some(version)); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 715fb2feec..d12018c245 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -30,6 +30,7 @@ use self::parse::{read_adds, read_removes}; use self::replay::{LogMapper, LogReplayScanner, ReplayStream}; use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField}; use crate::kernel::StructType; +use crate::logstore::LogStore; use crate::table::config::TableConfig; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -108,16 +109,16 @@ impl Snapshot { /// Update the snapshot to the given version pub async fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { - self.update_inner(store, target_version).await?; + self.update_inner(log_store, target_version).await?; Ok(()) } async fn update_inner( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult> { if let Some(version) = target_version { @@ -125,16 +126,14 @@ impl Snapshot { return Ok(None); } if version < self.version() { - return Err(DeltaTableError::Generic( - "Cannoit downgrade snapshot".into(), - )); + return Err(DeltaTableError::Generic("Cannot downgrade snapshot".into())); } } let log_segment = LogSegment::try_new_slice( &Path::default(), self.version() + 1, target_version, - store.as_ref(), + log_store.as_ref(), ) .await?; if log_segment.commit_files.is_empty() && log_segment.checkpoint_files.is_empty() { @@ -142,7 +141,7 @@ impl Snapshot { } let (protocol, metadata) = log_segment - .read_metadata(store.clone(), &self.config) + .read_metadata(log_store.object_store().clone(), &self.config) .await?; if let Some(protocol) = protocol { self.protocol = protocol; @@ -376,7 +375,7 @@ impl EagerSnapshot { /// Update the snapshot to the given version pub async fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { if Some(self.version()) == target_version { @@ -384,12 +383,12 @@ impl EagerSnapshot { } let new_slice = self .snapshot - .update_inner(store.clone(), target_version) + .update_inner(log_store.clone(), target_version) .await?; if let Some(new_slice) = new_slice { let files = std::mem::take(&mut self.files); let log_stream = new_slice.commit_stream( - store.clone(), + log_store.object_store().clone(), &log_segment::COMMIT_SCHEMA, &self.snapshot.config, )?; @@ -398,7 +397,7 @@ impl EagerSnapshot { } else { new_slice .checkpoint_stream( - store, + log_store.object_store(), &log_segment::CHECKPOINT_SCHEMA, &self.snapshot.config, ) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 5deaa9cd36..e6b4c6e2d4 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -165,6 +165,11 @@ pub trait LogStore: Sync + Send { /// Return the name of this LogStore implementation fn name(&self) -> String; + /// Trigger sync operation on log store to. + async fn refresh(&self) -> DeltaResult<()> { + Ok(()) + } + /// Read data for commit entry with the given version. 
async fn read_commit_entry(&self, version: i64) -> DeltaResult>; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index ad260295ba..7615c72dc3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -345,11 +345,7 @@ impl DeltaTable { self.version(), ); match self.state.as_mut() { - Some(state) => { - state - .update(self.log_store.object_store(), max_version) - .await - } + Some(state) => state.update(self.log_store.clone(), max_version).await, _ => { let state = DeltaTableState::try_new( &Path::default(), diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 104ba2bd32..ab5c229c49 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -14,6 +14,7 @@ use crate::kernel::{ Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove, StructType, }; +use crate::logstore::LogStore; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableError}; @@ -196,10 +197,10 @@ impl DeltaTableState { /// Update the state of the table to the given version. pub async fn update( &mut self, - store: Arc, + log_store: Arc, version: Option, ) -> Result<(), DeltaTableError> { - self.snapshot.update(store, version).await?; + self.snapshot.update(log_store, version).await?; Ok(()) } diff --git a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py index 64479e96c8..2cb4f05805 100644 --- a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py +++ b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py @@ -183,11 +183,11 @@ def delete_dynamodb_table(table_name: str): @pytest.fixture def setup(): - os.environ['AWS_ENDPOINT_URL'] = 'http://localhost:4566' - os.environ['AWS_REGION'] = 'us-east-1' - os.environ['AWS_ACCESS_KEY_ID'] = 'deltalake' - os.environ['AWS_SECRET_ACCESS_KEY'] = 'weloverust' - os.environ['AWS_ALLOW_HTTP'] = 'true' + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566" + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "deltalake" + os.environ["AWS_SECRET_ACCESS_KEY"] = "weloverust" + os.environ["AWS_ALLOW_HTTP"] = "true" id = uuid.uuid4() bucket_name = f"delta-rs-integration-{id}" bucket_url = f"s3://{bucket_name}"
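
Taken together, these patches let delta-rs and Spark append to the same table through the S3/DynamoDB log store. For readers who want to exercise that code path outside the test suite, here is a minimal sketch of the client-side configuration the test relies on. It mirrors the environment variables from the `setup` fixture and the locking-provider settings from the test body; the bucket and DynamoDB table names are illustrative placeholders, and it assumes the localstack endpoint from the docker-compose setup referenced in patch 2 is running and the bucket and lock table already exist.

```python
"""Sketch: append to an S3 Delta table from delta-rs using the DynamoDB
locking provider, alongside Spark's S3DynamoDBLogStore. Names are
placeholders; assumes localstack and the DynamoDB lock table exist."""

import os

import pandas as pd
from deltalake import write_deltalake

# Mirror the fixture's environment (localstack endpoint and credentials).
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566"
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = "deltalake"
os.environ["AWS_SECRET_ACCESS_KEY"] = "weloverust"
os.environ["AWS_ALLOW_HTTP"] = "true"

# Route delta-rs commits through the DynamoDB lock table, matching
# Spark's S3DynamoDBLogStore configuration in the test above.
os.environ["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
os.environ["DELTA_DYNAMO_TABLE_NAME"] = "delta-rs-integration-example"  # placeholder

df = pd.DataFrame({"row": [1], "creator": ["created by delta-rs"]})

# The s3a:// scheme is deliberate: the DynamoDB primary key is the table
# root path, so it must match the path Spark writes.
write_deltalake(
    "s3a://delta-rs-integration-example/test-concurrent-writes",  # placeholder
    df,
    mode="append",
)
```

With both writers configured this way, every commit attempt goes through the shared DynamoDB table, which is what allows the repair-on-read behaviour added in patch 4 to fix up an interrupted commit left behind by either side.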