Skip to content

Commit

Permalink
Implement transport mapping from PG JSON to Arrow v1
Browse files Browse the repository at this point in the history
In addition, implement the Arrow associations for the DateTime types.
  • Loading branch information
gruuya committed Nov 21, 2022
1 parent 9288f22 commit 7cec03b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 25 additions & 15 deletions connectorx/src/destinations/arrow/arrow_assoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::constants::SECONDS_IN_DAY;
use arrow::array::{
ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
UInt32Builder, UInt64Builder,
TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
};
use arrow::datatypes::Field;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
Expand Down Expand Up @@ -140,34 +140,44 @@ impl ArrowAssoc for Option<String> {
}

impl ArrowAssoc for DateTime<Utc> {
type Builder = Float64Builder;
type Builder = TimestampNanosecondBuilder;

fn builder(_nrows: usize) -> Float64Builder {
unimplemented!()
fn builder(nrows: usize) -> Self::Builder {
TimestampNanosecondBuilder::with_capacity(nrows)
}

fn append(_builder: &mut Self::Builder, _value: DateTime<Utc>) -> Result<()> {
unimplemented!()
#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
builder.append_value(value.timestamp_nanos())
}

fn field(_header: &str) -> Field {
unimplemented!()
fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())),
true,
)
}
}

impl ArrowAssoc for Option<DateTime<Utc>> {
type Builder = Float64Builder;
type Builder = TimestampNanosecondBuilder;

fn builder(_nrows: usize) -> Float64Builder {
unimplemented!()
fn builder(nrows: usize) -> Self::Builder {
TimestampNanosecondBuilder::with_capacity(nrows)
}

fn append(_builder: &mut Self::Builder, _value: Option<DateTime<Utc>>) -> Result<()> {
unimplemented!()
#[throws(ArrowDestinationError)]
fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
builder.append_option(value.map(|x| x.timestamp_nanos()))
}

fn field(_header: &str) -> Field {
unimplemented!()
fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())),
false,
)
}
}

Expand Down
9 changes: 9 additions & 0 deletions connectorx/src/transports/postgres_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use num_traits::ToPrimitive;
use postgres::NoTls;
use postgres_openssl::MakeTlsConnector;
use rust_decimal::Decimal;
use serde_json::Value;
use std::marker::PhantomData;
use thiserror::Error;
use uuid::Uuid;
Expand Down Expand Up @@ -57,6 +58,8 @@ macro_rules! impl_postgres_transport {
{ UUID[Uuid] => LargeUtf8[String] | conversion option }
{ Char[&'r str] => LargeUtf8[String] | conversion none }
{ ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
{ JSON[Value] => LargeUtf8[String] | conversion option }
{ JSONB[Value] => LargeUtf8[String] | conversion none }
}
);
}
Expand All @@ -83,3 +86,9 @@ impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
.unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
}
}

/// Turns a `serde_json::Value` read from Postgres into an owned `String`
/// for the Arrow destination.
impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
    /// Renders `val` through its `ToString` implementation.
    fn convert(val: Value) -> String {
        ToString::to_string(&val)
    }
}

0 comments on commit 7cec03b

Please sign in to comment.