Metadata cleanup #272

Closed
wants to merge 19 commits
2 changes: 1 addition & 1 deletion examples/dlt_tempo.py
@@ -26,7 +26,7 @@ def ts_bronze():
@dlt.expect_or_drop("User_check","User in ('a','c','i')")
def ts_ft():
phone_accel_df = dlt.read("ts_bronze")
-phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols = ["User"])
+phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts")
ts_ft_df = phone_accel_tsdf.fourier_transform(timestep=1, valueCol="x").df
return ts_ft_df

6 changes: 3 additions & 3 deletions examples/financial_services_quickstart.py
@@ -92,8 +92,8 @@
# DBTITLE 1,Define TSDF Time Series Data Structure
from tempo import *

-trades_tsdf = TSDF(trades_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')
-quotes_tsdf = TSDF(quotes_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')
+trades_tsdf = TSDF(trades_df, ts_col='event_ts')
+quotes_tsdf = TSDF(quotes_df, ts_col='event_ts')

# COMMAND ----------

@@ -178,7 +178,7 @@
from tempo import *
from pyspark.sql.functions import *

-minute_bars = TSDF(spark.table("time_test"), partition_cols=['ticker'], ts_col="ts").calc_bars(freq = '1 minute', func= 'ceil')
+minute_bars = TSDF(spark.table("time_test"), ts_col="ts").calc_bars(freq ='1 minute', func='ceil')

display(minute_bars)

45 changes: 22 additions & 23 deletions python/README.md
@@ -51,7 +51,7 @@ phone_accel_df = spark.read.format("csv").option("header", "true").load("dbfs:/h

from tempo import *

-phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols = ["User"])
+phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", series_ids = ["User"])

display(phone_accel_tsdf)
```
@@ -65,7 +65,7 @@ Note: You can upsample any missing values by using an option in the resample int

```python
# ts_col = timestamp column on which to sort fact and source table
-# partition_cols - columns to use for partitioning the TSDF into more granular time series for windowing and sorting
+# series_ids - columns to use for partitioning the TSDF into more granular time series for windowing and sorting

resampled_sdf = phone_accel_tsdf.resample(freq='min', func='floor')
resampled_pdf = resampled_sdf.df.filter(col('event_ts').cast("date") == "2015-02-23").toPandas()
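
# Illustrative sketch only (not part of this changeset): the note above mentions an
# upsampling option on the resample interface; the parameter name below is an assumption.
# upsampled_sdf = phone_accel_tsdf.resample(freq='min', func='floor', fill=True)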
@@ -97,7 +97,7 @@ from pyspark.sql.functions import *

watch_accel_df = spark.read.format("csv").option("header", "true").load("dbfs:/home/tempo/Watch_accelerometer").withColumn("event_ts", (col("Arrival_Time").cast("double")/1000).cast("timestamp")).withColumn("x", col("x").cast("double")).withColumn("y", col("y").cast("double")).withColumn("z", col("z").cast("double")).withColumn("event_ts_dbl", col("event_ts").cast("double"))

-watch_accel_tsdf = TSDF(watch_accel_df, ts_col="event_ts", partition_cols = ["User"])
+watch_accel_tsdf = TSDF(watch_accel_df, ts_col="event_ts", series_ids = ["User"])

# Applying AS OF join to TSDF datasets
joined_df = watch_accel_tsdf.asofJoin(phone_accel_tsdf, right_prefix="phone_accel")
@@ -107,12 +107,12 @@ display(joined_df)

#### 3. Skew Join Optimized AS OF Join

-The purpose of the skew optimized as of join is to bucket each set of `partition_cols` to get the latest source record merged onto the fact table
+The purpose of the skew optimized as of join is to bucket each set of `series_ids` to get the latest source record merged onto the fact table

Parameters:

ts_col = timestamp column for sorting
-partition_cols = partition columns for defining granular time series for windowing and sorting
+series_ids = partition columns for defining granular time series for windowing and sorting
tsPartitionVal = value to break up each partition into time brackets
fraction = overlap fraction
right_prefix = prefix used for source columns when merged into fact table
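
A minimal sketch of how these parameters could be passed, reusing the watch/phone TSDFs from the earlier example; the `tsPartitionVal` and `fraction` values below are illustrative assumptions, not taken from this changeset:

```python
# Skew-optimized AS OF join: each series is bucketed into time brackets
# (tsPartitionVal) with a small overlap (fraction) before joining.
joined_df = watch_accel_tsdf.asofJoin(
    phone_accel_tsdf,
    right_prefix="phone_accel",
    tsPartitionVal=600,  # assumed bracket size
    fraction=0.1         # assumed overlap fraction
)

display(joined_df)
```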
@@ -185,11 +185,10 @@ Valid columns data types for interpolation are: `["int", "bigint", "float", "dou
```python
# Create instance of the TSDF class
input_tsdf = TSDF(
-input_df,
-partition_cols=["partition_a", "partition_b"],
-ts_col="event_ts",
-)
-
+input_df,
+series_ids=["partition_a", "partition_b"],
+ts_col="event_ts",
+)

# What the following chain of operation does is:
# 1. Aggregate all valid numeric columns using mean into 30 second intervals
@@ -205,32 +204,32 @@ interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpol
interpolated_tsdf = input_tsdf.interpolate(
freq="30 seconds",
func="mean",
-target_cols= ["columnA","columnB"],
+target_cols=["columnA", "columnB"],
method="linear"

)

# Alternatively it's also possible to override default TSDF parameters.
-# e.g. partition_cols, ts_col a
+# e.g. series_ids, ts_col a
interpolated_tsdf = input_tsdf.interpolate(
partition_cols=["partition_c"],
series_ids=["partition_c"],
ts_col="other_event_ts"
freq="30 seconds",
func="mean",
target_cols= ["columnA","columnB"],
method="linear"
freq = "30 seconds",
func = "mean",
target_cols = ["columnA", "columnB"],
method = "linear"
)

# The show_interpolated flag can be set to `True` to show additional boolean columns
# for a given row that shows if a column has been interpolated.
interpolated_tsdf = input_tsdf.interpolate(
partition_cols=["partition_c"],
series_ids=["partition_c"],
ts_col="other_event_ts"
freq="30 seconds",
func="mean",
method="linear",
target_cols= ["columnA","columnB"],
show_interpolated=True,
freq = "30 seconds",
func = "mean",
method = "linear",
target_cols = ["columnA", "columnB"],
show_interpolated = True,
)

```