From 8876dadf4e1e173e7d60a2805c4b9ebdd833e7ad Mon Sep 17 00:00:00 2001 From: jaydxn1 Date: Thu, 17 Oct 2024 19:08:07 +0800 Subject: [PATCH 1/3] added params for ca certs in mssql --- connectorx/src/sources/mssql/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/connectorx/src/sources/mssql/mod.rs b/connectorx/src/sources/mssql/mod.rs index 2a88f24ca2..69ce0ed6e0 100644 --- a/connectorx/src/sources/mssql/mod.rs +++ b/connectorx/src/sources/mssql/mod.rs @@ -84,6 +84,16 @@ pub fn mssql_config(url: &Url) -> Config { decode(url.password().unwrap_or(""))?.to_owned(), )); + match params.get("trust_server_certificate") { + Some(v) if v.to_lowercase() == "true" => config.trust_cert(), + _ => {}, + }; + + match params.get("trust_server_certificate_ca") { + Some(v) => config.trust_cert_ca(v), + _ => {}, + }; + match params.get("encrypt") { Some(v) if v.to_lowercase() == "true" => config.encryption(EncryptionLevel::Required), _ => config.encryption(EncryptionLevel::NotSupported), From 83b4d5032b5502c8a6601e33dc83fcd9ef586970 Mon Sep 17 00:00:00 2001 From: jaydxn1 Date: Fri, 18 Oct 2024 14:10:27 +0800 Subject: [PATCH 2/3] format & update docs for mssql params --- connectorx/src/sources/mssql/mod.rs | 4 ++-- docs/databases/mssql.md | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/connectorx/src/sources/mssql/mod.rs b/connectorx/src/sources/mssql/mod.rs index 69ce0ed6e0..3346f097b8 100644 --- a/connectorx/src/sources/mssql/mod.rs +++ b/connectorx/src/sources/mssql/mod.rs @@ -86,12 +86,12 @@ pub fn mssql_config(url: &Url) -> Config { match params.get("trust_server_certificate") { Some(v) if v.to_lowercase() == "true" => config.trust_cert(), - _ => {}, + _ => {} }; match params.get("trust_server_certificate_ca") { Some(v) => config.trust_cert_ca(v), - _ => {}, + _ => {} }; match params.get("encrypt") { diff --git a/docs/databases/mssql.md b/docs/databases/mssql.md index f637f4ec81..fb197b247e 100644 --- a/docs/databases/mssql.md +++ b/docs/databases/mssql.md @@ -6,10 +6,6 @@ SQLServer does not need to specify protocol. ### MsSQL Connection ```{hint} -By adding `trusted_connection=true` to connection uri parameter, windows authentication will be enabled. Example: `mssql://host:port/db?trusted_connection=true` -By adding `encrypt=true` to connection uri parameter, SQLServer will use SSL encryption. Example: `mssql://host:port/db?encrypt=true&trusted_connection=true` -``` -```{hint} if the user password has special characters, they need to be sanitized. example: `from urllib import parse; password = parse.quote_plus(password)` ``` @@ -20,6 +16,16 @@ query = 'SELECT * FROM table' # query string cx.read_sql(conn, query) # read data from MsSQL ``` +### Connection Parameters +* By adding `trusted_connection=true` to connection uri parameter, windows authentication will be enabled. + * Example: `mssql://host:port/db?trusted_connection=true` +* By adding `encrypt=true` to connection uri parameter, SQLServer will use SSL encryption. + * Example: `mssql://host:port/db?encrypt=true&trusted_connection=true` +* By adding `trust_server_certificate=true` to connection uri parameter, the SQLServer certificate will not be validated and it is accepted as-is. + * Example: `mssql://host:port/db?trust_server_certificate=true&encrypt=true` +* By adding `trust_server_certificate_ca=/path/to/ca-cert.crt` to connection uri parameter, the SQLServer certificate will be validated against the given CA certificate in addition to the system-truststore. + * Example: `mssql://host:port/db?encrypt=true&trust_server_certificate_ca=/path/to/ca-cert.crt` + ### SQLServer-Pandas Type Mapping | SQLServer Type | Pandas Type | Comment | |:---------------:|:---------------------------:|:----------------------------------:| From e6b197dffb0da411ed1168e0e0e96303b834afbf Mon Sep 17 00:00:00 2001 From: jaydxn1 Date: Fri, 18 Oct 2024 22:45:19 +0800 Subject: [PATCH 3/3] bump tiberius & bb8-tiberius, replaced deprecated queryresult method w/ querystream in mssql impl --- Cargo.lock | 111 ++++------------------------ connectorx/Cargo.toml | 4 +- connectorx/src/sources/mssql/mod.rs | 34 +++++---- 3 files changed, 36 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbc6c8d1fa..318f8fde33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,17 +384,6 @@ dependencies = [ "zstd 0.12.4", ] -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "async-compression" version = "0.4.6" @@ -413,79 +402,29 @@ dependencies = [ "zstd-safe 7.0.0", ] -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener", -] - [[package]] name = "async-native-tls" -version = "0.3.3" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e9e7a929bd34c68a82d58a4de7f86fffdaf97fb2af850162a7bb19dd7269b33" +checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe" dependencies = [ - "async-std", + "futures-util", "native-tls", "thiserror", "url", ] -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-channel", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-stream" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" -dependencies = [ - "async-stream-impl 0.2.1", - "futures-core", -] - [[package]] name = "async-stream" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ - "async-stream-impl 0.3.5", + "async-stream-impl", "futures-core", "pin-project-lite", ] -[[package]] -name = "async-stream-impl" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "async-stream-impl" version = "0.3.5" @@ -589,9 +528,9 @@ dependencies = [ [[package]] name = "bb8-tiberius" -version = "0.5.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648d5365b34a2a362d5b8790d3c1b230d263d2377e563c76cb79c10d326b917e" +checksum = "d8a33c87124c1938413e45ab6a6655e49d9e4cd015e05db61d6ab5a4f96b2c83" dependencies = [ "async-trait", "bb8", @@ -950,15 +889,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "connection-string" version = "0.1.14" @@ -1720,12 +1650,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c" -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "fallible-iterator" version = "0.2.0" @@ -2004,7 +1928,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ab5966c98f6d4e71e247cda6a6d8497bc8a1df3a4ba9ee548087842cffc21d" dependencies = [ - "async-stream 0.3.5", + "async-stream", "hyper", "hyper-rustls 0.23.2", "log", @@ -2238,7 +2162,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.5", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -2737,12 +2661,6 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e6bcd6433cff03a4bfc3d9834d504467db1f1cf6d0ea765d37d330249ed629d" -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.7.1" @@ -4999,12 +4917,11 @@ dependencies = [ [[package]] name = "tiberius" -version = "0.5.16" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08c782c165a53700c17e4b15a1f6facc21e40a6a80402c518e0f3a2c3fcedd4" +checksum = "833311bc8e26e96c73ad1b5c1f488c588808c747a318905ec67e43d422ea2c08" dependencies = [ "async-native-tls", - "async-stream 0.2.1", "async-trait", "asynchronous-codec", "byteorder", @@ -5024,6 +4941,8 @@ dependencies = [ "pretty-hex", "rust_decimal", "thiserror", + "tokio", + "tokio-util 0.6.10", "tracing", "uuid 0.8.2", "winauth", @@ -5380,10 +5299,6 @@ name = "uuid" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" -dependencies = [ - "getrandom 0.2.12", - "md5 0.7.0", -] [[package]] name = "uuid" @@ -5598,7 +5513,7 @@ checksum = "8f820cd208ce9c6b050812dc2d724ba98c6c1e9db5ce9b3f58d925ae5723a5e6" dependencies = [ "bitflags 1.2.1", "byteorder", - "md5 0.6.1", + "md5", "rand 0.7.3", "winapi", ] diff --git a/connectorx/Cargo.toml b/connectorx/Cargo.toml index 434351386d..e6edd0cb78 100644 --- a/connectorx/Cargo.toml +++ b/connectorx/Cargo.toml @@ -25,7 +25,7 @@ chrono = "0.4" arrow = {workspace = true, optional = true} arrow2 = {workspace = true, default-features = false, optional = true} bb8 = {version = "0.7", optional = true} -bb8-tiberius = {version = "0.5", optional = true} +bb8-tiberius = {version = "0.8", optional = true} csv = {version = "1", optional = true} fallible-streaming-iterator = {version = "0.1", optional = true} futures = {version = "0.3", optional = true} @@ -50,7 +50,7 @@ regex = {version = "1", optional = true} rusqlite = {version = "0.30.0", features = ["column_decltype", "chrono", "bundled"], optional = true} rust_decimal = {version = "1", features = ["db-postgres"], optional = true} rust_decimal_macros = {version = "1", optional = true} -tiberius = {version = "0.5", features = ["rust_decimal", "chrono", "integrated-auth-gssapi"], optional = true} +tiberius = {version = "0.7.3", features = ["rust_decimal", "chrono", "integrated-auth-gssapi"], optional = true} tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"], optional = true} tokio-util = {version = "0.6", optional = true} urlencoding = {version = "2.1", optional = true} diff --git a/connectorx/src/sources/mssql/mod.rs b/connectorx/src/sources/mssql/mod.rs index 3346f097b8..bf1fec4d71 100644 --- a/connectorx/src/sources/mssql/mod.rs +++ b/connectorx/src/sources/mssql/mod.rs @@ -26,7 +26,7 @@ use rust_decimal::Decimal; use sqlparser::dialect::MsSqlDialect; use std::collections::HashMap; use std::sync::Arc; -use tiberius::{AuthMethod, Config, EncryptionLevel, QueryResult, Row}; +use tiberius::{AuthMethod, Config, EncryptionLevel, QueryItem, QueryStream, Row}; use tokio::runtime::{Handle, Runtime}; use url::Url; use urlencoding::decode; @@ -157,11 +157,8 @@ where let mut conn = self.rt.block_on(self.pool.get())?; let first_query = &self.queries[0]; let (names, types) = match self.rt.block_on(conn.query(first_query.as_str(), &[])) { - Ok(stream) => { - let columns = stream.columns().ok_or_else(|| { - anyhow!("MsSQL failed to get the columns of query: {}", first_query) - })?; - columns + Ok(mut stream) => match self.rt.block_on(async { stream.columns().await }) { + Ok(Some(columns)) => columns .iter() .map(|col| { ( @@ -169,10 +166,18 @@ where MsSQLTypeSystem::from(&col.column_type()), ) }) - .unzip() - } + .unzip(), + Ok(None) => { + throw!(anyhow!( + "MsSQL returned no columns for query: {}", + first_query + )); + } + Err(e) => { + throw!(anyhow!("Error fetching columns: {}", e)); + } + }, Err(e) => { - // tried the last query but still get an error debug!( "cannot get metadata for '{}', try next query: {}", first_query, e @@ -279,7 +284,7 @@ impl SourcePartition for MsSQLSourcePartition { #[throws(MsSQLSourceError)] fn parser<'a>(&'a mut self) -> Self::Parser<'a> { let conn = self.rt.block_on(self.pool.get())?; - let rows: OwningHandle>, DummyBox>> = + let rows: OwningHandle>, DummyBox>> = OwningHandle::new_with_fn(Box::new(conn), |conn: *const Conn<'a>| unsafe { let conn = &mut *(conn as *mut Conn<'a>); @@ -304,7 +309,7 @@ impl SourcePartition for MsSQLSourcePartition { pub struct MsSQLSourceParser<'a> { rt: &'a Handle, - iter: OwningHandle>, DummyBox>>, + iter: OwningHandle>, DummyBox>>, rowbuf: Vec, ncols: usize, current_col: usize, @@ -315,7 +320,7 @@ pub struct MsSQLSourceParser<'a> { impl<'a> MsSQLSourceParser<'a> { fn new( rt: &'a Handle, - iter: OwningHandle>, DummyBox>>, + iter: OwningHandle>, DummyBox>>, schema: &[MsSQLTypeSystem], ) -> Self { Self { @@ -358,7 +363,10 @@ impl<'a> PartitionParser<'a> for MsSQLSourceParser<'a> { for _ in 0..DB_BUFFER_SIZE { if let Some(item) = self.rt.block_on(self.iter.next()) { - self.rowbuf.push(item?); + match item.map_err(MsSQLSourceError::MsSQLError)? { + QueryItem::Row(row) => self.rowbuf.push(row), + _ => continue, + } } else { self.is_finished = true; break;