Skip to content

Commit

Permalink
a way to pass row into a block (suharev7#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Sep 23, 2019
1 parent c401657 commit 324f9cd
Show file tree
Hide file tree
Showing 21 changed files with 499 additions and 81 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.14"
version = "0.1.15"
authors = ["Mikhail Sukharev <[email protected]>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand Down Expand Up @@ -34,4 +34,4 @@ lazy_static = "1.3.0"

[dev-dependencies]
env_logger = "0.6.2"
rand = "0.7.0"
rand = "0.7.0"
15 changes: 7 additions & 8 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::env;

use futures::Future;

use clickhouse_rs::{types::Block, Pool};
use clickhouse_rs::{row, types::Block, Pool};

fn main() {
env::set_var("RUST_LOG", "clickhouse_rs=debug");
Expand All @@ -18,13 +18,12 @@ fn main() {
account_name Nullable(FixedString(3))
) Engine=Memory";

let block = Block::new()
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column(
"account_name",
vec![Some("foo"), None, None, None, Some("bar")],
);
let mut block = Block::new();
block.push(row!{ customer_id: 1_u32, amount: 2_u32, account_name: Some("foo") }).unwrap();
block.push(row!{ customer_id: 3_u32, amount: 4_u32, account_name: None::<&str> }).unwrap();
block.push(row!{ customer_id: 5_u32, amount: 6_u32, account_name: None::<&str> }).unwrap();
block.push(row!{ customer_id: 7_u32, amount: 8_u32, account_name: None::<&str> }).unwrap();
block.push(row!{ customer_id: 9_u32, amount: 10_u32, account_name: Some("bar") }).unwrap();

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
Expand Down
59 changes: 59 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,65 @@ mod retry_guard;
/// Clickhouse types.
pub mod types;

/// This macro is a convenient way to pass row into a block.
///
/// ```rust
/// # use clickhouse_rs::{types::Block, row, errors::Error};
/// # fn make_block() -> Result<(), Error> {
/// let mut block = Block::new();
/// block.push(row!{customer_id: 1, amount: 2, account_name: "foo"})?;
/// block.push(row!{customer_id: 4, amount: 4, account_name: "bar"})?;
/// block.push(row!{customer_id: 5, amount: 5, account_name: "baz"})?;
/// # assert_eq!(block.row_count(), 3);
/// # Ok(())
/// # }
/// # make_block().unwrap()
/// ```
///
/// you can also use `Vec<(String, Value)>` to construct row to insert into a block:
///
/// ```rust
/// # use clickhouse_rs::{types::Block, errors::Error, types::Value};
/// # fn make_block() -> Result<(), Error> {
/// let mut block = Block::new();
/// for i in 1..10 {
/// let mut row = Vec::new();
/// for j in 1..10 {
/// row.push((format!("#{}", j), Value::from(i * j)));
/// }
/// block.push(row)?;
/// }
/// assert_eq!(block.row_count(), 9);
/// # println!("{:?}", block);
/// # Ok(())
/// # }
/// # make_block().unwrap()
/// ```
#[macro_export]
macro_rules! row {
() => { $crate::types::RNil };
( $i:ident, $($tail:tt)* ) => {
row!( $($tail)* ).put(stringify!($i).into(), $i.into())
};
( $i:ident ) => { row!($i: $i) };

( $k:ident: $v:expr ) => {
$crate::types::RNil.put(stringify!($k).into(), $v.into())
};

( $k:ident: $v:expr, $($tail:tt)* ) => {
row!( $($tail)* ).put(stringify!($k).into(), $v.into())
};

( $k:expr => $v:expr ) => {
$crate::types::RNil.put($k.into(), $v.into())
};

( $k:expr => $v:expr, $($tail:tt)* ) => {
row!( $($tail)* ).put($k.into(), $v.into())
};
}

macro_rules! try_opt {
($expr:expr) => {
match $expr {
Expand Down
190 changes: 190 additions & 0 deletions src/types/block/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::borrow::Cow;

use chrono_tz::Tz;

use crate::{
Block,
errors::{Error, FromSqlError},
types::{
block::ColumnIdx,
column::{ArcColumnWrapper, ColumnData, Either},
Column,
Value
},
};

pub trait RowBuilder {
fn apply(self, block: &mut Block) -> Result<(), Error>;
}

pub struct RNil;

pub struct RCons<T>
where
T: RowBuilder,
{
key: Cow<'static, str>,
value: Value,
tail: T,
}

impl RNil {
pub fn put(self, key: Cow<'static, str>, value: Value) -> RCons<Self> {
RCons {
key,
value,
tail: RNil,
}
}
}

impl<T> RCons<T>
where
T: RowBuilder,
{
pub fn put(self, key: Cow<'static, str>, value: Value) -> RCons<Self> {
RCons {
key,
value,
tail: self,
}
}
}

impl RowBuilder for RNil {
#[inline(always)]
fn apply(self, _block: &mut Block) -> Result<(), Error> {
Ok(())
}
}

impl<T> RowBuilder for RCons<T>
where
T: RowBuilder,
{
#[inline(always)]
fn apply(self, block: &mut Block) -> Result<(), Error> {
put_param(self.key, self.value, block)?;
self.tail.apply(block)
}
}

impl RowBuilder for Vec<(String, Value)> {
fn apply(self, block: &mut Block) -> Result<(), Error> {
for (k, v) in self {
put_param(k.into(), v, block)?;
}
Ok(())
}
}

fn put_param(key: Cow<'static, str>, value: Value, block: &mut Block) -> Result<(), Error> {
let col_index = match key.as_ref().get_index(&block.columns) {
Ok(col_index) => col_index,
Err(Error::FromSql(FromSqlError::OutOfRange)) => {
if block.row_count() <= 1 {
let sql_type = From::from(value.clone());

let timezone = extract_timezone(&value);

let column = Column {
name: key.clone().into(),
data: ColumnData::from_type::<ArcColumnWrapper>(sql_type, timezone)?,
};

block.columns.push(column);
return put_param(key, value, block);
} else {
return Err(Error::FromSql(FromSqlError::OutOfRange));
}
}
Err(err) => return Err(err),
};

block.columns[col_index].push(value);
Ok(())
}

fn extract_timezone(value: &Value) -> Tz {
match value {
Value::Date(_, tz) => *tz,
Value::DateTime(_, tz) => *tz,
Value::Nullable(Either::Right(d)) => {
extract_timezone(&&d)
}
Value::Array(_, data) => {
if let Some(v) = data.first() {
extract_timezone(v)
} else {
Tz::Zulu
}
}
_ => Tz::Zulu
}
}

#[cfg(test)]
mod test {
use chrono::prelude::*;
use chrono_tz::Tz::{self, UTC};

use crate::{row, types::{Decimal, SqlType}};

use super::*;

#[test]
fn test_push_row() {
let date_value: Date<Tz> = UTC.ymd(2016, 10, 22);
let date_time_value: DateTime<Tz> = UTC.ymd(2014, 7, 8).and_hms(14, 0, 0);

let decimal = Decimal::of(2.0_f64, 4);

let mut block = Block::new();
block.push(row!{
i8_field: 1_i8,
i16_field: 1_i16,
i32_field: 1_i32,
i64_field: 1_i64,

u8_field: 1_u8,
u16_field: 1_u16,
u32_field: 1_u32,
u64_field: 1_u64,

f32_field: 4.66_f32,
f64_field: 2.71_f64,

str_field: "text",
opt_filed: Some("text"),
nil_filed: Option::<&str>::None,

date_field: date_value,
date_time_field: date_time_value,

decimal_field: decimal
}).unwrap();

assert_eq!(block.row_count(), 1);

assert_eq!(block.columns[0].sql_type(), SqlType::Int8);
assert_eq!(block.columns[1].sql_type(), SqlType::Int16);
assert_eq!(block.columns[2].sql_type(), SqlType::Int32);
assert_eq!(block.columns[3].sql_type(), SqlType::Int64);

assert_eq!(block.columns[4].sql_type(), SqlType::UInt8);
assert_eq!(block.columns[5].sql_type(), SqlType::UInt16);
assert_eq!(block.columns[6].sql_type(), SqlType::UInt32);
assert_eq!(block.columns[7].sql_type(), SqlType::UInt64);

assert_eq!(block.columns[8].sql_type(), SqlType::Float32);
assert_eq!(block.columns[9].sql_type(), SqlType::Float64);

assert_eq!(block.columns[10].sql_type(), SqlType::String);
assert_eq!(block.columns[11].sql_type(), SqlType::Nullable(SqlType::String.into()));
assert_eq!(block.columns[12].sql_type(), SqlType::Nullable(SqlType::String.into()));

assert_eq!(block.columns[13].sql_type(), SqlType::Date);
assert_eq!(block.columns[14].sql_type(), SqlType::DateTime);
assert_eq!(block.columns[15].sql_type(), SqlType::Decimal(18, 4));
}
}
14 changes: 14 additions & 0 deletions src/types/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ pub(crate) use self::row::BlockRef;
pub use self::{
block_info::BlockInfo,
row::{Row, Rows},
builder::{RowBuilder, RCons, RNil},
};

mod block_info;
mod chunk_iterator;
mod compressed;
mod row;
mod builder;

const INSERT_BLOCK_SIZE: usize = 1_048_576;

Expand Down Expand Up @@ -74,6 +76,7 @@ impl AsRef<Block> for Block {
}

impl ColumnIdx for usize {
#[inline(always)]
fn get_index(&self, _: &[Column]) -> ClickhouseResult<usize> {
Ok(*self)
}
Expand All @@ -92,6 +95,12 @@ impl<'a> ColumnIdx for &'a str {
}
}

impl ColumnIdx for String {
fn get_index(&self, columns: &[Column]) -> Result<usize, Error> {
self.as_str().get_index(columns)
}
}

impl Block {
/// Constructs a new, empty Block.
pub fn new() -> Self {
Expand Down Expand Up @@ -190,6 +199,11 @@ impl Block {
block_ref: BlockRef::Borrowed(&self),
}
}

/// This method is a convenient way to pass row into a block.
pub fn push<B: RowBuilder>(&mut self, row: B) -> Result<(), Error> {
row.apply(self)
}
}

impl Block {
Expand Down
9 changes: 8 additions & 1 deletion src/types/column/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{
column::{list::List, BoxColumnWrapper, ColumnData},
column::{list::List, BoxColumnWrapper, ColumnData, column_data::BoxColumnData},
SqlType, Value, ValueRef,
},
};
Expand Down Expand Up @@ -90,6 +90,13 @@ impl ColumnData for ArrayColumnData {
}
ValueRef::Array(sql_type.into(), Arc::new(vs))
}

fn clone_instance(&self) -> BoxColumnData {
Box::new(Self {
inner: self.inner.clone_instance(),
offsets: self.offsets.clone(),
})
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 324f9cd

Please sign in to comment.