Skip to content

Commit

Permalink
Calculate differences between timestamps, and use for equality
Browse files Browse the repository at this point in the history
  • Loading branch information
spenczar committed Aug 28, 2023
1 parent 155a987 commit 668fc70
Show file tree
Hide file tree
Showing 2 changed files with 300 additions and 20 deletions.
103 changes: 103 additions & 0 deletions adam_core/time/tests/test_time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import astropy.time
import astropy.units
import numpy.testing as npt
import pyarrow.compute as pc
import pytest
import quivr as qv

Expand Down Expand Up @@ -334,3 +335,105 @@ def test_add_days(self):
have = self.t1.add_days([1, 2, 3])
assert have.days.to_pylist() == [1, 50002, 60003]
assert have.nanos.to_pylist() == self.t1.nanos.to_pylist()

def test_equals_array(self):
t1 = self.t1
t2 = self.t1

assert pc.all(t1.equals_array(t2)).as_py()

t3 = t2.add_nanos(1)
assert not pc.all(t1.equals_array(t3)).as_py()
assert pc.all(t1.equals_array(t3, precision="us")).as_py()

t4 = time.Timestamp.from_kwargs(
days=t1.days,
nanos=t1.nanos,
scale="utc",
)
with pytest.raises(ValueError):
t1.equals_array(t4)

def test_equals_scalar(self):
t1 = time.Timestamp.from_kwargs(
days=[50000, 60000, 70000],
nanos=[0, 1, 2],
)

have = t1.equals_scalar(days=50000, nanos=0)
assert have.to_pylist() == [True, False, False]

def test_equals_scalar_precision(self):
t1 = time.Timestamp.from_kwargs(
days=[0, 0, 1, 1, 2, 2],
nanos=[
500,
86400_000_000_000 - 500,
500,
86400_000_000_000 - 500,
500,
86400_000_000_000 - 500,
],
)
have = t1.equals_scalar(days=1, nanos=0, precision="us")
assert have.to_pylist() == [False, True, True, False, False, False]

def test_difference_scalar(self):
# Compute difference from days=1, nanos=100
cases = [
{
"in_days": 0,
"in_nanos": 0,
"out_days": -2,
"out_nanos": 86400_000_000_000 - 100,
},
{
"in_days": 0,
"in_nanos": 50,
"out_days": -2,
"out_nanos": 86400_000_000_000 - 50,
},
{"in_days": 0, "in_nanos": 200, "out_days": -1, "out_nanos": 100},
{
"in_days": 1,
"in_nanos": 50,
"out_days": -1,
"out_nanos": 86400_000_000_000 - 50,
},
{"in_days": 1, "in_nanos": 100, "out_days": 0, "out_nanos": 0},
{"in_days": 1, "in_nanos": 200, "out_days": 0, "out_nanos": 100},
{
"in_days": 2,
"in_nanos": 50,
"out_days": 0,
"out_nanos": 86400_000_000_000 - 50,
},
{"in_days": 2, "in_nanos": 100, "out_days": 1, "out_nanos": 0},
{"in_days": 2, "in_nanos": 200, "out_days": 1, "out_nanos": 100},
]

for (i, c) in enumerate(cases):
t1 = time.Timestamp.from_kwargs(
days=[c["in_days"]],
nanos=[c["in_nanos"]],
)
have_days, have_nanos = t1.difference_scalar(days=1, nanos=100)
assert have_days[0].as_py() == c["out_days"], f"case {i}"
assert have_nanos[0].as_py() == c["out_nanos"], f"case {i}"

def test_difference(self):
t1 = time.Timestamp.from_kwargs(
days=[50000, 60000, 70000],
nanos=[0, 1, 2],
)
t2 = time.Timestamp.from_kwargs(
days=[50000, 60000, 70000],
nanos=[100, 200, 300],
)
have_days, have_nanos = t1.difference(t2)
assert have_days.to_pylist() == [-1, -1, -1]
assert have_nanos.to_pylist() == [
86400_000_000_000 - 100,
86400_000_000_000 - 199,
86400_000_000_000 - 298,
]
217 changes: 197 additions & 20 deletions adam_core/time/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,55 @@ def equals(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray:
def equals_scalar(
self, days: int, nanos: int, precision: str = "ns"
) -> pa.BooleanArray:
days_equal = pc.equal(self.days, days)
delta_days, delta_nanos = self.difference_scalar(days, nanos)
if precision == "ns":
times_equal = pc.equal(self.nanos, nanos)
max_deviation = 0
elif precision == "us":
times_equal = pc.equal(self.micros(), nanos // 1_000)
max_deviation = 999
elif precision == "ms":
times_equal = pc.equal(self.millis(), nanos // 1_000_000)
max_deviation = 999_999
elif precision == "s":
times_equal = pc.equal(self.seconds(), nanos // 1_000_000_000)
max_deviation = 999_999_999
else:
raise ValueError(f"Unsupported precision: {precision}")
return pc.and_(days_equal, times_equal)
return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)

def equals_array(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray:
days_equal = pc.equal(self.days, other.days)
"""
Compare two Timestamps, returning a BooleanArray indicating
whether each element is equal.
The Timestamps must have the same scale, and the same length.
"""
if self.scale != other.scale:
raise ValueError("Cannot compare timestamps with different scales")
if len(self) != len(other):
raise ValueError("Timestamps must have the same length")

delta_days, delta_nanos = self.difference(other)
if precision == "ns":
times_equal = pc.equal(self.nanos, other.nanos)
max_deviation = 0
elif precision == "us":
times_equal = pc.equal(self.micros(), other.micros())
max_deviation = 999
elif precision == "ms":
times_equal = pc.equal(self.millis(), other.millis())
max_deviation = 999_999
elif precision == "s":
times_equal = pc.equal(self.seconds(), other.seconds())
max_deviation = 999_999_999
else:
raise ValueError(f"Unsupported precision: {precision}")
return pc.and_(days_equal, times_equal)
return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)

@classmethod
def from_astropy(cls, astropy_time: astropy.time.Time) -> Timestamp:
"""Convert an astropy time to a quivr timestamp.
This is a lossy conversion, since astropy uses floating point
to represent times, while quivr uses integers.
The astropy time must use a scale supported by quivr. The
supported scales are "tai", "tt", "ut1", "utc", and "tdb".
"""
if astropy_time.scale not in SCALES:
raise ValueError(f"Unsupported scale: {astropy_time.scale}")
if astropy_time.isscalar:
Expand Down Expand Up @@ -143,6 +163,9 @@ def _from_astropy_scalar(cls, astropy_time: astropy.time.Time) -> Timestamp:
)

def to_astropy(self) -> astropy.time.Time:
"""
Convert the timestamp to an astropy time.
"""
fractional_days = self.fractional_days()
return astropy.time.Time(
val=self.days,
Expand All @@ -155,16 +178,17 @@ def add_nanos(
self, nanos: pa.lib.Int64Array | int, check_range: bool = True
) -> Timestamp:
"""
Add nanoseconds to the timestamp.
Add nanoseconds to the timestamp. Negative nanoseconds are
allowed.
Args:
nanos: The nanoseconds to add. Can be a scalar or an array of
Parameters
----------
nanos : The nanoseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e9, 86400e9).
check_range: If True, check that the nanoseconds are in the
check_range : If True, check that the nanoseconds are in the
range [-86400e9, 86400e9). If False, the caller is
responsible for ensuring that the nanoseconds are in the
correct range.
"""
if check_range:
if isinstance(nanos, int):
Expand Down Expand Up @@ -202,19 +226,172 @@ def add_nanos(

def add_seconds(self, seconds: pa.lib.Int64Array | int) -> Timestamp:
"""
Add seconds to the timestamp.
Add seconds to the timestamp. Negative seconds are supported.
Args:
seconds: The seconds to add. Can be a scalar or an array of
Parameters
----------
seconds : The seconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400, 86400).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
return self.add_nanos(pc.multiply(seconds, 1_000_000_000))

def add_millis(self, millis: pa.lib.Int64Array | int) -> Timestamp:
"""
Add milliseconds to the timestamp. Negative milliseconds are
supported.
Parameters
----------
millis : The milliseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e3, 86400e3).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
return self.add_nanos(pc.multiply(millis, 1_000_000))

def add_micros(self, micros: pa.lib.Int64Array | int) -> Timestamp:
"""
Add microseconds to the timestamp. Negative microseconds are
supported.
Parameters
----------
micros : The microseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e6, 86400e6).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
return self.add_nanos(pc.multiply(micros, 1_000))

def add_days(self, days: pa.lib.Int64Array | int) -> Timestamp:
"""Add days to the timestamp.
Parameters
----------
days : The days to add. Can be a scalar or an array of the
same length as the timestamp. Use negative values to
subtract days.
"""
return self.set_column("days", pc.add(self.days, days))

def difference_scalar(
self, days: int, nanos: int
) -> tuple[pa.Int64Array, pa.Int64Array]:
"""
Compute the difference between this timestamp and a scalar
timestamp.
The difference is computed as (self - scalar). The result is
presented as a tuple of (days, nanos). The nanos value is
always non-negative, in the range [0, 86400e9).
Parameters
----------
days : The days of the scalar timestamp.
nanos : The nanoseconds of the scalar timestamp.
Returns
-------
days : The difference in days. This value can be negative.
nanos : The difference in nanoseconds. This value is always
non-negative, in the range [0, 86400e9).
Examples
--------
>>> from pyarrow import Timestamp
>>> ts = Timestamp.from_kwargs(days=[0, 1, 2], nanos=[200, 0, 100])
>>> have_days, have_nanos = ts.difference_scalar(1, 100)
>>> have_days.to_numpy()
array([-2, -1, 1])
>>> have_nanos.to_numpy()
array([100, 86399999999900, 0])
"""
days1 = pc.subtract(self.days, days)
nanos1 = pc.subtract(self.nanos, nanos)
overflows = pc.greater_equal(nanos1, 86400 * 1e9)
underflows = pc.less(nanos1, 0)
mask = pa.StructArray.from_arrays(
[overflows, underflows], names=["overflows", "underflows"]
)
nanos2 = pc.case_when(
mask,
pc.subtract(nanos1, int(86400 * 1e9)),
pc.add(nanos1, int(86400 * 1e9)),
nanos1,
)
days2 = pc.case_when(
mask,
pc.add(days1, 1),
pc.subtract(days1, 1),
days1,
)
return days2, nanos2

def difference(self, other: Timestamp) -> tuple[pa.Int64Array, pa.Int64Array]:
"""
Compute the element-wise difference between this timestamp and another.
"""
if self.scale != other.scale:
raise ValueError(
"Cannot compute difference between timestamps with different scales"
)
days1 = pc.subtract(self.days, other.days)
nanos1 = pc.subtract(self.nanos, other.nanos)

overflows = pc.greater_equal(nanos1, 86400 * 1e9)
underflows = pc.less(nanos1, 0)
mask = pa.StructArray.from_arrays(
[overflows, underflows], names=["overflows", "underflows"]
)
nanos2 = pc.case_when(
mask,
pc.subtract(nanos1, int(86400 * 1e9)),
pc.add(nanos1, int(86400 * 1e9)),
nanos1,
)
days2 = pc.case_when(
mask,
pc.add(days1, 1),
pc.subtract(days1, 1),
days1,
)
return days2, nanos2


def _duration_arrays_within_tolerance(
delta_days: pa.Int64Array, delta_nanos: pa.Int64Array, max_nanos_deviation: int
) -> pa.BooleanArray:
"""Return a boolean array indicating whether the delta_days and delta_nanos
arrays are within the specified tolerance.
The max_nanos_deviation should be the maximum number of
nanoseconds that the the two arrays can deviate to still be
considered 'within tolerance'.
"""
if max_nanos_deviation == 0:
return pc.and_(pc.equal(delta_days, 0), pc.equal(delta_nanos, 0))

cond1 = pc.and_(
pc.equal(delta_days, 0), pc.less(pc.abs(delta_nanos), max_nanos_deviation)
)
cond2 = pc.and_(
pc.equal(delta_days, -1),
pc.greater_equal(pc.abs(delta_nanos), 86400 * 1e9 - max_nanos_deviation),
)
return pc.or_(cond1, cond2)

0 comments on commit 668fc70

Please sign in to comment.