From f7ec750e5276838b131a3bf7a55d23a6ceb59378 Mon Sep 17 00:00:00 2001
From: Xiaoying Wang <xiaoying_wang@sfu.ca>
Date: Fri, 14 Jun 2024 21:03:44 -0400
Subject: [PATCH] microsecond support for arrow/arrow2 and
 postgres/mysql/oracle/mssql

---
 .../connectorx/tests/test_arrow.py            |  96 ++++++++++-
 .../src/destinations/arrow/arrow_assoc.rs     | 162 ++++++++++++++++--
 .../src/destinations/arrow/typesystem.rs      |  15 ++
 connectorx/src/sources/oracle/typesystem.rs   |  12 +-
 connectorx/src/transports/mssql_arrow.rs      |  33 +++-
 connectorx/src/transports/mssql_arrow2.rs     |  33 +++-
 connectorx/src/transports/mysql_arrow.rs      |  29 +++-
 connectorx/src/transports/mysql_arrow2.rs     |  27 ++-
 connectorx/src/transports/oracle_arrow.rs     |  23 ++-
 connectorx/src/transports/oracle_arrow2.rs    |  21 ++-
 connectorx/src/transports/postgres_arrow.rs   |  31 +++-
 11 files changed, 432 insertions(+), 50 deletions(-)

diff --git a/connectorx-python/connectorx/tests/test_arrow.py b/connectorx-python/connectorx/tests/test_arrow.py
index dd52371f5c..458f41d1c2 100644
--- a/connectorx-python/connectorx/tests/test_arrow.py
+++ b/connectorx-python/connectorx/tests/test_arrow.py
@@ -73,7 +73,101 @@ def test_arrow2(postgres_url: str) -> None:
     df.sort_values(by="test_int", inplace=True, ignore_index=True)
     assert_frame_equal(df, expected, check_names=True)
 
-
+def test_arrow_type(postgres_url: str) -> None:
+    query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_ltree, test_name FROM test_types"
+    df = read_sql(postgres_url, query, return_type="arrow")
+    df = df.to_pandas(date_as_object=False)
+    df.sort_values(by="test_int16", inplace=True, ignore_index=True)
+    expected = pd.DataFrame(
+        index=range(4),
+        data={
+            "test_date": pd.Series(
+                ["1970-01-01", "2000-02-28", "2038-01-18", None], dtype="datetime64[ms]"
+            ),
+            "test_timestamp": pd.Series(
+                [
+                    "1970-01-01 00:00:01",
+                    "2000-02-28 12:00:10",
+                    "2038-01-18 23:59:59",
+                    None,
+                ],
+                dtype="datetime64[us]",
+            ),
+            "test_timestamptz": pd.Series(
+                [
+                    "1970-01-01 00:00:01+00:00",
+                    "2000-02-28 16:00:10+00:00",
+                    "2038-01-18 15:59:59+00:00",
+                    None,
+                ],
+                dtype="datetime64[us, UTC]",
+            ),
+            "test_int16": pd.Series([0, 1, 2, 3], dtype="int64"),
+            "test_int64": pd.Series(
+                [-9223372036854775808, 0, 9223372036854775807, None], dtype="float64"
+            ),
+            "test_float32": pd.Series(
+                [None, 3.1415926535, 2.71, -1e-37], dtype="float64"
+            ),
+            "test_numeric": pd.Series([None, 521.34, 0.00, 0.00], dtype="float64"),
+            "test_bpchar": pd.Series(["a    ", "bb   ", "ccc  ", None], dtype="object"),
+            "test_char": pd.Series(["a", "b", None, "d"], dtype="object"),
+            "test_varchar": pd.Series([None, "bb", "c", "defghijklm"], dtype="object"),
+            "test_uuid": pd.Series(
+                [
+                    "86b494cc-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49b84-96b2-11eb-9298-3e22fbb9fe9d",
+                    "86b49c42-96b2-11eb-9298-3e22fbb9fe9d",
+                    None,
+                ],
+                dtype="object",
+            ),
+            "test_time": pd.Series(
+                [
+                    datetime.time(8, 12, 40),
+                    None,
+                    datetime.time(23, 0, 10),
+                    datetime.time(18, 30),
+                ],
+                dtype="object",
+            ),
+            "test_bytea": pd.Series(
+                [
+                    None,
+                    b"\xd0\x97\xd0\xb4\xd1\x80\xd0\xb0\xcc\x81\xd0\xb2\xd1\x81\xd1\x82\xd0\xb2\xd1\x83\xd0\xb9\xd1\x82\xd0\xb5",
+                    b"",
+                    b"\xf0\x9f\x98\x9c",
+                ],
+                dtype="object",
+            ),
+            "test_json": pd.Series(
+                [
+                    '{"customer":"John Doe","items":{"product":"Beer","qty":6}}',
+                    '{"customer":"Lily Bush","items":{"product":"Diaper","qty":24}}',
+                    '{"customer":"Josh William","items":{"product":"Toy Car","qty":1}}',
+                    None,
+                ],
+                dtype="object",
+            ),
+            "test_jsonb": pd.Series(
+                [
+                    '{"product":"Beer","qty":6}',
+                    '{"product":"Diaper","qty":24}',
+                    '{"product":"Toy Car","qty":1}',
+                    None,
+                ],
+                dtype="object",
+            ),
+            "test_ltree": pd.Series(
+                ["A.B.C.D", "A.B.E", "A", None], dtype="object"
+            ),
+            "test_name": pd.Series(
+                ["0", "21", "someName", "101203203-1212323-22131235"]
+            )
+            
+        },
+    )
+    assert_frame_equal(df, expected, check_names=True)
 def test_arrow2_type(postgres_url: str) -> None:
     query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_bytea, test_json, test_jsonb, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_enum, test_ltree, test_name FROM test_types"
     df = read_sql(postgres_url, query, return_type="arrow2")
diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs
index 6dd1a2e106..f553ed48e1 100644
--- a/connectorx/src/destinations/arrow/arrow_assoc.rs
+++ b/connectorx/src/destinations/arrow/arrow_assoc.rs
@@ -1,9 +1,13 @@
-use super::errors::{ArrowDestinationError, Result};
+use super::{
+    errors::{ArrowDestinationError, Result},
+    typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
+};
 use crate::constants::SECONDS_IN_DAY;
 use arrow::array::{
-    ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
-    Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
-    TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
+    ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int32Builder,
+    Int64Builder, LargeBinaryBuilder, StringBuilder, Time64MicrosecondBuilder,
+    Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampNanosecondBuilder,
+    UInt32Builder, UInt64Builder,
 };
 use arrow::datatypes::Field;
 use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -188,6 +192,48 @@ impl ArrowAssoc for Option<DateTime<Utc>> {
     }
 }
 
+impl ArrowAssoc for DateTimeWrapperMicro {
+    type Builder = TimestampMicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
+    }
+
+    #[throws(ArrowDestinationError)]
+    fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
+        builder.append_value(value.0.timestamp_micros());
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+            false,
+        )
+    }
+}
+
+impl ArrowAssoc for Option<DateTimeWrapperMicro> {
+    type Builder = TimestampMicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("UTC")
+    }
+
+    #[throws(ArrowDestinationError)]
+    fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
+        builder.append_option(value.map(|x| x.0.timestamp_micros()));
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+            true,
+        )
+    }
+}
+
 fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
     match nd.and_hms_opt(0, 0, 0) {
         Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
@@ -196,7 +242,9 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
 }
 
 fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
-    nd.and_utc().timestamp_millis()
+    nd.and_utc()
+        .timestamp_nanos_opt()
+        .unwrap_or_else(|| panic!("out of range DateTime"))
 }
 
 impl ArrowAssoc for Option<NaiveDate> {
@@ -234,10 +282,10 @@ impl ArrowAssoc for NaiveDate {
 }
 
 impl ArrowAssoc for Option<NaiveDateTime> {
-    type Builder = Date64Builder;
+    type Builder = TimestampNanosecondBuilder;
 
     fn builder(nrows: usize) -> Self::Builder {
-        Date64Builder::with_capacity(nrows)
+        TimestampNanosecondBuilder::with_capacity(nrows)
     }
 
     fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
@@ -246,15 +294,19 @@ impl ArrowAssoc for Option<NaiveDateTime> {
     }
 
     fn field(header: &str) -> Field {
-        Field::new(header, ArrowDataType::Date64, true)
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )
     }
 }
 
 impl ArrowAssoc for NaiveDateTime {
-    type Builder = Date64Builder;
+    type Builder = TimestampNanosecondBuilder;
 
     fn builder(nrows: usize) -> Self::Builder {
-        Date64Builder::with_capacity(nrows)
+        TimestampNanosecondBuilder::with_capacity(nrows)
     }
 
     fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
@@ -263,7 +315,56 @@ impl ArrowAssoc for NaiveDateTime {
     }
 
     fn field(header: &str) -> Field {
-        Field::new(header, ArrowDataType::Date64, false)
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
+            false,
+        )
+    }
+}
+
+impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
+    type Builder = TimestampMicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        TimestampMicrosecondBuilder::with_capacity(nrows)
+    }
+
+    fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
+        builder.append_option(match value {
+            Some(v) => Some(v.0.and_utc().timestamp_micros()),
+            None => None,
+        });
+        Ok(())
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        )
+    }
+}
+
+impl ArrowAssoc for NaiveDateTimeWrapperMicro {
+    type Builder = TimestampMicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        TimestampMicrosecondBuilder::with_capacity(nrows)
+    }
+
+    fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
+        builder.append_value(value.0.and_utc().timestamp_micros());
+        Ok(())
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(
+            header,
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+            false,
+        )
     }
 }
 
@@ -307,6 +408,45 @@ impl ArrowAssoc for NaiveTime {
     }
 }
 
+impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
+    type Builder = Time64MicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        Time64MicrosecondBuilder::with_capacity(nrows)
+    }
+
+    fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
+        builder.append_option(value.map(|t| {
+            t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
+        }));
+        Ok(())
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
+    }
+}
+
+impl ArrowAssoc for NaiveTimeWrapperMicro {
+    type Builder = Time64MicrosecondBuilder;
+
+    fn builder(nrows: usize) -> Self::Builder {
+        Time64MicrosecondBuilder::with_capacity(nrows)
+    }
+
+    fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
+        builder.append_value(
+            value.0.num_seconds_from_midnight() as i64 * 1_000_000
+                + (value.0.nanosecond() as i64) / 1000,
+        );
+        Ok(())
+    }
+
+    fn field(header: &str) -> Field {
+        Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
+    }
+}
+
 impl ArrowAssoc for Option<Vec<u8>> {
     type Builder = LargeBinaryBuilder;
 
diff --git a/connectorx/src/destinations/arrow/typesystem.rs b/connectorx/src/destinations/arrow/typesystem.rs
index a6997a2ba2..7cf1815ff2 100644
--- a/connectorx/src/destinations/arrow/typesystem.rs
+++ b/connectorx/src/destinations/arrow/typesystem.rs
@@ -1,6 +1,15 @@
 use crate::impl_typesystem;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
 
+#[derive(Debug, Clone, Copy)]
+pub struct DateTimeWrapperMicro(pub DateTime<Utc>);
+
+#[derive(Debug, Clone, Copy)]
+pub struct NaiveTimeWrapperMicro(pub NaiveTime);
+
+#[derive(Debug, Clone, Copy)]
+pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime);
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub enum ArrowTypeSystem {
     Int32(bool),
@@ -14,8 +23,11 @@ pub enum ArrowTypeSystem {
     LargeBinary(bool),
     Date32(bool),
     Date64(bool),
+    Date64Micro(bool),
     Time64(bool),
+    Time64Micro(bool),
     DateTimeTz(bool),
+    DateTimeTzMicro(bool),
 }
 
 impl_typesystem! {
@@ -32,7 +44,10 @@ impl_typesystem! {
         { LargeBinary => Vec<u8>      }
         { Date32     => NaiveDate     }
         { Date64     => NaiveDateTime }
+        { Date64Micro     => NaiveDateTimeWrapperMicro }
         { Time64     => NaiveTime     }
+        { Time64Micro     => NaiveTimeWrapperMicro }
         { DateTimeTz => DateTime<Utc> }
+        { DateTimeTzMicro => DateTimeWrapperMicro }
     }
 }
diff --git a/connectorx/src/sources/oracle/typesystem.rs b/connectorx/src/sources/oracle/typesystem.rs
index 4c7d15a621..799cb67fb6 100644
--- a/connectorx/src/sources/oracle/typesystem.rs
+++ b/connectorx/src/sources/oracle/typesystem.rs
@@ -17,6 +17,8 @@ pub enum OracleTypeSystem {
     Date(bool),
     Timestamp(bool),
     TimestampTz(bool),
+    TimestampNano(bool),
+    TimestampTzNano(bool),
 }
 
 impl_typesystem! {
@@ -26,8 +28,8 @@ impl_typesystem! {
         { Float | NumFloat | BinaryFloat | BinaryDouble => f64 }
         { Blob => Vec<u8>}
         { Clob | VarChar | Char | NVarChar | NChar => String }
-        { Date | Timestamp => NaiveDateTime }
-        { TimestampTz => DateTime<Utc> }
+        { Date | Timestamp | TimestampNano => NaiveDateTime }
+        { TimestampTz | TimestampTzNano => DateTime<Utc> }
     }
 }
 
@@ -48,7 +50,13 @@ impl<'a> From<&'a OracleType> for OracleTypeSystem {
             OracleType::Varchar2(_) => VarChar(true),
             OracleType::NVarchar2(_) => NVarChar(true),
             OracleType::Date => Date(true),
+            OracleType::Timestamp(7) | OracleType::Timestamp(8) | OracleType::Timestamp(9) => {
+                TimestampNano(true)
+            }
             OracleType::Timestamp(_) => Timestamp(true),
+            OracleType::TimestampTZ(7)
+            | OracleType::TimestampTZ(8)
+            | OracleType::TimestampTZ(9) => TimestampTzNano(true),
             OracleType::TimestampTZ(_) => TimestampTz(true),
             _ => unimplemented!("{}", format!("Type {:?} not implemented for oracle!", ty)),
         }
diff --git a/connectorx/src/transports/mssql_arrow.rs b/connectorx/src/transports/mssql_arrow.rs
index 9e3aa8e143..ae2cd4db73 100644
--- a/connectorx/src/transports/mssql_arrow.rs
+++ b/connectorx/src/transports/mssql_arrow.rs
@@ -1,6 +1,9 @@
 //! Transport from MsSQL Source to Arrow Destination.
 
-use crate::destinations::arrow::{ArrowDestination, ArrowDestinationError, ArrowTypeSystem};
+use crate::destinations::arrow::{
+    typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
+    ArrowDestination, ArrowDestinationError, ArrowTypeSystem,
+};
 use crate::sources::mssql::{FloatN, IntN, MsSQLSource, MsSQLSourceError, MsSQLTypeSystem};
 use crate::typesystem::TypeConversion;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
@@ -50,18 +53,36 @@ impl_transport!(
         { Image[&'r [u8]]               => LargeBinary[Vec<u8>]      | conversion none }
         { Numeric[Decimal]              => Float64[f64]              | conversion option }
         { Decimal[Decimal]              => Float64[f64]              | conversion none }
-        { Datetime[NaiveDateTime]       => Date64[NaiveDateTime]     | conversion auto }
-        { Datetime2[NaiveDateTime]      => Date64[NaiveDateTime]     | conversion none }
-        { Smalldatetime[NaiveDateTime]  => Date64[NaiveDateTime]     | conversion none }
+        { Datetime[NaiveDateTime]       => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion option }
+        { Datetime2[NaiveDateTime]      => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion none }
+        { Smalldatetime[NaiveDateTime]  => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion none }
         { Date[NaiveDate]               => Date32[NaiveDate]         | conversion auto }
-        { Datetimeoffset[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
+        { Datetimeoffset[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }
         { Uniqueidentifier[Uuid]        => LargeUtf8[String]         | conversion option }
-        { Time[NaiveTime]               => Time64[NaiveTime]         | conversion auto }
+        { Time[NaiveTime]               => Time64Micro[NaiveTimeWrapperMicro]         | conversion option }
         { SmallMoney[f32]               => Float32[f32]              | conversion none }
         { Money[f64]                    => Float64[f64]              | conversion none }
     }
 );
 
+impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MsSQLArrowTransport {
+    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
+        NaiveTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MsSQLArrowTransport {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for MsSQLArrowTransport {
+    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
+        DateTimeWrapperMicro(val)
+    }
+}
+
 impl TypeConversion<Uuid, String> for MsSQLArrowTransport {
     fn convert(val: Uuid) -> String {
         val.to_string()
diff --git a/connectorx/src/transports/mssql_arrow2.rs b/connectorx/src/transports/mssql_arrow2.rs
index fa3370c5a9..5464367532 100644
--- a/connectorx/src/transports/mssql_arrow2.rs
+++ b/connectorx/src/transports/mssql_arrow2.rs
@@ -1,6 +1,9 @@
 //! Transport from MsSQL Source to Arrow2 Destination.
 
-use crate::destinations::arrow2::{Arrow2Destination, Arrow2DestinationError, Arrow2TypeSystem};
+use crate::destinations::arrow2::{
+    typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
+    Arrow2Destination, Arrow2DestinationError, Arrow2TypeSystem,
+};
 use crate::sources::mssql::{FloatN, IntN, MsSQLSource, MsSQLSourceError, MsSQLTypeSystem};
 use crate::typesystem::TypeConversion;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
@@ -50,18 +53,36 @@ impl_transport!(
         { Image[&'r [u8]]               => LargeBinary[Vec<u8>]      | conversion none }
         { Numeric[Decimal]              => Float64[f64]              | conversion option }
         { Decimal[Decimal]              => Float64[f64]              | conversion none }
-        { Datetime[NaiveDateTime]       => Date64[NaiveDateTime]     | conversion auto }
-        { Datetime2[NaiveDateTime]      => Date64[NaiveDateTime]     | conversion none }
-        { Smalldatetime[NaiveDateTime]  => Date64[NaiveDateTime]     | conversion none }
+        { Datetime[NaiveDateTime]       => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion option }
+        { Datetime2[NaiveDateTime]      => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion none }
+        { Smalldatetime[NaiveDateTime]  => Date64Micro[NaiveDateTimeWrapperMicro]     | conversion none }
         { Date[NaiveDate]               => Date32[NaiveDate]         | conversion auto }
-        { Datetimeoffset[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
+        { Datetimeoffset[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }
         { Uniqueidentifier[Uuid]        => LargeUtf8[String]         | conversion option }
-        { Time[NaiveTime]               => Time64[NaiveTime]         | conversion auto }
+        { Time[NaiveTime]               => Time64Micro[NaiveTimeWrapperMicro]         | conversion option }
         { SmallMoney[f32]               => Float32[f32]              | conversion none }
         { Money[f64]                    => Float64[f64]              | conversion none }
     }
 );
 
+impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MsSQLArrow2Transport {
+    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
+        NaiveTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MsSQLArrow2Transport {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for MsSQLArrow2Transport {
+    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
+        DateTimeWrapperMicro(val)
+    }
+}
+
 impl TypeConversion<Uuid, String> for MsSQLArrow2Transport {
     fn convert(val: Uuid) -> String {
         val.to_string()
diff --git a/connectorx/src/transports/mysql_arrow.rs b/connectorx/src/transports/mysql_arrow.rs
index 1885c05dd1..05675a6f32 100644
--- a/connectorx/src/transports/mysql_arrow.rs
+++ b/connectorx/src/transports/mysql_arrow.rs
@@ -1,7 +1,10 @@
 //! Transport from MySQL Source to Arrow Destination.
 
 use crate::{
-    destinations::arrow::{typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError},
+    destinations::arrow::{
+        typesystem::{ArrowTypeSystem, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
+        ArrowDestination, ArrowDestinationError,
+    },
     impl_transport,
     sources::mysql::{
         BinaryProtocol, MySQLSource, MySQLSourceError, MySQLTypeSystem, TextProtocol,
@@ -49,10 +52,10 @@ impl_transport!(
         { UInt24[u32]                => Int64[i64]              | conversion none }
         { ULongLong[u64]             => Float64[f64]            | conversion auto }
         { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
-        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
-        { Datetime[NaiveDateTime]    => Date64[NaiveDateTime]   | conversion auto }
+        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]       | conversion option }
+        { Datetime[NaiveDateTime]    => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion option }
         { Year[i16]                  => Int64[i64]              | conversion none}
-        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion none }
+        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion none }
         { Decimal[Decimal]           => Float64[f64]            | conversion option }
         { VarChar[String]            => LargeUtf8[String]       | conversion auto }
         { Char[String]               => LargeUtf8[String]       | conversion none }
@@ -84,10 +87,10 @@ impl_transport!(
         { UInt24[u32]                => Int64[i64]              | conversion none }
         { ULongLong[u64]             => Float64[f64]            | conversion auto }
         { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
-        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
-        { Datetime[NaiveDateTime]    => Date64[NaiveDateTime]   | conversion auto }
+        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]       | conversion option }
+        { Datetime[NaiveDateTime]    => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion option }
         { Year[i16]                  => Int64[i64]              | conversion none}
-        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion none }
+        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion none }
         { Decimal[Decimal]           => Float64[f64]            | conversion option }
         { VarChar[String]            => LargeUtf8[String]       | conversion auto }
         { Char[String]               => LargeUtf8[String]       | conversion none }
@@ -100,6 +103,18 @@ impl_transport!(
     }
 );
 
+impl<P> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MySQLArrowTransport<P> {
+    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
+        NaiveTimeWrapperMicro(val)
+    }
+}
+
+impl<P> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MySQLArrowTransport<P> {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
 impl<P> TypeConversion<Decimal, f64> for MySQLArrowTransport<P> {
     fn convert(val: Decimal) -> f64 {
         val.to_f64()
diff --git a/connectorx/src/transports/mysql_arrow2.rs b/connectorx/src/transports/mysql_arrow2.rs
index 23145bef42..f239e3b32d 100644
--- a/connectorx/src/transports/mysql_arrow2.rs
+++ b/connectorx/src/transports/mysql_arrow2.rs
@@ -2,7 +2,8 @@
 
 use crate::{
     destinations::arrow2::{
-        typesystem::Arrow2TypeSystem, Arrow2Destination, Arrow2DestinationError,
+        typesystem::{Arrow2TypeSystem, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
+        Arrow2Destination, Arrow2DestinationError,
     },
     impl_transport,
     sources::mysql::{
@@ -51,10 +52,10 @@ impl_transport!(
         { UInt24[u32]                => Int64[i64]              | conversion none }
         { ULongLong[u64]             => Float64[f64]            | conversion auto }
         { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
-        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
-        { Datetime[NaiveDateTime]    => Date64[NaiveDateTime]   | conversion auto }
+        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]       | conversion option }
+        { Datetime[NaiveDateTime]    => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion option }
         { Year[i16]                  => Int64[i64]              | conversion none}
-        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion none }
+        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion none }
         { Decimal[Decimal]           => Float64[f64]            | conversion option }
         { VarChar[String]            => LargeUtf8[String]       | conversion auto }
         { Char[String]               => LargeUtf8[String]       | conversion none }
@@ -86,10 +87,10 @@ impl_transport!(
         { UInt24[u32]                => Int64[i64]              | conversion none }
         { ULongLong[u64]             => Float64[f64]            | conversion auto }
         { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
-        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
-        { Datetime[NaiveDateTime]    => Date64[NaiveDateTime]   | conversion auto }
+        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]       | conversion option }
+        { Datetime[NaiveDateTime]    => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion option }
         { Year[i16]                  => Int64[i64]              | conversion none}
-        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion none }
+        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion none }
         { Decimal[Decimal]           => Float64[f64]            | conversion option }
         { VarChar[String]            => LargeUtf8[String]       | conversion auto }
         { Char[String]               => LargeUtf8[String]       | conversion none }
@@ -102,6 +103,18 @@ impl_transport!(
     }
 );
 
+impl<P> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MySQLArrow2Transport<P> {
+    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
+        NaiveTimeWrapperMicro(val)
+    }
+}
+
+impl<P> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MySQLArrow2Transport<P> {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
 impl<P> TypeConversion<Decimal, f64> for MySQLArrow2Transport<P> {
     fn convert(val: Decimal) -> f64 {
         val.to_f64()
diff --git a/connectorx/src/transports/oracle_arrow.rs b/connectorx/src/transports/oracle_arrow.rs
index 7f08297f23..da7d657c16 100644
--- a/connectorx/src/transports/oracle_arrow.rs
+++ b/connectorx/src/transports/oracle_arrow.rs
@@ -1,5 +1,8 @@
 use crate::{
-    destinations::arrow::{typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError},
+    destinations::arrow::{
+        typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro},
+        ArrowDestination, ArrowDestinationError,
+    },
     impl_transport,
     sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem},
     typesystem::TypeConversion,
@@ -39,7 +42,21 @@ impl_transport!(
         { NVarChar[String]           => LargeUtf8[String]          | conversion none }
         { NChar[String]              => LargeUtf8[String]          | conversion none }
         { Date[NaiveDateTime]        => Date64[NaiveDateTime]      | conversion auto }
-        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]      | conversion none }
-        { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>]  | conversion auto }
+        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]  | conversion option }
+        { TimestampNano[NaiveDateTime]   => Date64[NaiveDateTime]      | conversion none }
+        { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTimeWrapperMicro]        | conversion option }
+        { TimestampTzNano[DateTime<Utc>] => DateTimeTz[DateTime<Utc>]  | conversion auto }
     }
 );
+
+impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for OracleArrowTransport {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for OracleArrowTransport {
+    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
+        DateTimeWrapperMicro(val)
+    }
+}
diff --git a/connectorx/src/transports/oracle_arrow2.rs b/connectorx/src/transports/oracle_arrow2.rs
index 7e351fb64b..753bcebd14 100644
--- a/connectorx/src/transports/oracle_arrow2.rs
+++ b/connectorx/src/transports/oracle_arrow2.rs
@@ -1,6 +1,7 @@
 use crate::{
     destinations::arrow2::{
-        typesystem::Arrow2TypeSystem, Arrow2Destination, Arrow2DestinationError,
+        typesystem::{Arrow2TypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro},
+        Arrow2Destination, Arrow2DestinationError,
     },
     impl_transport,
     sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem},
@@ -41,7 +42,21 @@ impl_transport!(
         { NVarChar[String]              => LargeUtf8[String]            | conversion none }
         { NChar[String]                 => LargeUtf8[String]            | conversion none }
         { Date[NaiveDateTime]           => Date64[NaiveDateTime]        | conversion auto }
-        { Timestamp[NaiveDateTime]      => Date64[NaiveDateTime]        | conversion none }
-        { TimestampTz[DateTime<Utc>]    => DateTimeTz[DateTime<Utc>]    | conversion auto }
+        { Timestamp[NaiveDateTime]      => Date64Micro[NaiveDateTimeWrapperMicro]       | conversion option }
+        { TimestampNano[NaiveDateTime]      => Date64[NaiveDateTime]        | conversion none }
+        { TimestampTz[DateTime<Utc>]    => DateTimeTzMicro[DateTimeWrapperMicro]        | conversion option }
+        { TimestampTzNano[DateTime<Utc>]    => DateTimeTz[DateTime<Utc>]    | conversion auto }
     }
 );
+
+impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for OracleArrow2Transport {
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
+impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for OracleArrow2Transport {
+    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
+        DateTimeWrapperMicro(val)
+    }
+}
diff --git a/connectorx/src/transports/postgres_arrow.rs b/connectorx/src/transports/postgres_arrow.rs
index 73c076fe84..0ec93a1d44 100644
--- a/connectorx/src/transports/postgres_arrow.rs
+++ b/connectorx/src/transports/postgres_arrow.rs
@@ -1,7 +1,10 @@
 //! Transport from Postgres Source to Arrow Destination.
 
 use crate::destinations::arrow::{
-    typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
+    typesystem::{
+        ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
+    },
+    ArrowDestination, ArrowDestinationError,
 };
 use crate::sources::postgres::{
     BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
@@ -52,10 +55,10 @@ macro_rules! impl_postgres_transport {
                 { BpChar[&'r str]            => LargeUtf8[String]         | conversion none }
                 { VarChar[&'r str]           => LargeUtf8[String]         | conversion none }
                 { Name[&'r str]              => LargeUtf8[String]         | conversion none }
-                { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]     | conversion auto }
+                { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                 { Date[NaiveDate]            => Date32[NaiveDate]         | conversion auto }
-                { Time[NaiveTime]            => Time64[NaiveTime]         | conversion auto }
-                { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
+                { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
+                { TimestampTz[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                 { UUID[Uuid]                 => LargeUtf8[String]         | conversion option }
                 { Char[&'r str]              => LargeUtf8[String]         | conversion none }
                 { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]      | conversion auto }
@@ -75,6 +78,26 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
 impl_postgres_transport!(SimpleProtocol, NoTls);
 impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
 
+impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
+    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
+        NaiveTimeWrapperMicro(val)
+    }
+}
+
+impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
+    for PostgresArrowTransport<P, C>
+{
+    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
+        NaiveDateTimeWrapperMicro(val)
+    }
+}
+
+impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
+    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
+        DateTimeWrapperMicro(val)
+    }
+}
+
 impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
     fn convert(val: Uuid) -> String {
         val.to_string()