Skip to content

Commit

Permalink
checkpoint save - testing basic index functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
tnixon committed Jan 7, 2024
1 parent cca5006 commit d488d7c
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 310 deletions.
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ sphinx-design==0.2.0
sphinx-panels==0.6.0
jsonref==1.1.0
python-dateutil==2.8.2
parameterized==0.8.1
72 changes: 36 additions & 36 deletions python/tempo/tsdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import tempo.resample as t_resample
import tempo.utils as t_utils
from tempo.intervals import IntervalsDF
from tempo.tsschema import CompositeTSIndex, TSIndex, TSSchema, WindowBuilder
from tempo.tsschema import DEFAULT_TIMESTAMP_FORMAT, is_time_format, \
CompositeTSIndex, TSIndex, TSSchema, WindowBuilder

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,33 +121,48 @@ def fromSubsequenceCol(
# construct & return the TSDF with appropriate schema
return TSDF(with_subseq_struct_df, ts_schema=TSSchema(subseq_idx, series_ids))

# default column name for parsed timeseries column
__DEFAULT_PARSED_TS_COL = "parsed_ts"

@classmethod
def fromTimestampString(
def fromStringTimestamp(
cls,
df: DataFrame,
ts_col: str,
series_ids: Collection[str] = None,
ts_fmt: str = "YYYY-MM-DDThh:mm:ss[.SSSSSS]",
ts_fmt: str = DEFAULT_TIMESTAMP_FORMAT,
) -> "TSDF":
pass

@classmethod
def fromDateString(
cls,
df: DataFrame,
ts_col: str,
series_ids: Collection[str],
date_fmt: str = "YYYY-MM-DD",
) -> "TSDF ":
pass
# parse the ts_col based on the pattern
if is_time_format(ts_fmt):
# if the ts_fmt is a time format, we can use to_timestamp
ts_expr = sfn.to_timestamp(sfn.col(ts_col), ts_fmt)
else:
# otherwise, we'll use to_date
ts_expr = sfn.to_date(sfn.col(ts_col), ts_fmt)
# parse the ts_col given the expression
parsed_ts_col = cls.__DEFAULT_PARSED_TS_COL
parsed_df = df.withColumn(cls.__DEFAULT_PARSED_TS_COL, ts_expr)
# move the ts cols into a struct
struct_col_name = cls.__DEFAULT_TS_IDX_COL
with_parsed_struct_df = cls.__makeStructFromCols(parsed_df,
struct_col_name,
[ts_col, parsed_ts_col])
# construct an appropriate TSIndex
parsed_struct = with_parsed_struct_df.schema[struct_col_name]
parsed_ts_idx = ParsedTSIndex.fromParsedTimestamp(parsed_struct,
parsed_ts_col,
ts_col)
# construct & return the TSDF with appropriate schema
return TSDF(with_parsed_struct_df, ts_schema=TSSchema(parsed_ts_idx, series_ids))

@property
def ts_index(self) -> "TSIndex":
return self.ts_schema.ts_idx

@property
def ts_col(self) -> str:
return self.ts_index.ts_col
# TODO - this should be replaced with TSIndex expressions
pass

@property
def columns(self) -> List[str]:
Expand Down Expand Up @@ -408,22 +424,6 @@ def where(self, condition: Union[Column, str]) -> "TSDF":
where_df = self.df.where(condition)
return self.__withTransformedDF(where_df)

def __slice(self, op: str, target_ts: Any) -> "TSDF":
"""
Private method to slice TSDF by time
:param op: string symbol of the operation to perform
:type op: str
:param target_ts: timestamp on which to filter
:return: a TSDF object containing only those records within the time slice specified
"""
# quote our timestamp if it's a string
target_expr = f"'{target_ts}'" if isinstance(target_ts, str) else target_ts
slice_expr = sfn.expr(f"{self.ts_col} {op} {target_expr}")
sliced_df = self.df.where(slice_expr)
return self.__withTransformedDF(sliced_df)

def at(self, ts: Any) -> "TSDF":
"""
Select only records at a given time
Expand All @@ -432,7 +432,7 @@ def at(self, ts: Any) -> "TSDF":
:return: a :class:`~tsdf.TSDF` object containing just the records at the given time
"""
return self.__slice("==", ts)
return self.where(self.ts_index == ts)

def before(self, ts: Any) -> "TSDF":
"""
Expand All @@ -442,7 +442,7 @@ def before(self, ts: Any) -> "TSDF":
:return: a :class:`~tsdf.TSDF` object containing just the records before the given time
"""
return self.__slice("<", ts)
return self.where(self.ts_index < ts)

def atOrBefore(self, ts: Any) -> "TSDF":
"""
Expand All @@ -452,7 +452,7 @@ def atOrBefore(self, ts: Any) -> "TSDF":
:return: a :class:`~tsdf.TSDF` object containing just the records at or before the given time
"""
return self.__slice("<=", ts)
return self.where(self.ts_index <= ts)

def after(self, ts: Any) -> "TSDF":
"""
Expand All @@ -462,7 +462,7 @@ def after(self, ts: Any) -> "TSDF":
:return: a :class:`~tsdf.TSDF` object containing just the records after the given time
"""
return self.__slice(">", ts)
return self.where(self.ts_index > ts)

def atOrAfter(self, ts: Any) -> "TSDF":
"""
Expand All @@ -472,7 +472,7 @@ def atOrAfter(self, ts: Any) -> "TSDF":
:return: a :class:`~tsdf.TSDF` object containing just the records at or after the given time
"""
return self.__slice(">=", ts)
return self.where(self.ts_index >= ts)

def between(
self, start_ts: Any, end_ts: Any, inclusive: bool = True
Expand Down
Loading

0 comments on commit d488d7c

Please sign in to comment.