diff --git a/Cargo.lock b/Cargo.lock index 7f4118d9a7..4c121d6620 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2255,9 +2255,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libsqlite3-sys" -version = "0.22.2" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d" +checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" dependencies = [ "cc", "pkg-config", @@ -3296,9 +3296,9 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.18.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959" +checksum = "6fdc8e4da70586127893be32b7adf21326a4c6b1aba907611edf467d13ffe895" dependencies = [ "r2d2", "rusqlite", @@ -3528,9 +3528,9 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.25.4" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c4b1eaf239b47034fb450ee9cdedd7d0226571689d8823030c4b6c2cb407152" +checksum = "85127183a999f7db96d1a976a309eebbfb6ea3b0b400ddd8340190129de6eb7a" dependencies = [ "bitflags", "chrono", diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs index d37b0cf113..ff9630dff4 100644 --- a/connectorx/src/destinations/arrow/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow/arrow_assoc.rs @@ -3,7 +3,7 @@ use crate::constants::SECONDS_IN_DAY; use arrow::array::{ ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder, - UInt32Builder, UInt64Builder, + TimestampNanosecondBuilder, UInt32Builder, UInt64Builder, }; use arrow::datatypes::Field; use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; @@ -140,34 +140,44 @@ impl ArrowAssoc for Option { } impl ArrowAssoc for DateTime { - type Builder = Float64Builder; + type Builder = TimestampNanosecondBuilder; - fn builder(_nrows: usize) -> Float64Builder { - unimplemented!() + fn builder(nrows: usize) -> Self::Builder { + TimestampNanosecondBuilder::with_capacity(nrows) } - fn append(_builder: &mut Self::Builder, _value: DateTime) -> Result<()> { - unimplemented!() + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: DateTime) { + builder.append_value(value.timestamp_nanos()) } - fn field(_header: &str) -> Field { - unimplemented!() + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())), + true, + ) } } impl ArrowAssoc for Option> { - type Builder = Float64Builder; + type Builder = TimestampNanosecondBuilder; - fn builder(_nrows: usize) -> Float64Builder { - unimplemented!() + fn builder(nrows: usize) -> Self::Builder { + TimestampNanosecondBuilder::with_capacity(nrows) } - fn append(_builder: &mut Self::Builder, _value: Option>) -> Result<()> { - unimplemented!() + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: Option>) { + builder.append_option(value.map(|x| x.timestamp_nanos())) } - fn field(_header: &str) -> Field { - unimplemented!() + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())), + false, + ) } } diff --git a/connectorx/src/transports/postgres_arrow.rs b/connectorx/src/transports/postgres_arrow.rs index fabee32d4f..cda6ca4fc5 100644 --- a/connectorx/src/transports/postgres_arrow.rs +++ b/connectorx/src/transports/postgres_arrow.rs @@ -13,6 +13,7 @@ use num_traits::ToPrimitive; use postgres::NoTls; use postgres_openssl::MakeTlsConnector; use rust_decimal::Decimal; +use serde_json::Value; use std::marker::PhantomData; use thiserror::Error; use uuid::Uuid; @@ -57,6 +58,8 @@ macro_rules! impl_postgres_transport { { UUID[Uuid] => LargeUtf8[String] | conversion option } { Char[&'r str] => LargeUtf8[String] | conversion none } { ByteA[Vec] => LargeBinary[Vec] | conversion auto } + { JSON[Value] => LargeUtf8[String] | conversion option } + { JSONB[Value] => LargeUtf8[String] | conversion none } } ); } @@ -83,3 +86,9 @@ impl TypeConversion for PostgresArrowTransport { .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val)) } } + +impl TypeConversion for PostgresArrowTransport { + fn convert(val: Value) -> String { + val.to_string() + } +}