diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aed9e603e..02905ca0e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -137,7 +137,7 @@ jobs: container: ubuntu:20.04 strategy: matrix: - python-version: [[39, "3.9"], [310, "3.10"], [311, "3.11"], [312, "3.12"]] + python-version: ["3.9", "3.10", "3.11", "3.12"] services: # Label used to access the service container postgres: @@ -182,16 +182,16 @@ jobs: - uses: actions/checkout@v4 - name: Set python version - run: | - echo "/opt/python/cp${{ matrix.python-version[0] }}-cp${{ matrix.python-version[0] }}/bin" >> $GITHUB_PATH + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} - name: Install tools run: | apt-get update - apt-get install -y curl postgresql-client build-essential python3-dev python3-pip pkg-config libssl-dev git sqlite3 libsqlite3-dev mysql-client libmysqlclient-dev python3 python3-pip libicu66 libkrb5-dev libclang-dev + apt-get install -y curl postgresql-client build-essential pkg-config libssl-dev git sqlite3 libsqlite3-dev mysql-client libmysqlclient-dev libicu66 libkrb5-dev libclang-dev pip3 install mssql-cli pip3 install cli-helpers==2.2.0 - ln -s /usr/bin/python3 /usr/bin/python env: DEBIAN_FRONTEND: noninteractive diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5b9181f10..33b8b7cf0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -12,7 +12,7 @@ jobs: container: quay.io/pypa/manylinux_2_28_x86_64 strategy: matrix: - python-version: [[38, "3.8"], [39, "3.9"], [310, "3.10"], [311, "3.11"], [312, "3.12"]] + python-version: [[39, "3.9"], [310, "3.10"], [311, "3.11"], [312, "3.12"]] steps: - uses: actions/checkout@v4 @@ -162,7 +162,7 @@ jobs: strategy: matrix: os: ["windows-latest", "macos-latest"] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] include: - os: "macos-latest" features: "--features integrated-auth-gssapi" @@ -226,7 +226,7 @@ jobs: runs-on: macos-latest strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 @@ -290,7 +290,7 @@ jobs: needs: [win-and-mac, linux, apple-arm] strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] os: [macos-latest, ubuntu-latest, windows-latest] steps: - uses: actions/checkout@v4 diff --git a/connectorx/src/sources/bigquery/mod.rs b/connectorx/src/sources/bigquery/mod.rs index d7bbe3d52..8140e4f0a 100644 --- a/connectorx/src/sources/bigquery/mod.rs +++ b/connectorx/src/sources/bigquery/mod.rs @@ -1136,7 +1136,8 @@ impl<'r, 'a> Produce<'r, Option>> for BigQuerySourceParser { * 1e9) as i64; let secs = timestamp_ns / 1000000000; let nsecs = (timestamp_ns % 1000000000) as u32; - NaiveDateTime::from_timestamp_opt(secs, nsecs).map(|ndt| DateTime::::from_naive_utc_and_offset(ndt, Utc)) + NaiveDateTime::from_timestamp_opt(secs, nsecs) + .map(|ndt| DateTime::::from_naive_utc_and_offset(ndt, Utc)) } } } diff --git a/connectorx/src/sources/oracle/mod.rs b/connectorx/src/sources/oracle/mod.rs index 07d4c3337..50b58e933 100644 --- a/connectorx/src/sources/oracle/mod.rs +++ b/connectorx/src/sources/oracle/mod.rs @@ -38,10 +38,7 @@ impl Dialect for OracleDialect { } fn is_identifier_part(&self, ch: char) -> bool { - ch.is_ascii_lowercase() - || ch.is_ascii_uppercase() - || ch.is_ascii_digit() - || ch == '_' + ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch.is_ascii_digit() || ch == '_' } } diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index 0799d17af..0cee5d5f0 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -165,10 +165,7 @@ where .unzip(); self.names = names; - self.schema = pg_types - .iter() - .map(PostgresTypeSystem::from) - .collect(); + self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect(); self.pg_schema = self .schema .iter() @@ -512,7 +509,7 @@ impl<'r, 'a> Produce<'r, Option> for PostgresBinarySourcePartitio Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX), Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN), Some(postgres::types::Timestamp::Value(t)) => t, - None => None + None => None, } } } @@ -545,7 +542,7 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresBinarySourcePartitio Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::::MAX_UTC), Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::::MIN_UTC), Some(postgres::types::Timestamp::Value(t)) => t, - None => None + None => None, } } } @@ -578,7 +575,7 @@ impl<'r, 'a> Produce<'r, Option> for PostgresBinarySourcePartitionPar Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX), Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN), Some(postgres::types::Date::Value(t)) => t, - None => None + None => None, } } } @@ -853,7 +850,6 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> { } } - impl<'r, 'a> Produce<'r, Decimal> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; @@ -863,14 +859,13 @@ impl<'r, 'a> Produce<'r, Decimal> for PostgresCSVSourceParser<'a> { match &self.rowbuf[ridx][cidx][..] { "Infinity" => Decimal::MAX, "-Infinity" => Decimal::MIN, - v => v.parse().map_err(|_| { - ConnectorXError::cannot_produce::(Some(v.into())) - })? + v => v + .parse() + .map_err(|_| ConnectorXError::cannot_produce::(Some(v.into())))?, } } } - impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; @@ -881,14 +876,14 @@ impl<'r, 'a> Produce<'r, Option> for PostgresCSVSourceParser<'a> { "" => None, "Infinity" => Some(Decimal::MAX), "-Infinity" => Some(Decimal::MIN), - v => Some(v.parse().map_err(|_| { - ConnectorXError::cannot_produce::(Some(v.into())) - })?), + v => Some( + v.parse() + .map_err(|_| ConnectorXError::cannot_produce::(Some(v.into())))?, + ), } } } - impl<'r, 'a> Produce<'r, DateTime> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; @@ -899,9 +894,9 @@ impl<'r, 'a> Produce<'r, DateTime> for PostgresCSVSourceParser<'a> { "infinity" => DateTime::::MAX_UTC, "-infinity" => DateTime::::MIN_UTC, // postgres csv return example: 1970-01-01 00:00:01+00 - v => format!("{}:00", v).parse().map_err(|_| { - ConnectorXError::cannot_produce::>(Some(v.into())) - })? + v => format!("{}:00", v) + .parse() + .map_err(|_| ConnectorXError::cannot_produce::>(Some(v.into())))?, } } } @@ -935,9 +930,8 @@ impl<'r, 'a> Produce<'r, NaiveDate> for PostgresCSVSourceParser<'a> { match &self.rowbuf[ridx][cidx][..] { "infinity" => NaiveDate::MAX, "-infinity" => NaiveDate::MIN, - v => NaiveDate::parse_from_str(v, "%Y-%m-%d").map_err(|_| { - ConnectorXError::cannot_produce::(Some(v.into())) - })? + v => NaiveDate::parse_from_str(v, "%Y-%m-%d") + .map_err(|_| ConnectorXError::cannot_produce::(Some(v.into())))?, } } } @@ -969,13 +963,8 @@ impl<'r, 'a> Produce<'r, NaiveDateTime> for PostgresCSVSourceParser<'a> { match &self.rowbuf[ridx][cidx] { "infinity" => NaiveDateTime::MAX, "-infinity" => NaiveDateTime::MIN, - v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S").map_err( - |_| { - ConnectorXError::cannot_produce::(Some( - v.into(), - )) - }, - )? + v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S") + .map_err(|_| ConnectorXError::cannot_produce::(Some(v.into())))?, } } } @@ -1245,9 +1234,8 @@ impl<'r, 'a> Produce<'r, Option>> for PostgresRawSourceParser<'a> Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::::MAX_UTC), Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::::MIN_UTC), Some(postgres::types::Timestamp::Value(t)) => t, - None => None + None => None, } - } } @@ -1279,9 +1267,8 @@ impl<'r, 'a> Produce<'r, Option> for PostgresRawSourceParser<'a> Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX), Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN), Some(postgres::types::Timestamp::Value(t)) => t, - None => None + None => None, } - } } @@ -1315,7 +1302,6 @@ impl<'r, 'a> Produce<'r, Option> for PostgresRawSourceParser<'a> { Some(postgres::types::Date::Value(t)) => t, None => None, } - } } @@ -1483,9 +1469,7 @@ impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser { Some(s) => s .parse() .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, - None => throw!(anyhow!( - "Cannot parse NULL in NOT NULL column." - )), + None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")), }, SimpleQueryMessage::CommandComplete(c) => { panic!("get command: {}", c); @@ -1787,9 +1771,10 @@ impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser { Some(s) => match s { "infinity" => NaiveDate::MAX, "-infinity" => NaiveDate::MIN, - s => NaiveDate::parse_from_str(s, "%Y-%m-%d") - .map_err(|_| ConnectorXError::cannot_produce::(Some(s.into())))?, - } + s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| { + ConnectorXError::cannot_produce::(Some(s.into())) + })?, + }, None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")), }, SimpleQueryMessage::CommandComplete(c) => { diff --git a/connectorx/src/transports/trino_arrow.rs b/connectorx/src/transports/trino_arrow.rs index d498fb615..61c4e67c8 100644 --- a/connectorx/src/transports/trino_arrow.rs +++ b/connectorx/src/transports/trino_arrow.rs @@ -1,9 +1,7 @@ //! Transport from Trino Source to Arrow Destination. use crate::{ - destinations::arrow::{ - typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError, - }, + destinations::arrow::{typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError}, impl_transport, sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem}, typesystem::TypeConversion, diff --git a/connectorx/tests/test_postgres.rs b/connectorx/tests/test_postgres.rs index 6ece56f65..b0cab5c31 100644 --- a/connectorx/tests/test_postgres.rs +++ b/connectorx/tests/test_postgres.rs @@ -1,18 +1,21 @@ -use chrono::{NaiveDate, NaiveDateTime, DateTime, Utc}; -use rust_decimal::Decimal; use arrow::{ array::{BooleanArray, Float64Array, Int64Array, StringArray}, record_batch::RecordBatch, }; +use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; use connectorx::{ destinations::arrow::ArrowDestination, prelude::*, - sources::postgres::{rewrite_tls_args, BinaryProtocol, CSVProtocol, PostgresSource, SimpleProtocol, CursorProtocol}, + sources::postgres::{ + rewrite_tls_args, BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, + SimpleProtocol, + }, sources::PartitionParser, sql::CXQuery, transports::PostgresArrowTransport, }; use postgres::NoTls; +use rust_decimal::Decimal; use std::env; use url::Url; @@ -153,17 +156,22 @@ fn test_postgres() { #[test] fn test_csv_infinite_values_binary_proto_option() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(Option, Option, Option>); + struct Row( + Option, + Option, + Option>, + ); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); - source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]); + source.set_queries(&[CXQuery::naked( + "SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values", + )]); source.fetch_metadata().unwrap(); let mut partitions = source.partition().unwrap(); @@ -192,8 +200,16 @@ fn test_csv_infinite_values_binary_proto_option() { } assert_eq!( vec![ - Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::::MAX_UTC)), - Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::::MIN_UTC)), + Row( + Some(NaiveDate::MAX), + Some(NaiveDateTime::MAX), + Some(DateTime::::MAX_UTC) + ), + Row( + Some(NaiveDate::MIN), + Some(NaiveDateTime::MIN), + Some(DateTime::::MIN_UTC) + ), Row(None, None, None), ], rows @@ -202,17 +218,22 @@ fn test_csv_infinite_values_binary_proto_option() { #[test] fn test_infinite_values_cursor_proto_option() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(Option, Option, Option>); + struct Row( + Option, + Option, + Option>, + ); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); - source.set_queries(&[CXQuery::naked("SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values")]); + source.set_queries(&[CXQuery::naked( + "SELECT test_date, test_timestamp, test_timestamp_timezone FROM test_infinite_values", + )]); source.fetch_metadata().unwrap(); let mut partitions = source.partition().unwrap(); @@ -241,8 +262,16 @@ fn test_infinite_values_cursor_proto_option() { } assert_eq!( vec![ - Row(Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(DateTime::::MAX_UTC)), - Row(Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(DateTime::::MIN_UTC)), + Row( + Some(NaiveDate::MAX), + Some(NaiveDateTime::MAX), + Some(DateTime::::MAX_UTC) + ), + Row( + Some(NaiveDate::MIN), + Some(NaiveDateTime::MIN), + Some(DateTime::::MIN_UTC) + ), Row(None, None, None), ], rows @@ -251,7 +280,6 @@ fn test_infinite_values_cursor_proto_option() { #[test] fn test_csv_infinite_values_cursor_proto() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); @@ -299,7 +327,6 @@ fn test_csv_infinite_values_cursor_proto() { #[test] fn test_csv_infinite_values_simple_proto() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); @@ -309,7 +336,9 @@ fn test_csv_infinite_values_simple_proto() { let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); - source.set_queries(&[CXQuery::naked("select * from test_infinite_values WHERE test_date IS NOT NULL")]); + source.set_queries(&[CXQuery::naked( + "select * from test_infinite_values WHERE test_date IS NOT NULL", + )]); source.fetch_metadata().unwrap(); let mut partitions = source.partition().unwrap(); @@ -340,8 +369,20 @@ fn test_csv_infinite_values_simple_proto() { } assert_eq!( vec![ - Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::::MAX_UTC), - Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::::MIN_UTC), + Row( + 1, + NaiveDate::MAX, + NaiveDateTime::MAX, + Decimal::MAX, + DateTime::::MAX_UTC + ), + Row( + 2, + NaiveDate::MIN, + NaiveDateTime::MIN, + Decimal::MIN, + DateTime::::MIN_UTC + ), ], rows ); @@ -349,12 +390,17 @@ fn test_csv_infinite_values_simple_proto() { #[test] fn test_csv_infinite_values_simple_proto_option() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(i32, Option, Option, Option, Option>); + struct Row( + i32, + Option, + Option, + Option, + Option>, + ); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); @@ -390,9 +436,21 @@ fn test_csv_infinite_values_simple_proto_option() { } assert_eq!( vec![ - Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::::MAX_UTC)), - Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::::MIN_UTC)), - Row(3, None, None, None, None, ) + Row( + 1, + Some(NaiveDate::MAX), + Some(NaiveDateTime::MAX), + Some(Decimal::MAX), + Some(DateTime::::MAX_UTC) + ), + Row( + 2, + Some(NaiveDate::MIN), + Some(NaiveDateTime::MIN), + Some(Decimal::MIN), + Some(DateTime::::MIN_UTC) + ), + Row(3, None, None, None, None,) ], rows ); @@ -400,7 +458,6 @@ fn test_csv_infinite_values_simple_proto_option() { #[test] fn test_csv_infinite_values() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); @@ -410,7 +467,9 @@ fn test_csv_infinite_values() { let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); let mut source = PostgresSource::::new(config, NoTls, 1).unwrap(); - source.set_queries(&[CXQuery::naked("select * from test_infinite_values WHERE test_date IS NOT NULL")]); + source.set_queries(&[CXQuery::naked( + "select * from test_infinite_values WHERE test_date IS NOT NULL", + )]); source.fetch_metadata().unwrap(); let mut partitions = source.partition().unwrap(); @@ -441,8 +500,20 @@ fn test_csv_infinite_values() { } assert_eq!( vec![ - Row(1, NaiveDate::MAX, NaiveDateTime::MAX, Decimal::MAX, DateTime::::MAX_UTC), - Row(2, NaiveDate::MIN, NaiveDateTime::MIN, Decimal::MIN, DateTime::::MIN_UTC), + Row( + 1, + NaiveDate::MAX, + NaiveDateTime::MAX, + Decimal::MAX, + DateTime::::MAX_UTC + ), + Row( + 2, + NaiveDate::MIN, + NaiveDateTime::MIN, + Decimal::MIN, + DateTime::::MIN_UTC + ), ], rows ); @@ -450,12 +521,17 @@ fn test_csv_infinite_values() { #[test] fn test_csv_infinite_values_option() { - let _ = env_logger::builder().is_test(true).try_init(); let dburl = env::var("POSTGRES_URL").unwrap(); #[derive(Debug, PartialEq)] - struct Row(i32, Option, Option, Option, Option>); + struct Row( + i32, + Option, + Option, + Option, + Option>, + ); let url = Url::parse(dburl.as_str()).unwrap(); let (config, _tls) = rewrite_tls_args(&url).unwrap(); @@ -491,15 +567,26 @@ fn test_csv_infinite_values_option() { } assert_eq!( vec![ - Row(1, Some(NaiveDate::MAX), Some(NaiveDateTime::MAX), Some(Decimal::MAX), Some(DateTime::::MAX_UTC)), - Row(2, Some(NaiveDate::MIN), Some(NaiveDateTime::MIN), Some(Decimal::MIN), Some(DateTime::::MIN_UTC)), - Row(3, None, None, None, None, ) + Row( + 1, + Some(NaiveDate::MAX), + Some(NaiveDateTime::MAX), + Some(Decimal::MAX), + Some(DateTime::::MAX_UTC) + ), + Row( + 2, + Some(NaiveDate::MIN), + Some(NaiveDateTime::MIN), + Some(Decimal::MIN), + Some(DateTime::::MIN_UTC) + ), + Row(3, None, None, None, None,) ], rows ); } - #[test] fn test_postgres_csv() { let _ = env_logger::builder().is_test(true).try_init();