From 0cae5a28aa34e17f8d06c278483c36238cd1cbc3 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Mon, 8 Apr 2024 08:25:32 -0500 Subject: [PATCH] adding timezone when casting timestamp --- arrow-array/src/array/primitive_array.rs | 8 +- arrow-array/src/temporal_conversions.rs | 8 +- arrow-array/src/timezone.rs | 7 +- arrow-cast/src/cast/mod.rs | 115 +++++++++++++++++++++-- 4 files changed, 129 insertions(+), 9 deletions(-) diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index ddae770d383d..bdbd7cade48c 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1366,7 +1366,13 @@ impl PrimitiveArray { /// Construct a timestamp array with new timezone pub fn with_timezone(self, timezone: impl Into>) -> Self { - self.with_timezone_opt(Some(timezone.into())) + let timezone_str = timezone.into().to_lowercase(); + + if timezone_str == "utc" { + self.with_timezone_utc() + } else { + self.with_timezone_opt(Some(Arc::from(timezone_str))) + } } /// Construct a timestamp array with UTC diff --git a/arrow-array/src/temporal_conversions.rs b/arrow-array/src/temporal_conversions.rs index 8d238b3a196c..b86d10a59873 100644 --- a/arrow-array/src/temporal_conversions.rs +++ b/arrow-array/src/temporal_conversions.rs @@ -39,7 +39,8 @@ pub const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS; pub const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS; /// Number of days between 0001-01-01 and 1970-01-01 pub const EPOCH_DAYS_FROM_CE: i32 = 719_163; - +/// Default time zone +pub const DEFAULT_TIME_ZONE: &str = "00:00"; /// converts a `i32` representing a `date32` to [`NaiveDateTime`] #[inline] pub fn date32_to_datetime(v: i32) -> Option { @@ -127,6 +128,11 @@ pub fn time_to_time64us(v: NaiveTime) -> i64 { pub fn time_to_time64ns(v: NaiveTime) -> i64 { v.num_seconds_from_midnight() as i64 * NANOSECONDS + v.nanosecond() as i64 } +/// converts [`NaiveTime`] to a `i64` representing a `time64(ms)` +pub fn time_to_time64ms(v: NaiveTime) -> i64 { + v.num_seconds_from_midnight() as i64 * MILLISECONDS + + v.nanosecond() as i64 * MILLISECONDS / NANOSECONDS +} /// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`] #[inline] diff --git a/arrow-array/src/timezone.rs b/arrow-array/src/timezone.rs index b4df77deb4f5..3fff618948c8 100644 --- a/arrow-array/src/timezone.rs +++ b/arrow-array/src/timezone.rs @@ -257,7 +257,12 @@ mod private { /// An Arrow [`TimeZone`] #[derive(Debug, Copy, Clone)] pub struct Tz(FixedOffset); - + impl Tz { + /// get timezone + pub fn get_time_zone(&self) -> FixedOffset { + self.0 + } + } impl FromStr for Tz { type Err = ArrowError; diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index 3e2bf4392ff0..830d1fd5b3e2 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -1590,10 +1590,77 @@ pub fn cast_with_options( to_tz.clone(), )) } - (Timestamp(from_unit, _), Date32) => { - let array = cast_with_options(array, &Int64, cast_options)?; + (Timestamp(TimeUnit::Microsecond, tz), Date32) => { + let tz: Option = tz.as_ref().map(|tz| tz.parse()).transpose()?; + let array = array + .as_primitive::() + .try_unary::<_, TimestampMillisecondType, ArrowError>(|x| { + if let Some(tzone) = tz { + let res = MICROSECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x; + Ok(res) + } else { + Ok(x.into()) + } + })?; + let array = cast_with_options(&array, &Int64, cast_options)?; + let time_array = array.as_primitive::(); + let from_size = MICROSECONDS * SECONDS_IN_DAY; + + let mut b = Date32Builder::with_capacity(array.len()); + + for i in 0..array.len() { + if time_array.is_null(i) { + b.append_null(); + } else { + let val = num::integer::div_floor::(time_array.value(i), from_size) as i32; + b.append_value(val); + } + } + Ok(Arc::new(b.finish()) as ArrayRef) + } + (Timestamp(TimeUnit::Millisecond, tz), Date32) => { + let tz: Option = tz.as_ref().map(|tz| tz.parse()).transpose()?; + let array = array + .as_primitive::() + .try_unary::<_, TimestampMillisecondType, ArrowError>(|x| { + if let Some(tzone) = tz { + let res = MILLISECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x; + Ok(res) + } else { + Ok(x.into()) + } + })?; + let array = cast_with_options(&array, &Int64, cast_options)?; + let time_array = array.as_primitive::(); + let from_size = MILLISECONDS * SECONDS_IN_DAY; + + let mut b = Date32Builder::with_capacity(array.len()); + + for i in 0..array.len() { + if time_array.is_null(i) { + b.append_null(); + } else { + let val = num::integer::div_floor::(time_array.value(i), from_size) as i32; + b.append_value(val); + } + } + Ok(Arc::new(b.finish()) as ArrayRef) + } + (Timestamp(TimeUnit::Second, tz), Date32) => { + let tz: Option = tz.as_ref().map(|tz| tz.parse()).transpose()?; + let array = array + .as_primitive::() + .try_unary::<_, TimestampMillisecondType, ArrowError>(|x| { + if let Some(tzone) = tz { + let res = tzone.get_time_zone().local_minus_utc() as i64 + x; + Ok(res) + } else { + Ok(x.into()) + } + })?; + let array = cast_with_options(&array, &Int64, cast_options)?; let time_array = array.as_primitive::(); - let from_size = time_unit_multiple(from_unit) * SECONDS_IN_DAY; + let from_size = 1 * SECONDS_IN_DAY; let mut b = Date32Builder::with_capacity(array.len()); @@ -1601,12 +1668,38 @@ pub fn cast_with_options( if time_array.is_null(i) { b.append_null(); } else { - b.append_value( - num::integer::div_floor::(time_array.value(i), from_size) as i32, - ); + let val = num::integer::div_floor::(time_array.value(i), from_size) as i32; + b.append_value(val); } } + Ok(Arc::new(b.finish()) as ArrayRef) + } + (Timestamp(TimeUnit::Nanosecond, tz), Date32) => { + let tz: Option = tz.as_ref().map(|tz| tz.parse()).transpose()?; + let array = array + .as_primitive::() + .try_unary::<_, TimestampMillisecondType, ArrowError>(|x| { + if let Some(tzone) = tz { + let res = NANOSECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x; + Ok(res) + } else { + Ok(x.into()) + } + })?; + let array = cast_with_options(&array, &Int64, cast_options)?; + let time_array = array.as_primitive::(); + let from_size = NANOSECONDS * SECONDS_IN_DAY; + let mut b = Date32Builder::with_capacity(array.len()); + + for i in 0..array.len() { + if time_array.is_null(i) { + b.append_null(); + } else { + let val = num::integer::div_floor::(time_array.value(i), from_size) as i32; + b.append_value(val); + } + } Ok(Arc::new(b.finish()) as ArrayRef) } (Timestamp(TimeUnit::Second, _), Date64) => Ok(Arc::new(match cast_options.safe { @@ -4441,7 +4534,17 @@ mod tests { assert_eq!(17890, c.value(1)); assert!(c.is_null(2)); } + #[test] + fn test_cast_timestamp_to_date32_zone() { + let array = TimestampMillisecondArray::from(vec![Some(25201000), Some(111599000), None]) + .with_timezone("-07:00".to_string()); + let b = cast(&array, &DataType::Date32).unwrap(); + let c = b.as_primitive::(); + assert_eq!(0, c.value(0)); + assert_eq!(0, c.value(1)); + assert!(c.is_null(2)); + } #[test] fn test_cast_timestamp_to_date64() { let array =