Skip to content

Commit

Permalink
Consider Timezone when converting Timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Apr 9, 2024
1 parent 9fda7ea commit 026c03a
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 11 deletions.
7 changes: 6 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,12 @@ impl<T: ArrowTimestampType> PrimitiveArray<T> {

/// Construct a timestamp array with new timezone
pub fn with_timezone(self, timezone: impl Into<Arc<str>>) -> Self {
self.with_timezone_opt(Some(timezone.into()))
let timezone_arc = timezone.into();
if timezone_arc.eq_ignore_ascii_case("utc") {
self.with_timezone_utc()
} else {
self.with_timezone_opt(Some(timezone_arc))
}
}

/// Construct a timestamp array with UTC
Expand Down
8 changes: 7 additions & 1 deletion arrow-array/src/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pub const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
pub const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;
/// Number of days between 0001-01-01 and 1970-01-01
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;

/// Default time zone
pub const DEFAULT_TIME_ZONE: &str = "00:00";
/// converts a `i32` representing a `date32` to [`NaiveDateTime`]
#[inline]
pub fn date32_to_datetime(v: i32) -> Option<NaiveDateTime> {
Expand Down Expand Up @@ -127,6 +128,11 @@ pub fn time_to_time64us(v: NaiveTime) -> i64 {
pub fn time_to_time64ns(v: NaiveTime) -> i64 {
v.num_seconds_from_midnight() as i64 * NANOSECONDS + v.nanosecond() as i64
}
/// converts [`NaiveTime`] to a `i64` representing a `time64(ms)`
pub fn time_to_time64ms(v: NaiveTime) -> i64 {
v.num_seconds_from_midnight() as i64 * MILLISECONDS
+ v.nanosecond() as i64 * MILLISECONDS / NANOSECONDS
}

/// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`]
#[inline]
Expand Down
23 changes: 21 additions & 2 deletions arrow-array/src/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,21 @@ mod private {
Timezone(chrono_tz::Tz),
Offset(FixedOffset),
}

use chrono::DateTime;
use chrono::Utc;
impl Tz {
/// get timezone
pub fn get_time_zone_min(&self) -> i32 {
match self.0 {
TzInner::Timezone(tz) => {
let utc_datetime: DateTime<Utc> = Utc::now();
let local_datetime = utc_datetime.with_timezone(&tz);
local_datetime.offset().fix().local_minus_utc()
}
TzInner::Offset(offset) => offset.local_minus_utc(),
}
}
}
impl FromStr for Tz {
type Err = ArrowError;

Expand Down Expand Up @@ -257,7 +271,12 @@ mod private {
/// An Arrow [`TimeZone`]
#[derive(Debug, Copy, Clone)]
pub struct Tz(FixedOffset);

impl Tz {
/// get timezone
pub fn get_time_zone_min(&self) -> i32 {
self.0.local_minus_utc()
}
}
impl FromStr for Tz {
type Err = ArrowError;

Expand Down
113 changes: 107 additions & 6 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1590,23 +1590,114 @@ pub fn cast_with_options(
to_tz.clone(),
))
}
(Timestamp(from_unit, _), Date32) => {
let array = cast_with_options(array, &Int64, cast_options)?;
(Timestamp(TimeUnit::Microsecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let offset = if let Some(tz) = tz {
tz.get_time_zone_min() as i64
} else {
0
};
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
Ok(x + MICROSECONDS * offset)
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = time_unit_multiple(from_unit) * SECONDS_IN_DAY;
let from_size = MICROSECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
b.append_value(
num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32,
);
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Millisecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let offset = if let Some(tz) = tz {
tz.get_time_zone_min() as i64
} else {
0
};
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
Ok(offset * MILLISECONDS + x)
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = MILLISECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Second, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let offset = if let Some(tz) = tz {
tz.get_time_zone_min() as i64
} else {
0
};
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| Ok(offset + x))?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Nanosecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let offset = if let Some(tz) = tz {
tz.get_time_zone_min() as i64
} else {
0
};
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
Ok(x + offset * NANOSECONDS)
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = NANOSECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Second, _), Date64) => Ok(Arc::new(match cast_options.safe {
Expand Down Expand Up @@ -4441,7 +4532,17 @@ mod tests {
assert_eq!(17890, c.value(1));
assert!(c.is_null(2));
}
#[test]
fn test_cast_timestamp_to_date32_zone() {
let array = TimestampMillisecondArray::from(vec![Some(25201000), Some(111599000), None])
.with_timezone("-07:00".to_string());

let b = cast(&array, &DataType::Date32).unwrap();
let c = b.as_primitive::<Date32Type>();
assert_eq!(0, c.value(0));
assert_eq!(0, c.value(1));
assert!(c.is_null(2));
}
#[test]
fn test_cast_timestamp_to_date64() {
let array =
Expand Down
2 changes: 1 addition & 1 deletion arrow-select/src/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ mod tests {
let result = take(&input, &index, None).unwrap();
match result.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
assert_eq!(tz.clone(), Some("UTC".into()))
assert_eq!(tz.clone(), Some("+00:00".into()))
}
_ => panic!(),
}
Expand Down

0 comments on commit 026c03a

Please sign in to comment.