Merge pull request #683 from splitgraph/sync-tables-with-prefixes
Fix prefix swallowing when creating the object store
gruuya authored Sep 30, 2024
2 parents 58f211b + 67c3252 commit d518e12
Showing 8 changed files with 120 additions and 88 deletions.
2 changes: 1 addition & 1 deletion ci/clippy.sh
@@ -7,4 +7,4 @@
 # https://users.rust-lang.org/t/pre-commit-clippy-fix/66584
 
 cargo clippy --all-targets --workspace --fix --allow-dirty --allow-staged --allow-no-vcs
-cargo clippy --all-targets --workspace -- -D warnings
+cargo clippy --all-targets --workspace -- -D warnings --allow clippy::needless_return # remove once 13458 is resolved
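
The added `--allow` passes through to rustc as a lint override, a stopgap the inline comment ties to upstream issue 13458. For reference, a minimal sketch of the pattern `clippy::needless_return` normally flags — the same pattern the schema.rs hunk below cleans up (the function here is made up for illustration):

    fn is_any(s: &str) -> bool {
        // clippy::needless_return fires here: the final expression of a body
        // doesn't need `return`; the idiomatic form is the tail expression.
        return s == "any";
    }

    fn main() {
        assert!(is_any("any"));
    }
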
4 changes: 2 additions & 2 deletions src/config/schema.rs
@@ -228,13 +228,13 @@ impl<'de> Deserialize<'de> for AccessSettings {
         D: serde::Deserializer<'de>,
     {
         let s = String::deserialize(d)?;
-        return match s.as_str() {
+        match s.as_str() {
             "any" => Ok(AccessSettings::Any),
             "off" => Ok(AccessSettings::Off),
             s => Ok(AccessSettings::Password {
                 sha256_hash: s.to_string(),
             }),
-        };
+        }
     }
 }
31 changes: 24 additions & 7 deletions src/object_store/factory.rs
@@ -99,7 +99,7 @@ impl ObjectStoreFactory {
     }
     pub async fn get_log_store_for_table(
         &self,
-        url: Url,
+        mut url: Url,
         options: HashMap<String, String>,
         table_path: String,
     ) -> Result<Arc<dyn LogStore>, object_store::Error> {
@@ -134,16 +134,33 @@ impl ObjectStoreFactory {
             }
         };
 
+        // Any path provided in the url has not been included in the object store root, so it
+        // needs to become a part of the prefix, alongside with the actual table name (unless
+        // it's a file/memory store)
+        let prefix = if !url.path().is_empty()
+            && url.scheme() != "file"
+            && url.scheme() != "memory"
+        {
+            format!("{}/{table_path}", url.path())
+        } else {
+            table_path.clone()
+        };
+
+        // This is the least surprising way to extend the path, and make the url point to the table
+        // root: https://github.com/servo/rust-url/issues/333
+        url.path_segments_mut()
+            .map_err(|_| object_store::Error::Generic {
+                store: "object_store_factory",
+                source: "The provided URL is a cannot-be-a-base URL".into(),
+            })?
+            .push(&table_path);
+
         let prefixed_store: PrefixStore<Arc<dyn ObjectStore>> =
-            PrefixStore::new(store, table_path.clone());
+            PrefixStore::new(store, prefix);
 
         Ok(default_logstore(
             Arc::from(prefixed_store),
-            &url.join(&table_path)
-                .map_err(|e| object_store::Error::Generic {
-                    store: "object_store_factory",
-                    source: Box::new(e),
-                })?,
+            &url,
             &Default::default(),
         ))
     }
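
The heart of the fix is in the second hunk: the old code pointed the log store at `url.join(&table_path)`, but `Url::join` resolves its argument like a relative link, so a base URL whose path lacks a trailing slash has its final segment replaced rather than extended — the store's path prefix got swallowed. A minimal standalone sketch of the two behaviors, using the `url` crate (the table name is made up for illustration):

    use url::Url;

    fn main() {
        // A store root with a path prefix and no trailing slash.
        let base = Url::parse("s3://seafowl-test-bucket/test-data").unwrap();

        // `join` treats "test-data" as a file-like segment and replaces it,
        // swallowing the prefix.
        let joined = base.join("my_table").unwrap();
        assert_eq!(joined.as_str(), "s3://seafowl-test-bucket/my_table");

        // `path_segments_mut().push()` appends a segment instead, which is why
        // the new code uses it to make the url point at the table root.
        let mut url = base.clone();
        url.path_segments_mut().unwrap().push("my_table");
        assert_eq!(url.as_str(), "s3://seafowl-test-bucket/test-data/my_table");
    }

Note that `path_segments_mut()` fails on cannot-be-a-base URLs, which is exactly the case the new `object_store::Error::Generic` branch covers.
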
100 changes: 50 additions & 50 deletions src/repository/default.rs
@@ -1,53 +1,53 @@
-/// Default implementation for a Repository that factors out common
-/// query patterns / SQL queries between Postgres and SQLite.
-///
-/// Usage:
-///
-/// The struct has to have certain fields, since this macro relies on them:
-///
-/// ```ignore
-/// pub struct MyRepository {
-///     pub executor: sqlx::Pool<sqlx::SqlxDatabaseType>
-/// }
-///
-/// impl MyRepository {
-///     pub const MIGRATOR: sqlx::Migrator = sqlx::migrate!("my/migrations");
-///     pub const QUERIES: RepositoryQueries = RepositoryQueries {
-///         all_columns_in_database: "SELECT ...",
-///     }
-///     pub fn interpret_error(error: sqlx::Error) -> Error {
-///         // Interpret the database-specific error code and turn some sqlx errors
-///         // into the Error enum values like UniqueConstraintViolation/FKConstraintViolation
-///         // ...
-///     }
-/// }
-///
-/// implement_repository!(SqliteRepository)
-/// ```
-///
-/// Gigajank alert: why are we doing this? The code between PG and SQLite is extremely similar.
-/// But, I couldn't find a better way to factor it out in order to reduce duplication.
-/// Here's what I tried:
-///
-/// - Use a generic `Pool<Any>`. This causes a weird borrow checker error when using a
-///   `QueryBuilder` (https://github.com/launchbadge/sqlx/issues/1978)
-/// - Make the implementation generic over any DB (that implements sqlx::Database). In that
-///   case, we need to add a bunch of `where` clauses to the implementation giving constraints
-///   on the argument, the query and the result types (see https://stackoverflow.com/a/70573732).
-///   And, when we do that, we hit the borrow checker error again from #1.
-/// - Add macros with default implementations for everything in the Repository trait and use them
-///   instead of putting the whole implementation in a macro. This conflicts with the #[async_trait]
-///   macro and breaks it (see https://stackoverflow.com/q/68573578). Another solution in that SO
-///   question is generating the implementation functions with a macro and calling them
-///   from the trait, which could work but still means we have to write out all functions in the
-///   PG implementation, SQLite implementation and the macros for both variants of the implementation
-///   functions (since we can't build a function that's generic over any DB)
-///
-/// In any case, this means we have to remove compile-time query checking (even if we duplicate the code
-/// completely), see https://github.com/launchbadge/sqlx/issues/121 and
-/// https://github.com/launchbadge/sqlx/issues/916.
-/// Queries that are different between SQLite and PG
+//! Default implementation for a Repository that factors out common
+//! query patterns / SQL queries between Postgres and SQLite.
+//!
+//! Usage:
+//!
+//! The struct has to have certain fields, since this macro relies on them:
+//!
+//! ```ignore
+//! pub struct MyRepository {
+//!     pub executor: sqlx::Pool<sqlx::SqlxDatabaseType>
+//! }
+//!
+//! impl MyRepository {
+//!     pub const MIGRATOR: sqlx::Migrator = sqlx::migrate!("my/migrations");
+//!     pub const QUERIES: RepositoryQueries = RepositoryQueries {
+//!         all_columns_in_database: "SELECT ...",
+//!     }
+//!     pub fn interpret_error(error: sqlx::Error) -> Error {
+//!         // Interpret the database-specific error code and turn some sqlx errors
+//!         // into the Error enum values like UniqueConstraintViolation/FKConstraintViolation
+//!         // ...
+//!     }
+//! }
+//!
+//! implement_repository!(SqliteRepository)
+//! ```
+//!
+//! Gigajank alert: why are we doing this? The code between PG and SQLite is extremely similar.
+//! But, I couldn't find a better way to factor it out in order to reduce duplication.
+//! Here's what I tried:
+//!
+//! - Use a generic `Pool<Any>`. This causes a weird borrow checker error when using a
+//!   `QueryBuilder` (https://github.com/launchbadge/sqlx/issues/1978)
+//! - Make the implementation generic over any DB (that implements sqlx::Database). In that
+//!   case, we need to add a bunch of `where` clauses to the implementation giving constraints
+//!   on the argument, the query and the result types (see https://stackoverflow.com/a/70573732).
+//!   And, when we do that, we hit the borrow checker error again from #1.
+//! - Add macros with default implementations for everything in the Repository trait and use them
+//!   instead of putting the whole implementation in a macro. This conflicts with the #[async_trait]
+//!   macro and breaks it (see https://stackoverflow.com/q/68573578). Another solution in that SO
+//!   question is generating the implementation functions with a macro and calling them
+//!   from the trait, which could work but still means we have to write out all functions in the
+//!   PG implementation, SQLite implementation and the macros for both variants of the implementation
+//!   functions (since we can't build a function that's generic over any DB)
+//!
+//! In any case, this means we have to remove compile-time query checking (even if we duplicate the code
+//! completely), see https://github.com/launchbadge/sqlx/issues/121 and
+//! https://github.com/launchbadge/sqlx/issues/916.
+// Queries that are different between SQLite and PG
 pub struct RepositoryQueries {
     pub latest_table_versions: &'static str,
     pub cast_timestamp: &'static str,
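
This hunk is a mechanical conversion from outer to inner doc comments: `///` documents the item that follows it, so the file-leading block was attached to `RepositoryQueries` rather than to the module, while `//!` documents the enclosing module itself. A minimal sketch of the distinction (reusing the struct name from the hunk):

    //! An inner doc comment: documents the enclosing module (or crate root).

    /// An outer doc comment: documents the item that follows, here the struct.
    pub struct RepositoryQueries;
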
1 change: 0 additions & 1 deletion src/wasm_udf/data_types.rs
@@ -150,7 +150,6 @@ pub struct CreateFunctionDetails {
 }
 
 #[cfg(test)]
-
 mod tests {
     use super::*;
 
68 changes: 41 additions & 27 deletions tests/fixtures.rs
@@ -37,6 +37,29 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
         })
     }
 
+    let minio_options = HashMap::from([
+        (
+            AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
+            "http://127.0.0.1:9000".to_string(),
+        ),
+        (
+            AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
+            "minioadmin".to_string(),
+        ),
+        (
+            AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
+            "minioadmin".to_string(),
+        ),
+        (
+            // This has been removed from the config enum, but it can
+            // still be picked up via `AmazonS3ConfigKey::from_str`
+            AmazonS3ConfigKey::Client(ClientConfigKey::AllowHttp)
+                .as_ref()
+                .to_string(),
+            "true".to_string(),
+        ),
+    ]);
+
     ListSchemaResponse {
         schemas: vec![
             SchemaObject {
@@ -45,11 +68,18 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
             },
             SchemaObject {
                 name: "s3".to_string(),
-                tables: vec![TableObject {
-                    name: "minio".to_string(),
-                    path: "test-data/delta-0.8.0-partitioned".to_string(),
-                    store: Some("minio".to_string()),
-                }],
+                tables: vec![
+                    TableObject {
+                        name: "minio".to_string(),
+                        path: "test-data/delta-0.8.0-partitioned".to_string(),
+                        store: Some("minio".to_string()),
+                    },
+                    TableObject {
+                        name: "minio_prefix".to_string(),
+                        path: "delta-0.8.0-partitioned".to_string(),
+                        store: Some("minio-prefix".to_string()),
+                    },
+                ],
             },
             SchemaObject {
                 name: "gcs".to_string(),
@@ -64,28 +94,12 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
             StorageLocation {
                 name: "minio".to_string(),
                 location: "s3://seafowl-test-bucket".to_string(),
-                options: HashMap::from([
-                    (
-                        AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
-                        "http://127.0.0.1:9000".to_string(),
-                    ),
-                    (
-                        AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
-                        "minioadmin".to_string(),
-                    ),
-                    (
-                        AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
-                        "minioadmin".to_string(),
-                    ),
-                    (
-                        // This has been removed from the config enum, but it can
-                        // still be picked up via `AmazonS3ConfigKey::from_str`
-                        AmazonS3ConfigKey::Client(ClientConfigKey::AllowHttp)
-                            .as_ref()
-                            .to_string(),
-                        "true".to_string(),
-                    ),
-                ]),
+                options: minio_options.clone(),
             },
+            StorageLocation {
+                name: "minio-prefix".to_string(),
+                location: "s3://seafowl-test-bucket/test-data".to_string(),
+                options: minio_options,
+            },
             StorageLocation {
                 name: "fake-gcs".to_string(),
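
The new `minio-prefix` store points at the same bucket but with a `/test-data` path prefix, and the `minio_prefix` table's path is relative to that prefix — together the two fixtures must resolve to the same object path, which is exactly the swallowing case fixed in factory.rs. The relocated comment also notes that `AllowHttp` lost its dedicated S3 variant but still round-trips through string keys; a hedged sketch of that behavior, assuming the `object_store` crate's usual exports and `FromStr` impls:

    use std::str::FromStr;

    use object_store::aws::AmazonS3ConfigKey;
    use object_store::ClientConfigKey;

    fn main() {
        // "allow_http" resolves through the generic client config keys even
        // though the S3-specific enum no longer has a dedicated variant.
        let key = AmazonS3ConfigKey::from_str("allow_http").unwrap();
        assert!(matches!(
            key,
            AmazonS3ConfigKey::Client(ClientConfigKey::AllowHttp)
        ));
    }
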
1 change: 1 addition & 0 deletions tests/flight/inline_metastore.rs
@@ -21,6 +21,7 @@ use crate::flight::*;
 // Testing with properly sent inline metastore
 #[case("local.file_with_store", TestServerType::InlineOnly, false)]
 #[case("s3.minio", TestServerType::InlineOnly, false)]
+#[case("s3.minio_prefix", TestServerType::InlineOnly, false)]
 #[case("gcs.fake", TestServerType::InlineOnly, false)]
 #[tokio::test]
 async fn test_inline_query(
1 change: 1 addition & 0 deletions tests/main.rs
@@ -55,6 +55,7 @@ impl TestSeafowl {
 }
 
 // Actual Seafowl target running in a separate process
+#[allow(clippy::zombie_processes)]
 #[fixture]
 pub async fn test_seafowl() -> TestSeafowl {
     // Pick free ports for the frontends
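
The `allow` silences the then-new `clippy::zombie_processes` lint, which fires when a spawned `std::process::Child` may be dropped without ever being waited on — deliberate here, since the fixture leaves the Seafowl process running for the tests. A minimal sketch of what the lint wants in the general case:

    use std::process::Command;

    fn main() -> std::io::Result<()> {
        // Spawning yields a Child handle; if it's dropped without `wait`,
        // the exited process can linger as a zombie until the parent dies.
        let mut child = Command::new("sleep").arg("1").spawn()?;

        // Reaping the child is what clippy::zombie_processes checks for.
        let status = child.wait()?;
        println!("child exited with {status}");
        Ok(())
    }
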
