Skip to content

Commit

Permalink
feat(clickhouse): add Nullable data type support (#343)
Browse files Browse the repository at this point in the history
* feat(clickhouse): add Nullable data type support

* feat(clickhouse): add nullable test case
  • Loading branch information
burmecia authored Sep 11, 2024
1 parent 7cebb59 commit ac3edd6
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 48 deletions.
54 changes: 36 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/catalog/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The ClickHouse Wrapper allows you to read and write data from ClickHouse within
| text | String |
| date | Date |
| timestamp | DateTime |
| * | Nullable<T> |

## Preparation

Expand Down
5 changes: 3 additions & 2 deletions wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bigquery_fdw = [
"yup-oauth2",
"thiserror",
]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror", "either"]
stripe_fdw = [
"http",
"reqwest",
Expand Down Expand Up @@ -170,11 +170,12 @@ pgrx = { version = "=0.11.3" }
supabase-wrappers = { path = "../supabase-wrappers", default-features = false }

# for clickhouse_fdw
clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "ecf28f4677", features = [
clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "abfe517101eefe36bfdb7da248690659aa284ad3", features = [
"tls",
], optional = true }
chrono = { version = "0.4", optional = true }
chrono-tz = { version = "0.6", optional = true }
either = { version = "1.12.0", optional = true }


# for bigquery_fdw, firebase_fdw, airtable_fdw and etc.
Expand Down
1 change: 1 addition & 0 deletions wrappers/src/fdw/clickhouse_fdw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This is a foreign data wrapper for [ClickHouse](https://clickhouse.com/). It is

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------- |
| 0.1.4 | 2024-09-10 | Added Nullable type suppport |
| 0.1.3 | 2023-07-17 | Added sort and limit pushdown suppport |
| 0.1.2 | 2023-07-13 | Added fdw stats collection |
| 0.1.1 | 2023-05-19 | Added custom sql support |
Expand Down
155 changes: 130 additions & 25 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::stats;
#[allow(deprecated)]
use chrono::{Date, DateTime, Datelike, NaiveDate, NaiveDateTime, Utc};
use chrono_tz::Tz;
use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, Utc};
use clickhouse_rs::{types, types::Block, types::SqlType, ClientHandle, Pool};
use pgrx::to_timestamp;
use regex::{Captures, Regex};
Expand Down Expand Up @@ -56,8 +55,7 @@ fn field_to_cell(row: &types::Row<types::Complex>, i: usize) -> ClickHouseFdwRes
Ok(Some(Cell::String(value)))
}
SqlType::Date => {
#[allow(deprecated)]
let value = row.get::<Date<_>, usize>(i)?;
let value = row.get::<NaiveDate, usize>(i)?;
let dt = pgrx::Date::new(value.year(), value.month() as u8, value.day() as u8)?;
Ok(Some(Cell::Date(dt)))
}
Expand All @@ -66,14 +64,73 @@ fn field_to_cell(row: &types::Row<types::Complex>, i: usize) -> ClickHouseFdwRes
let ts = to_timestamp(value.timestamp() as f64);
Ok(Some(Cell::Timestamp(ts.to_utc())))
}
SqlType::Nullable(v) => match v {
SqlType::UInt8 => {
let value = row.get::<Option<u8>, usize>(i)?;
Ok(value.map(|t| Cell::Bool(t != 0)))
}
SqlType::Int16 => {
let value = row.get::<Option<i16>, usize>(i)?;
Ok(value.map(Cell::I16))
}
SqlType::UInt16 => {
let value = row.get::<Option<u16>, usize>(i)?;
Ok(value.map(|t| Cell::I32(t as _)))
}
SqlType::Int32 => {
let value = row.get::<Option<i32>, usize>(i)?;
Ok(value.map(Cell::I32))
}
SqlType::UInt32 => {
let value = row.get::<Option<u32>, usize>(i)?;
Ok(value.map(|t| Cell::I64(t as _)))
}
SqlType::Float32 => {
let value = row.get::<Option<f32>, usize>(i)?;
Ok(value.map(Cell::F32))
}
SqlType::Float64 => {
let value = row.get::<Option<f64>, usize>(i)?;
Ok(value.map(Cell::F64))
}
SqlType::UInt64 => {
let value = row.get::<Option<u64>, usize>(i)?;
Ok(value.map(|t| Cell::I64(t as _)))
}
SqlType::Int64 => {
let value = row.get::<Option<i64>, usize>(i)?;
Ok(value.map(Cell::I64))
}
SqlType::String => {
let value = row.get::<Option<String>, usize>(i)?;
Ok(value.map(Cell::String))
}
SqlType::Date => {
let value = row.get::<Option<NaiveDate>, usize>(i)?;
Ok(value
.map(|t| pgrx::Date::new(t.year(), t.month() as u8, t.day() as u8))
.transpose()?
.map(Cell::Date))
}
SqlType::DateTime(_) => {
let value = row.get::<Option<DateTime<_>>, usize>(i)?;
Ok(value.map(|t| {
let ts = to_timestamp(t.timestamp() as f64);
Cell::Timestamp(ts.to_utc())
}))
}
_ => Err(ClickHouseFdwError::UnsupportedColumnType(
sql_type.to_string().into(),
)),
},
_ => Err(ClickHouseFdwError::UnsupportedColumnType(
sql_type.to_string().into(),
)),
}
}

#[wrappers_fdw(
version = "0.1.3",
version = "0.1.4",
author = "Supabase",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/clickhouse_fdw",
error_type = "ClickHouseFdwError"
Expand Down Expand Up @@ -283,9 +340,6 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
.unwrap();
let cell = field_to_cell(&src_row, i)?;
let col_name = src_row.name(i).unwrap();
if cell.as_ref().is_none() {
return Ok(None);
}
row.push(col_name, cell);
}
self.row_idx += 1;
Expand All @@ -310,33 +364,84 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {

fn insert(&mut self, src: &Row) -> ClickHouseFdwResult<()> {
if let Some(ref mut client) = self.client {
// use a dummy query to probe column types
let sql = format!("select * from {} where false", self.table);
let probe = self.rt.block_on(client.query(&sql).fetch_all())?;

// add row to block
let mut row = Vec::new();
for (col_name, cell) in src.iter() {
let col_name = col_name.to_owned();
if let Some(cell) = cell {
match cell {
Cell::Bool(v) => row.push((col_name, types::Value::from(*v))),
Cell::F64(v) => row.push((col_name, types::Value::from(*v))),
Cell::I64(v) => row.push((col_name, types::Value::from(*v))),
Cell::String(v) => row.push((col_name, types::Value::from(v.as_str()))),
let tgt_col = probe.get_column(col_name.as_ref())?;
let is_nullable = matches!(tgt_col.sql_type(), SqlType::Nullable(_));

let value = cell
.as_ref()
.map(|c| match c {
Cell::Bool(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::F64(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::I64(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::String(v) => {
let s = v.as_str();
let val = if is_nullable {
types::Value::from(Some(s))
} else {
types::Value::from(s)
};
Ok(val)
}
Cell::Date(_) => {
let s = cell.to_string().replace('\'', "");
let s = c.to_string().replace('\'', "");
let tm = NaiveDate::parse_from_str(&s, "%Y-%m-%d")?;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let duration = tm - epoch;
let dt = types::Value::Date(duration.num_days() as u16, Tz::UTC);
row.push((col_name, dt));
let dt = duration.num_days() as u16;
let val = if is_nullable {
types::Value::from(Some(dt))
} else {
types::Value::Date(dt)
};
Ok(val)
}
Cell::Timestamp(_) => {
let s = cell.to_string().replace('\'', "");
let tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?;
let tm: DateTime<Utc> = DateTime::from_naive_utc_and_offset(tm, Utc);
row.push((col_name, types::Value::from(tm)));
}
_ => {
return Err(ClickHouseFdwError::UnsupportedColumnType(cell.to_string()))
let s = c.to_string().replace('\'', "");
let naive_tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?;
let tm: DateTime<Utc> =
DateTime::from_naive_utc_and_offset(naive_tm, Utc);
let val = if is_nullable {
types::Value::Nullable(either::Either::Right(Box::new(tm.into())))
} else {
types::Value::from(tm)
};
Ok(val)
}
}
_ => Err(ClickHouseFdwError::UnsupportedColumnType(c.to_string())),
})
.transpose()?;

if let Some(v) = value {
row.push((col_name, v));
}
}
let mut block = Block::new();
Expand Down
Loading

0 comments on commit ac3edd6

Please sign in to comment.