Skip to content

Commit

Permalink
Initial support for extract(x from time)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Dec 31, 2023
1 parent 545275b commit 99f2073
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 46 deletions.
37 changes: 29 additions & 8 deletions datafusion/common/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ use crate::{downcast_value, DataFusionError, Result};
use arrow::{
array::{
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, Float32Array,
Float64Array, GenericBinaryArray, GenericListArray, GenericStringArray,
Int32Array, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
IntervalYearMonthArray, LargeListArray, ListArray, MapArray, NullArray,
OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array, UnionArray,
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
UInt8Array, UnionArray,
},
datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType},
};
use arrow_array::Decimal256Array;

// Downcast ArrayRef to Date32Array
pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> {
Expand Down Expand Up @@ -154,6 +155,26 @@ pub fn as_union_array(array: &dyn Array) -> Result<&UnionArray> {
Ok(downcast_value!(array, UnionArray))
}

// Downcast ArrayRef to Time32SecondArray
pub fn as_time32_second_array(array: &dyn Array) -> Result<&Time32SecondArray> {
Ok(downcast_value!(array, Time32SecondArray))
}

// Downcast ArrayRef to Time32MillisecondArray
pub fn as_time32_millisecond_array(array: &dyn Array) -> Result<&Time32MillisecondArray> {
Ok(downcast_value!(array, Time32MillisecondArray))
}

// Downcast ArrayRef to Time64MicrosecondArray
pub fn as_time64_microsecond_array(array: &dyn Array) -> Result<&Time64MicrosecondArray> {
Ok(downcast_value!(array, Time64MicrosecondArray))
}

// Downcast ArrayRef to Time64NanosecondArray
pub fn as_time64_nanosecond_array(array: &dyn Array) -> Result<&Time64NanosecondArray> {
Ok(downcast_value!(array, Time64NanosecondArray))
}

// Downcast ArrayRef to TimestampNanosecondArray
pub fn as_timestamp_nanosecond_array(
array: &dyn Array,
Expand Down
106 changes: 106 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,112 @@ async fn test_extract_date_part() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_extract_time() -> Result<()> {
// time32 seconds
let time32_seconds = "arrow_cast('23:32:50'::time, 'Time32(Second)')";
test_expression!(format!("extract(hour from {time32_seconds})"), "23.0");
test_expression!(format!("extract(minute from {time32_seconds})"), "32.0");
test_expression!(format!("extract(second from {time32_seconds})"), "50.0");
test_expression!(
format!("extract(millisecond from {time32_seconds})"),
"50000.0"
);
test_expression!(
format!("extract(microsecond from {time32_seconds})"),
"50000000.0"
);
test_expression!(
format!("extract(nanosecond from {time32_seconds})"),
"50000000000.0"
);
test_expression!(format!("extract(epoch from {time32_seconds})"), "84770.0");

// time32 milliseconds
let time32_milliseconds = "arrow_cast('23:32:50.123'::time, 'Time32(Millisecond)')";
test_expression!(format!("extract(hour from {time32_milliseconds})"), "23.0");
test_expression!(
format!("extract(minute from {time32_milliseconds})"),
"32.0"
);
test_expression!(
format!("extract(second from {time32_milliseconds})"),
"50.123"
);
test_expression!(
format!("extract(millisecond from {time32_milliseconds})"),
"50123.0"
);
test_expression!(
format!("extract(microsecond from {time32_milliseconds})"),
"50123000.0"
);
test_expression!(
format!("extract(nanosecond from {time32_milliseconds})"),
"50123000000.0"
);
test_expression!(
format!("extract(epoch from {time32_milliseconds})"),
"84770.123"
);

// time64 microseconds
let time64_microseconds =
"arrow_cast('23:32:50.123456'::time, 'Time32(Microsecond)')";
test_expression!(format!("extract(hour from {time64_microseconds})"), "23.0");
test_expression!(
format!("extract(minute from {time64_microseconds})"),
"32.0"
);
test_expression!(
format!("extract(second from {time64_microseconds})"),
"50.123456"
);
test_expression!(
format!("extract(millisecond from {time64_microseconds})"),
"50123.456"
);
test_expression!(
format!("extract(microsecond from {time64_microseconds})"),
"50123456.0"
);
test_expression!(
format!("extract(nanosecond from {time64_microseconds})"),
"50123456000.0"
);
test_expression!(
format!("extract(epoch from {time64_microseconds})"),
"84770.123456"
);

// time64 nanoseconds
let time64_nanoseconds =
"arrow_cast('23:32:50.123456789'::time, 'Time32(Nanosecond)')";
test_expression!(format!("extract(hour from {time64_nanoseconds})"), "23.0");
test_expression!(format!("extract(minute from {time64_nanoseconds})"), "32.0");
test_expression!(
format!("extract(second from {time64_nanoseconds})"),
"50.123456789"
);
test_expression!(
format!("extract(millisecond from {time64_nanoseconds})"),
"50123.456789"
);
test_expression!(
format!("extract(microsecond from {time64_nanoseconds})"),
"50123456.789"
);
test_expression!(
format!("extract(nanosecond from {time64_nanoseconds})"),
"50123456789.0"
);
test_expression!(
format!("extract(epoch from {time64_nanoseconds})"),
"84770.123456789"
);
Ok(())
}

#[tokio::test]
async fn test_extract_epoch() -> Result<()> {
test_expression!(
Expand Down
11 changes: 11 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ impl BuiltinScalarFunction {
}
BuiltinScalarFunction::DatePart => Signature::one_of(
vec![
// timestamps
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![
Utf8,
Expand All @@ -1230,8 +1231,18 @@ impl BuiltinScalarFunction {
Utf8,
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
]),
// dates
Exact(vec![Utf8, Date64]),
Exact(vec![Utf8, Date32]),
// time
Exact(vec![Utf8, Time64(Nanosecond)]),
Exact(vec![Utf8, Time64(Microsecond)]),
Exact(vec![Utf8, Time64(Millisecond)]),
Exact(vec![Utf8, Time64(Second)]),
Exact(vec![Utf8, Time32(Nanosecond)]),
Exact(vec![Utf8, Time32(Microsecond)]),
Exact(vec![Utf8, Time32(Millisecond)]),
Exact(vec![Utf8, Time32(Second)]),
],
self.volatility(),
),
Expand Down
135 changes: 97 additions & 38 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use crate::datetime_expressions;
use crate::expressions::cast_column;
use arrow::array::Float64Builder;
use arrow::compute::cast;
use arrow::{
array::{Array, ArrayRef, Float64Array, OffsetSizeTrait, PrimitiveArray},
Expand All @@ -42,8 +41,10 @@ use chrono::prelude::*;
use chrono::{Duration, Months, NaiveDate};
use datafusion_common::cast::{
as_date32_array, as_date64_array, as_generic_string_array, as_primitive_array,
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
as_timestamp_nanosecond_array, as_timestamp_second_array,
as_time32_millisecond_array, as_time32_second_array, as_time64_microsecond_array,
as_time64_nanosecond_array, as_timestamp_microsecond_array,
as_timestamp_millisecond_array, as_timestamp_nanosecond_array,
as_timestamp_second_array,
};
use datafusion_common::{
exec_err, internal_err, not_impl_err, DataFusionError, Result, ScalarType,
Expand Down Expand Up @@ -790,6 +791,34 @@ macro_rules! extract_date_part {
};
}

macro_rules! extract_time_part {
($ARRAY: expr, $FN:expr) => {
match $ARRAY.data_type() {
DataType::Time32(TimeUnit::Second) => {
let array = as_time32_second_array($ARRAY)?;
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
DataType::Time32(TimeUnit::Millisecond) => {
let array = as_time32_millisecond_array($ARRAY)?;
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
DataType::Time64(TimeUnit::Microsecond) => {
let array = as_time64_microsecond_array($ARRAY)?;
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
DataType::Time64(TimeUnit::Nanosecond) => {
let array = as_time64_nanosecond_array($ARRAY)?;
Ok($FN(array)
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
}
datatype => internal_err!("Extract does not support datatype {:?}", datatype),
}
};
}

/// DATE_PART SQL function
pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
Expand All @@ -810,22 +839,41 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let arr = match date_part.to_lowercase().as_str() {
"year" => extract_date_part!(&array, temporal::year),
"quarter" => extract_date_part!(&array, temporal::quarter),
"month" => extract_date_part!(&array, temporal::month),
"week" => extract_date_part!(&array, temporal::week),
"day" => extract_date_part!(&array, temporal::day),
"doy" => extract_date_part!(&array, temporal::doy),
"dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
"hour" => extract_date_part!(&array, temporal::hour),
"minute" => extract_date_part!(&array, temporal::minute),
"second" => extract_date_part!(&array, seconds),
"millisecond" => extract_date_part!(&array, millis),
"microsecond" => extract_date_part!(&array, micros),
"nanosecond" => extract_date_part!(&array, nanos),
"epoch" => extract_date_part!(&array, epoch),
_ => exec_err!("Date part '{date_part}' not supported"),
let arr = match array.data_type() {
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _) => {
match date_part.to_lowercase().as_str() {
"year" => extract_date_part!(&array, temporal::year),
"quarter" => extract_date_part!(&array, temporal::quarter),
"month" => extract_date_part!(&array, temporal::month),
"week" => extract_date_part!(&array, temporal::week),
"day" => extract_date_part!(&array, temporal::day),
"doy" => extract_date_part!(&array, temporal::doy),
"dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
"hour" => extract_date_part!(&array, temporal::hour),
"minute" => extract_date_part!(&array, temporal::minute),
"second" => extract_date_part!(&array, seconds),
"millisecond" => extract_date_part!(&array, millis),
"microsecond" => extract_date_part!(&array, micros),
"nanosecond" => extract_date_part!(&array, nanos),
"epoch" => extract_date_part!(&array, epoch),
_ => exec_err!(
"Date part '{date_part}' not supported for date/timestamp types"
),
}
}
DataType::Time32(_) | DataType::Time64(_) => {
match date_part.to_lowercase().as_str() {
"hour" => extract_time_part!(&array, temporal::hour),
"minute" => extract_time_part!(&array, temporal::minute),
"second" => extract_time_part!(&array, seconds),
"millisecond" => extract_time_part!(&array, millis),
"microsecond" => extract_time_part!(&array, micros),
"nanosecond" => extract_time_part!(&array, nanos),
"epoch" => extract_time_part!(&array, epoch),
_ => exec_err!("Date part '{date_part}' not supported for time types"),
}
}
datatype => internal_err!("Extract does not support datatype {:?}", datatype),
}?;

Ok(if is_scalar {
Expand Down Expand Up @@ -887,28 +935,39 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let mut b = Float64Builder::with_capacity(array.len());
match array.data_type() {
let b = match array.data_type() {
DataType::Timestamp(tu, _) => {
for i in 0..array.len() {
if array.is_null(i) {
b.append_null();
} else {
let scale = match tu {
TimeUnit::Second => 1,
TimeUnit::Millisecond => 1_000,
TimeUnit::Microsecond => 1_000_000,
TimeUnit::Nanosecond => 1_000_000_000,
};

let n: i64 = array.value(i).into();
b.append_value(n as f64 / scale as f64);
}
}
let scale = match tu {
TimeUnit::Second => 1,
TimeUnit::Millisecond => 1_000,
TimeUnit::Microsecond => 1_000_000,
TimeUnit::Nanosecond => 1_000_000_000,
} as f64;

array.unary(|n| {
let n: i64 = n.into();
n as f64 / scale
})
}
DataType::Time32(TimeUnit::Second) => array.unary(|n| {
let n: i64 = n.into();
n as f64
}),
DataType::Time32(TimeUnit::Millisecond) => array.unary(|n| {
let n: i64 = n.into();
n as f64 / 1_000_f64
}),
DataType::Time64(TimeUnit::Microsecond) => array.unary(|n| {
let n: i64 = n.into();
n as f64 / 1_000_000_f64
}),
DataType::Time64(TimeUnit::Nanosecond) => array.unary(|n| {
let n: i64 = n.into();
n as f64 / 1_000_000_000_f64
}),
_ => return internal_err!("Can not convert {:?} to epoch", array.data_type()),
}
Ok(b.finish())
};
Ok(b)
}

/// to_timestammp() SQL function implementation
Expand Down

0 comments on commit 99f2073

Please sign in to comment.