From 0065765af8b4ee9533776e9b2f1ad919a58be94b Mon Sep 17 00:00:00 2001 From: Xiaoying Wang Date: Tue, 11 Jun 2024 20:17:01 -0400 Subject: [PATCH] adding microsecond timestamp type (postgres->arrow2), related to #644 #634 --- .gitignore | 2 +- .../src/destinations/arrow2/arrow_assoc.rs | 135 ++++++++++++++++++ .../src/destinations/arrow2/typesystem.rs | 57 +++++--- connectorx/src/transports/postgres_arrow2.rs | 31 +++- 4 files changed, 199 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 34023c943..40fb375ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ *.swp - +.DS_Store **/target .vscode connectorx-python/connectorx/*.so diff --git a/connectorx/src/destinations/arrow2/arrow_assoc.rs b/connectorx/src/destinations/arrow2/arrow_assoc.rs index e9b5e76a7..22b9b27df 100644 --- a/connectorx/src/destinations/arrow2/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow2/arrow_assoc.rs @@ -1,3 +1,4 @@ +use super::typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro}; use arrow2::{ array::*, datatypes::{DataType as ArrowDataType, Field, TimeUnit}, @@ -250,6 +251,54 @@ impl ArrowAssoc for Option> { } } +impl ArrowAssoc for DateTimeWrapperMicro { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".to_string()), + )) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: DateTimeWrapperMicro) { + builder.push(Some(value).map(|x| x.0.timestamp_micros())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string())), + true, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".to_string()), + )) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(value.map(|x| x.0.timestamp_micros())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string())), + false, + ) + } +} + fn naive_date_to_date32(nd: NaiveDate) -> i32 { match nd.and_hms_opt(0, 0, 0) { Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32, @@ -257,6 +306,10 @@ fn naive_date_to_date32(nd: NaiveDate) -> i32 { } } +fn naive_time_to_time64_micros(nd: NaiveTime) -> i64 { + nd.num_seconds_from_midnight() as i64 * 1_000_000 + (nd.nanosecond() as i64 / 1000) +} + fn naive_time_to_time64_nanos(nd: NaiveTime) -> i64 { nd.num_seconds_from_midnight() as i64 * 1_000_000_000 + nd.nanosecond() as i64 } @@ -295,6 +348,53 @@ impl ArrowAssoc for NaiveDate { } } +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + // naive => None + MutablePrimitiveArray::with_capacity(nrows) + .to(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(value.map(|x| x.0.and_utc().timestamp_micros())); + } + + fn field(header: &str) -> Field { + // naive => None + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ) + } +} + +impl ArrowAssoc for NaiveDateTimeWrapperMicro { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + // naive => None + MutablePrimitiveArray::with_capacity(nrows) + .to(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) + } + + fn push(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) { + builder.push(Some(value).map(|x| x.0.and_utc().timestamp_micros())); + } + + fn field(header: &str) -> Field { + // naive => None + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ) + } +} + impl ArrowAssoc for Option { type Builder = MutablePrimitiveArray; @@ -350,6 +450,41 @@ impl ArrowAssoc for NaiveDateTime { } } +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Microsecond)) + } + + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(match value { + Some(val) => Some(naive_time_to_time64_micros(val.0)), + None => None, + }); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true) + } +} + +impl ArrowAssoc for NaiveTimeWrapperMicro { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Microsecond)) + } + + fn push(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) { + builder.push(Some(value.0).map(naive_time_to_time64_nanos)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false) + } +} + impl ArrowAssoc for Option { type Builder = MutablePrimitiveArray; diff --git a/connectorx/src/destinations/arrow2/typesystem.rs b/connectorx/src/destinations/arrow2/typesystem.rs index ffb222bce..5aa4a6084 100644 --- a/connectorx/src/destinations/arrow2/typesystem.rs +++ b/connectorx/src/destinations/arrow2/typesystem.rs @@ -1,6 +1,15 @@ use crate::impl_typesystem; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +#[derive(Debug, Clone, Copy)] +pub struct DateTimeWrapperMicro(pub DateTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveTimeWrapperMicro(pub NaiveTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum Arrow2TypeSystem { Int32(bool), @@ -14,8 +23,11 @@ pub enum Arrow2TypeSystem { LargeBinary(bool), Date32(bool), Date64(bool), + Date64Micro(bool), Time64(bool), + Time64Micro(bool), DateTimeTz(bool), + DateTimeTzMicro(bool), BoolArray(bool), Int32Array(bool), Int64Array(bool), @@ -29,26 +41,29 @@ pub enum Arrow2TypeSystem { impl_typesystem! { system = Arrow2TypeSystem, mappings = { - { Int32 => i32 } - { Int64 => i64 } - { UInt32 => u32 } - { UInt64 => u64 } - { Float64 => f64 } - { Float32 => f32 } - { Boolean => bool } - { LargeUtf8 => String } - { LargeBinary => Vec } - { Date32 => NaiveDate } - { Date64 => NaiveDateTime } - { Time64 => NaiveTime } - { DateTimeTz => DateTime } - { BoolArray => Vec } - { Int32Array => Vec } - { Int64Array => Vec } - { UInt32Array => Vec } - { UInt64Array => Vec } - { Float32Array => Vec } - { Float64Array => Vec } - { Utf8Array => Vec } + { Int32 => i32 } + { Int64 => i64 } + { UInt32 => u32 } + { UInt64 => u64 } + { Float64 => f64 } + { Float32 => f32 } + { Boolean => bool } + { LargeUtf8 => String } + { LargeBinary => Vec } + { Date32 => NaiveDate } + { Date64 => NaiveDateTime } + { Date64Micro => NaiveDateTimeWrapperMicro } + { Time64 => NaiveTime } + { Time64Micro => NaiveTimeWrapperMicro } + { DateTimeTz => DateTime } + { DateTimeTzMicro => DateTimeWrapperMicro } + { BoolArray => Vec } + { Int32Array => Vec } + { Int64Array => Vec } + { UInt32Array => Vec } + { UInt64Array => Vec } + { Float32Array => Vec } + { Float64Array => Vec } + { Utf8Array => Vec } } } diff --git a/connectorx/src/transports/postgres_arrow2.rs b/connectorx/src/transports/postgres_arrow2.rs index 7d3f31af0..492d0c74e 100644 --- a/connectorx/src/transports/postgres_arrow2.rs +++ b/connectorx/src/transports/postgres_arrow2.rs @@ -1,7 +1,10 @@ //! Transport from Postgres Source to Arrow2 Destination. use crate::destinations::arrow2::{ - typesystem::Arrow2TypeSystem, Arrow2Destination, Arrow2DestinationError, + typesystem::{ + Arrow2TypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro, + }, + Arrow2Destination, Arrow2DestinationError, }; use crate::sources::postgres::{ BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError, @@ -53,10 +56,10 @@ macro_rules! impl_postgres_transport { { VarChar[&'r str] => LargeUtf8[String] | conversion none } { Enum[&'r str] => LargeUtf8[String] | conversion none } { Name[&'r str] => LargeUtf8[String] | conversion none } - { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } + { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option } { Date[NaiveDate] => Date32[NaiveDate] | conversion auto } - { Time[NaiveTime] => Time64[NaiveTime] | conversion auto } - { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } + { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option } + { TimestampTz[DateTime] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option } { UUID[Uuid] => LargeUtf8[String] | conversion option } { Char[&'r str] => LargeUtf8[String] | conversion none } { ByteA[Vec] => LargeBinary[Vec] | conversion auto } @@ -86,6 +89,26 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector); impl_postgres_transport!(SimpleProtocol, NoTls); impl_postgres_transport!(SimpleProtocol, MakeTlsConnector); +impl TypeConversion for PostgresArrow2Transport { + fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro { + NaiveTimeWrapperMicro(val) + } +} + +impl TypeConversion + for PostgresArrow2Transport +{ + fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro { + NaiveDateTimeWrapperMicro(val) + } +} + +impl TypeConversion, DateTimeWrapperMicro> for PostgresArrow2Transport { + fn convert(val: DateTime) -> DateTimeWrapperMicro { + DateTimeWrapperMicro(val) + } +} + impl TypeConversion for PostgresArrow2Transport { fn convert(val: Uuid) -> String { val.to_string()