Skip to content

Commit

Permalink
adding timezone when casting timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Apr 8, 2024
1 parent 9fda7ea commit 0cae5a2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 9 deletions.
8 changes: 7 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,13 @@ impl<T: ArrowTimestampType> PrimitiveArray<T> {

/// Construct a timestamp array with new timezone
pub fn with_timezone(self, timezone: impl Into<Arc<str>>) -> Self {
self.with_timezone_opt(Some(timezone.into()))
let timezone_str = timezone.into().to_lowercase();

if timezone_str == "utc" {
self.with_timezone_utc()
} else {
self.with_timezone_opt(Some(Arc::from(timezone_str)))
}
}

/// Construct a timestamp array with UTC
Expand Down
8 changes: 7 additions & 1 deletion arrow-array/src/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pub const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
pub const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;
/// Number of days between 0001-01-01 and 1970-01-01
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;

/// Default time zone
pub const DEFAULT_TIME_ZONE: &str = "00:00";
/// converts a `i32` representing a `date32` to [`NaiveDateTime`]
#[inline]
pub fn date32_to_datetime(v: i32) -> Option<NaiveDateTime> {
Expand Down Expand Up @@ -127,6 +128,11 @@ pub fn time_to_time64us(v: NaiveTime) -> i64 {
pub fn time_to_time64ns(v: NaiveTime) -> i64 {
v.num_seconds_from_midnight() as i64 * NANOSECONDS + v.nanosecond() as i64
}
/// converts [`NaiveTime`] to a `i64` representing a `time64(ms)`
pub fn time_to_time64ms(v: NaiveTime) -> i64 {
v.num_seconds_from_midnight() as i64 * MILLISECONDS
+ v.nanosecond() as i64 * MILLISECONDS / NANOSECONDS
}

/// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`]
#[inline]
Expand Down
7 changes: 6 additions & 1 deletion arrow-array/src/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ mod private {
/// An Arrow [`TimeZone`]
#[derive(Debug, Copy, Clone)]
pub struct Tz(FixedOffset);

impl Tz {
/// get timezone
pub fn get_time_zone(&self) -> FixedOffset {
self.0
}
}
impl FromStr for Tz {
type Err = ArrowError;

Expand Down
115 changes: 109 additions & 6 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1590,23 +1590,116 @@ pub fn cast_with_options(
to_tz.clone(),
))
}
(Timestamp(from_unit, _), Date32) => {
let array = cast_with_options(array, &Int64, cast_options)?;
(Timestamp(TimeUnit::Microsecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
if let Some(tzone) = tz {
let res = MICROSECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x;
Ok(res)
} else {
Ok(x.into())
}
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = MICROSECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Millisecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
if let Some(tzone) = tz {
let res = MILLISECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x;
Ok(res)
} else {
Ok(x.into())
}
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = MILLISECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Second, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
if let Some(tzone) = tz {
let res = tzone.get_time_zone().local_minus_utc() as i64 + x;
Ok(res)
} else {
Ok(x.into())
}
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = time_unit_multiple(from_unit) * SECONDS_IN_DAY;
let from_size = 1 * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
b.append_value(
num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32,
);
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Nanosecond, tz), Date32) => {
let tz: Option<Tz> = tz.as_ref().map(|tz| tz.parse()).transpose()?;
let array = array
.as_primitive::<TimestampMillisecondType>()
.try_unary::<_, TimestampMillisecondType, ArrowError>(|x| {
if let Some(tzone) = tz {
let res = NANOSECONDS * tzone.get_time_zone().local_minus_utc() as i64 + x;
Ok(res)
} else {
Ok(x.into())
}
})?;
let array = cast_with_options(&array, &Int64, cast_options)?;
let time_array = array.as_primitive::<Int64Type>();
let from_size = NANOSECONDS * SECONDS_IN_DAY;

let mut b = Date32Builder::with_capacity(array.len());

for i in 0..array.len() {
if time_array.is_null(i) {
b.append_null();
} else {
let val = num::integer::div_floor::<i64>(time_array.value(i), from_size) as i32;
b.append_value(val);
}
}
Ok(Arc::new(b.finish()) as ArrayRef)
}
(Timestamp(TimeUnit::Second, _), Date64) => Ok(Arc::new(match cast_options.safe {
Expand Down Expand Up @@ -4441,7 +4534,17 @@ mod tests {
assert_eq!(17890, c.value(1));
assert!(c.is_null(2));
}
#[test]
fn test_cast_timestamp_to_date32_zone() {
let array = TimestampMillisecondArray::from(vec![Some(25201000), Some(111599000), None])
.with_timezone("-07:00".to_string());

let b = cast(&array, &DataType::Date32).unwrap();
let c = b.as_primitive::<Date32Type>();
assert_eq!(0, c.value(0));
assert_eq!(0, c.value(1));
assert!(c.is_null(2));
}
#[test]
fn test_cast_timestamp_to_date64() {
let array =
Expand Down

0 comments on commit 0cae5a2

Please sign in to comment.