Skip to content

Commit

Permalink
Merge branch 'ts_idx_refactor' into v0.2-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
tnixon committed Jan 23, 2024
2 parents c652a80 + 5c31d3a commit fa5dff3
Show file tree
Hide file tree
Showing 11 changed files with 3,040 additions and 4,600 deletions.
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ sphinx-design==0.2.0
sphinx-panels==0.6.0
jsonref==1.1.0
python-dateutil==2.8.2
parameterized==0.8.1
46 changes: 46 additions & 0 deletions python/tempo/timeunit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import NamedTuple
from functools import total_ordering


@total_ordering
class TimeUnit(NamedTuple):
name: str
approx_seconds: float
"""
Represents a unit of time, with a name,
an approximate number of seconds,
and a sub-second precision.
"""

def __eq__(self, other):
return self.approx_seconds == other.approx_seconds

def __lt__(self, other):
return self.approx_seconds < other.approx_seconds


TimeUnitsType = NamedTuple("TimeUnitsType",
[("YEARS", TimeUnit),
("MONTHS", TimeUnit),
("WEEKS", TimeUnit),
("DAYS", TimeUnit),
("HOURS", TimeUnit),
("MINUTES", TimeUnit),
("SECONDS", TimeUnit),
("MILLISECONDS", TimeUnit),
("MICROSECONDS", TimeUnit),
("NANOSECONDS", TimeUnit)])

StandardTimeUnits = TimeUnitsType(
TimeUnit("year", 365 * 24 * 60 * 60),
TimeUnit("month", 30 * 24 * 60 * 60),
TimeUnit("week", 7 * 24 * 60 * 60),
TimeUnit("day", 24 * 60 * 60),
TimeUnit("hour", 60 * 60),
TimeUnit("minute", 60),
TimeUnit("second", 1),
TimeUnit("millisecond", 1e-03),
TimeUnit("microsecond", 1e-06),
TimeUnit("nanosecond", 1e-09)
)

110 changes: 54 additions & 56 deletions python/tempo/tsdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import tempo.resample as t_resample
import tempo.utils as t_utils
from tempo.intervals import IntervalsDF
from tempo.tsschema import CompositeTSIndex, TSIndex, TSSchema, WindowBuilder
from tempo.tsschema import DEFAULT_TIMESTAMP_FORMAT, is_time_format, sub_seconds_precision_digits, \
CompositeTSIndex, ParsedTSIndex, TSIndex, TSSchema, WindowBuilder
from tempo.typing import ColumnOrName, PandasMapIterFunction, PandasGroupedMapFunction

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,14 +53,12 @@ def __init__(
self.ts_schema.validate(df.schema)

def __repr__(self) -> str:
return self.__str__()
return f"{self.__class__.__name__}(df={self.df}, ts_schema={self.ts_schema})"

def __str__(self) -> str:
return f"""TSDF({id(self)}):
TS Index: {self.ts_index}
Series IDs: {self.series_ids}
Observational Cols: {self.observational_cols}
DataFrame: {self.df.schema}"""
def __eq__(self, other: Any) -> bool:
if not isinstance(other, TSDF):
return False
return self.ts_schema == other.ts_schema and self.df == other.df

def __withTransformedDF(self, new_df: DataFrame) -> "TSDF":
"""
Expand Down Expand Up @@ -118,42 +117,57 @@ def fromSubsequenceCol(
) -> "TSDF":
# construct a struct with the ts_col and subsequence_col
struct_col_name = cls.__DEFAULT_TS_IDX_COL
with_subseq_struct_df = cls.__makeStructFromCols(
df, struct_col_name, [ts_col, subsequence_col]
)
with_subseq_struct_df = cls.__makeStructFromCols(df,
struct_col_name,
[ts_col, subsequence_col])
# construct an appropriate TSIndex
subseq_struct = with_subseq_struct_df.schema[struct_col_name]
subseq_idx = CompositeTSIndex(subseq_struct, ts_col, subsequence_col)
# construct & return the TSDF with appropriate schema
return TSDF(with_subseq_struct_df, ts_schema=TSSchema(subseq_idx, series_ids))

# default column name for parsed timeseries column
__DEFAULT_PARSED_TS_COL = "parsed_ts"

@classmethod
def fromTimestampString(
def fromStringTimestamp(
cls,
df: DataFrame,
ts_col: str,
series_ids: Optional[Collection[str]] = None,
ts_fmt: str = "YYYY-MM-DDThh:mm:ss[.SSSSSS]",
ts_fmt: str = DEFAULT_TIMESTAMP_FORMAT
) -> "TSDF":
pass

@classmethod
def fromDateString(
cls,
df: DataFrame,
ts_col: str,
series_ids: Collection[str],
date_fmt: str = "YYYY-MM-DD",
) -> "TSDF ":
pass
# parse the ts_col based on the pattern
if is_time_format(ts_fmt):
# if the ts_fmt is a time format, we can use to_timestamp
ts_expr = sfn.to_timestamp(sfn.col(ts_col), ts_fmt)
else:
# otherwise, we'll use to_date
ts_expr = sfn.to_date(sfn.col(ts_col), ts_fmt)
# parse the ts_col give the expression
parsed_ts_col = cls.__DEFAULT_PARSED_TS_COL
parsed_df = df.withColumn(parsed_ts_col, ts_expr)
# move the ts cols into a struct
struct_col_name = cls.__DEFAULT_TS_IDX_COL
with_parsed_struct_df = cls.__makeStructFromCols(parsed_df,
struct_col_name,
[ts_col, parsed_ts_col])
# construct an appropriate TSIndex
parsed_struct = with_parsed_struct_df.schema[struct_col_name]
parsed_ts_idx = ParsedTSIndex.fromParsedTimestamp(parsed_struct,
parsed_ts_col,
ts_col)
# construct & return the TSDF with appropriate schema
return TSDF(with_parsed_struct_df, ts_schema=TSSchema(parsed_ts_idx, series_ids))

@property
def ts_index(self) -> "TSIndex":
return self.ts_schema.ts_idx

@property
def ts_col(self) -> str:
return self.ts_index.ts_col
# TODO - this should be replaced TSIndex expressions
pass

@property
def columns(self) -> List[str]:
Expand Down Expand Up @@ -409,74 +423,58 @@ def where(self, condition: Union[Column, str]) -> "TSDF":
where_df = self.df.where(condition)
return self.__withTransformedDF(where_df)

def __slice(self, op: str, target_ts: Union[str, int]) -> "TSDF":
"""
Private method to slice TSDF by time
:param op: string symbol of the operation to perform
:type op: str
:param target_ts: timestamp on which to filter
:return: a TSDF object containing only those records within the time slice specified
"""
# quote our timestamp if its a string
target_expr = f"'{target_ts}'" if isinstance(target_ts, str) else target_ts
slice_expr = sfn.expr(f"{self.ts_col} {op} {target_expr}")
sliced_df = self.df.where(slice_expr)
return self.__withTransformedDF(sliced_df)

def at(self, ts: Union[str, int]) -> "TSDF":
def at(self, ts: Any) -> "TSDF":
"""
Select only records at a given time
:param ts: timestamp of the records to select
:return: a :class:`~tsdf.TSDF` object containing just the records at the given time
"""
return self.__slice("==", ts)
return self.where(self.ts_index == ts)

def before(self, ts: Union[str, int]) -> "TSDF":
def before(self, ts: Any) -> "TSDF":
"""
Select only records before a given time
:param ts: timestamp on which to filter records
:return: a :class:`~tsdf.TSDF` object containing just the records before the given time
"""
return self.__slice("<", ts)
return self.where(self.ts_index < ts)

def atOrBefore(self, ts: Union[str, int]) -> "TSDF":
def atOrBefore(self, ts: Any) -> "TSDF":
"""
Select only records at or before a given time
:param ts: timestamp on which to filter records
:return: a :class:`~tsdf.TSDF` object containing just the records at or before the given time
"""
return self.__slice("<=", ts)
return self.where(self.ts_index <= ts)

def after(self, ts: Union[str, int]) -> "TSDF":
def after(self, ts: Any) -> "TSDF":
"""
Select only records after a given time
:param ts: timestamp on which to filter records
:return: a :class:`~tsdf.TSDF` object containing just the records after the given time
"""
return self.__slice(">", ts)
return self.where(self.ts_index > ts)

def atOrAfter(self, ts: Union[str, int]) -> "TSDF":
def atOrAfter(self, ts: Any) -> "TSDF":
"""
Select only records at or after a given time
:param ts: timestamp on which to filter records
:return: a :class:`~tsdf.TSDF` object containing just the records at or after the given time
"""
return self.__slice(">=", ts)
return self.where(self.ts_index >= ts)

def between(
self, start_ts: Union[str, int], end_ts: Union[str, int], inclusive: bool = True
self, start_ts: Any, end_ts: Any, inclusive: bool = True
) -> "TSDF":
"""
Select only records in a given range
Expand Down Expand Up @@ -531,7 +529,7 @@ def latest(self, n: int = 1) -> "TSDF":
next_window = self.baseWindow(reverse=True)
return self.__top_rows_per_series(next_window, n)

def priorTo(self, ts: Union[str, int], n: int = 1) -> "TSDF":
def priorTo(self, ts: Any, n: int = 1) -> "TSDF":
"""
Select the n most recent records prior to a given time
You can think of this like an 'asOf' select - it selects the records as of a particular time
Expand All @@ -543,7 +541,7 @@ def priorTo(self, ts: Union[str, int], n: int = 1) -> "TSDF":
"""
return self.atOrBefore(ts).latest(n)

def subsequentTo(self, ts: Union[str, int], n: int = 1) -> "TSDF":
def subsequentTo(self, ts: Any, n: int = 1) -> "TSDF":
"""
Select the n records subsequent to a give time
Expand Down Expand Up @@ -868,7 +866,7 @@ def asofJoin(
if tsPartitionVal is None:
seq_col = None
if isinstance(combined_df.ts_index, CompositeTSIndex):
seq_col = cast(CompositeTSIndex, combined_df.ts_index).ts_component(1)
seq_col = cast(CompositeTSIndex, combined_df.ts_index).get_ts_component(1)
asofDF = combined_df.__getLastRightRow(
left_tsdf.ts_col,
right_columns,
Expand All @@ -883,7 +881,7 @@ def asofJoin(
)
seq_col = None
if isinstance(tsPartitionDF.ts_index, CompositeTSIndex):
seq_col = cast(CompositeTSIndex, tsPartitionDF.ts_index).ts_component(1)
seq_col = cast(CompositeTSIndex, tsPartitionDF.ts_index).get_ts_component(1)
asofDF = tsPartitionDF.__getLastRightRow(
left_tsdf.ts_col,
right_columns,
Expand Down
Loading

0 comments on commit fa5dff3

Please sign in to comment.