Commit

Merge branch 'master' into v0.2-integration
tnixon committed Jan 2, 2024
2 parents 98f66ff + d572233 commit a81d34d
Showing 4 changed files with 134 additions and 127 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -41,11 +41,12 @@ jobs:
with:
python-version: "3.9"
- name: Type check
working-directory: ./python
run: |
pip install tox
tox -e type-check
test:
name: Buid and Test Module
name: Build and Test Module
runs-on: ${{ matrix.os }}
strategy:
matrix:
101 changes: 63 additions & 38 deletions LICENSE
@@ -1,44 +1,69 @@
tempo
Databricks License
Copyright (2019) Databricks, Inc.

Copyright (2019) Databricks, Inc.
Definitions.

Agreement: The agreement between Databricks, Inc., and you governing
the use of the Databricks Services, as that term is defined in
the Master Cloud Services Agreement (MCSA) located at
www.databricks.com/legal/mcsa.

Licensed Materials: The source code, object code, data, and/or other
works to which this license applies.

This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant
to an Agreement (defined below) between Licensee (defined below) and Databricks, Inc. ("Databricks"). The Object Code version of the
Software shall be deemed part of the Downloadable Services under the Agreement, or if the Agreement does not define Downloadable Services,
Subscription Services, or if neither are defined then the term in such Agreement that refers to the applicable Databricks Platform
Services (as defined below) shall be substituted herein for “Downloadable Services.” Licensee's use of the Software must comply at
all times with any restrictions applicable to the Downlodable Services and Subscription Services, generally, and must be used in
accordance with any applicable documentation. For the avoidance of doubt, the Software constitutes Databricks Confidential Information
under the Agreement.
Scope of Use. You may not use the Licensed Materials except in
connection with your use of the Databricks Services pursuant to
the Agreement. Your use of the Licensed Materials must comply at all
times with any restrictions applicable to the Databricks Services,
generally, and must be used in accordance with any applicable
documentation. You may view, use, copy, modify, publish, and/or
distribute the Licensed Materials solely for the purposes of using
the Licensed Materials within or connecting to the Databricks Services.
If you do not agree to these terms, you may not view, use, copy,
modify, publish, and/or distribute the Licensed Materials.

Redistribution. You may redistribute and sublicense the Licensed
Materials so long as all use is in compliance with these terms.
In addition:

- You must give any other recipients a copy of this License;
- You must cause any modified files to carry prominent notices
stating that you changed the files;
- You must retain, in any derivative works that you distribute,
all copyright, patent, trademark, and attribution notices,
excluding those notices that do not pertain to any part of
the derivative works; and
- If a "NOTICE" text file is provided as part of its
distribution, then any derivative works that you distribute
must include a readable copy of the attribution notices
contained within such NOTICE file, excluding those notices
that do not pertain to any part of the derivative works.

Additionally, and notwithstanding anything in the Agreement to the contrary:
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* you may view, make limited copies of, and may compile the Source Code version of the Software into an Object Code version of the
Software. For the avoidance of doubt, you may not make derivative works of Software (or make any any changes to the Source Code
version of the unless you have agreed to separate terms with Databricks permitting such modifications (e.g., a contribution license
agreement)).
You may add your own copyright statement to your modifications and may
provide additional license terms and conditions for use, reproduction,
or distribution of your modifications, or for any such derivative works
as a whole, provided your use, reproduction, and distribution of
the Licensed Materials otherwise complies with the conditions stated
in this License.

If you have not agreed to an Agreement or otherwise do not agree to these terms, you may not use the Software or view, copy or compile
the Source Code of the Software.

This license terminates automatically upon the termination of the Agreement or Licensee's breach of these terms. Additionally,
Databricks may terminate this license at any time on notice. Upon termination, you must permanently delete the Software and all
copies thereof (including the Source Code).
Termination. This license terminates automatically upon your breach of
these terms or upon the termination of your Agreement. Additionally,
Databricks may terminate this license at any time on notice. Upon
termination, you must permanently delete the Licensed Materials and
all copies thereof.

Agreement: the agreement between Databricks and Licensee governing the use of the Databricks Platform Services, which shall be, with
respect to Databricks, the Databricks Terms of Service located at www.databricks.com/termsofservice, and with respect to Databricks
Community Edition, the Community Edition Terms of Service located at www.databricks.com/ce-termsofuse, in each case unless Licensee
has entered into a separate written agreement with Databricks governing the use of the applicable Databricks Platform Services.

Databricks Platform Services: the Databricks services or the Databricks Community Edition services, according to where the Software is used.

Licensee: the user of the Software, or, if the Software is being used on behalf of a company, the company.

Object Code: is version of the Software produced when an interpreter or a compiler translates the Source Code into recognizable and
executable machine code.

Source Code: the human readable portion of the Software.
DISCLAIMER; LIMITATION OF LIABILITY.

THE LICENSED MATERIALS ARE PROVIDED “AS-IS” AND WITH ALL FAULTS.
DATABRICKS, ON BEHALF OF ITSELF AND ITS LICENSORS, SPECIFICALLY
DISCLAIMS ALL WARRANTIES RELATING TO THE LICENSED MATERIALS, EXPRESS
AND IMPLIED, INCLUDING, WITHOUT LIMITATION, IMPLIED WARRANTIES,
CONDITIONS AND OTHER TERMS OF MERCHANTABILITY, SATISFACTORY QUALITY OR
FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. DATABRICKS AND
ITS LICENSORS TOTAL AGGREGATE LIABILITY RELATING TO OR ARISING OUT OF
YOUR USE OF OR DATABRICKS’ PROVISIONING OF THE LICENSED MATERIALS SHALL
BE LIMITED TO ONE THOUSAND ($1,000) DOLLARS. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS OR
THE USE OR OTHER DEALINGS IN THE LICENSED MATERIALS.
153 changes: 67 additions & 86 deletions python/tempo/tsdf.py
@@ -4,19 +4,20 @@
import logging
import operator
from abc import ABCMeta, abstractmethod
from functools import cached_property, reduce
from typing import Any, Callable, Collection, Dict, List, Optional, Sequence, TypeVar, \
Union, \
cast, overload
from functools import cached_property
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
from typing import Collection, Dict, cast, overload

import pyspark.sql.functions as sfn
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pyspark.sql import GroupedData, SparkSession
from pyspark.sql import GroupedData
from pyspark.sql import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import DataType, StructType
from pyspark.sql.window import Window, WindowSpec
from scipy.fft import fft, fftfreq # type: ignore

import tempo.interpol as t_interpolation
import tempo.io as t_io
@@ -192,33 +193,38 @@ def __addPrefixToColumns(self, col_list: list[str], prefix: str) -> "TSDF":
"""
Add prefix to all specified columns.
"""
if prefix != "":
prefix = prefix + "_"

df = reduce(
lambda df, idx: df.withColumnRenamed(
col_list[idx], "".join([prefix, col_list[idx]])
),
range(len(col_list)),
self.df,
)

if prefix == "":
ts_col = self.ts_col
else:
ts_col = "".join([prefix, self.ts_col])

return TSDF(df, ts_col=ts_col, series_ids=self.series_ids)
# no-op if no prefix
if not prefix:
return self

# build a column rename map
col_map = {col: "_".join([prefix, col]) for col in col_list}
# TODO - In the future (when Spark 3.4+ is standard) we should implement batch rename using:
# df = self.df.withColumnsRenamed(col_map)

# build a list of column expressions to rename columns in a select
select_exprs = [
sfn.col(col).alias(col_map[col]) if col in col_map else sfn.col(col)
for col in self.df.columns
]
# select the renamed columns
renamed_df = self.df.select(*select_exprs)

# find the structural columns
ts_col = col_map.get(self.ts_col, self.ts_col)
partition_cols = [col_map.get(c, c) for c in self.partitionCols]
sequence_col = col_map.get(self.sequence_col, self.sequence_col)
return TSDF(renamed_df, ts_col, partition_cols, sequence_col=sequence_col)

def __addColumnsFromOtherDF(self, other_cols: Sequence[str]) -> "TSDF":
"""
Add columns from some other DF as lit(None), as pre-step before union.
"""
new_df = reduce(
lambda df, idx: df.withColumn(other_cols[idx], sfn.lit(None)),
range(len(other_cols)),
self.df,
)

# build a list of column expressions to rename columns in a select
current_cols = [sfn.col(col) for col in self.df.columns]
new_cols = [sfn.lit(None).alias(col) for col in other_cols]
new_df = self.df.select(current_cols + new_cols)

return self.__withTransformedDF(new_df)

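The rewrite above collapses per-column withColumnRenamed / withColumn loops into a single select projection, which keeps the Spark logical plan from growing with the number of columns. Below is a minimal, self-contained sketch of the same two patterns; the column names and data are illustrative assumptions, not tempo's actual schema. On Spark 3.4+, the rename half reduces to df.withColumnsRenamed(col_map), as the TODO above notes.

# Sketch of the select-based rename and null-padding patterns above.
# Column names and data here are illustrative assumptions.
import pyspark.sql.functions as sfn
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["event_ts", "symbol"])

# batch rename via one projection (cf. __addPrefixToColumns)
prefix = "right"
col_map = {c: "_".join([prefix, c]) for c in ["event_ts", "symbol"]}
renamed = df.select(
    *[sfn.col(c).alias(col_map.get(c, c)) for c in df.columns]
)

# pad with lit(None) columns before a union (cf. __addColumnsFromOtherDF)
other_cols = ["bid", "ask"]
padded = renamed.select(
    *[sfn.col(c) for c in renamed.columns],
    *[sfn.lit(None).alias(c) for c in other_cols],
)
padded.printSchema()  # right_event_ts, right_symbol, bid, ask
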
@@ -252,54 +258,41 @@ def __getLastRightRow(
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# generate expressions to find the last value of each right-hand column
if ignoreNulls is False:
if tsPartitionVal is not None:
raise ValueError(
"Disabling null skipping with a partition value is not supported yet."
)
df = reduce(
lambda df, idx: df.withColumn(
right_cols[idx],
sfn.last(
sfn.when(
sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
).otherwise(None),
True, # ignore nulls because it indicates rows from the left side
).over(window_spec),
),
range(len(right_cols)),
self.df,
)
df = reduce(
lambda df, idx: df.withColumn(
right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
),
range(len(right_cols)),
df,
)
mod_right_cols = [
sfn.last(
sfn.when(sfn.col("rec_ind") == -1, sfn.struct(col)).otherwise(None),
True,
)
.over(window_spec)[col]
.alias(col)
for col in right_cols
]
elif tsPartitionVal is None:
# splitting off the condition as we want different columns in the reduce if implementing the skew AS OF join
df = reduce(
lambda df, idx: df.withColumn(
right_cols[idx],
sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
),
range(len(right_cols)),
self.df,
)
mod_right_cols = [
sfn.last(col, ignoreNulls).over(window_spec).alias(col)
for col in right_cols
]
else:
df = reduce(
lambda df, idx: df.withColumn(
right_cols[idx],
sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
).withColumn(
"non_null_ct" + right_cols[idx],
sfn.count(right_cols[idx]).over(window_spec),
),
range(len(right_cols)),
self.df,
)
mod_right_cols = [
sfn.last(col, ignoreNulls).over(window_spec).alias(col)
for col in right_cols
]
# non-null count columns, these will be dropped below
mod_right_cols += [
sfn.count(col).over(window_spec).alias("non_null_ct" + col)
for col in right_cols
]

# select the left-hand side columns, and the modified right-hand side columns
non_right_cols = list(set(self.df.columns) - set(right_cols))
df = self.df.select(non_right_cols + mod_right_cols)
# drop the null left-hand side rows
df = (df.filter(sfn.col(left_ts_col).isNotNull()).drop(self.ts_col)).drop(
"rec_ind"
)
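
The select-based replacement above builds all of the last-value expressions at once instead of stacking withColumn calls in a reduce. The mechanism is sfn.last(col, ignoreNulls).over(window_spec): ordered by the combined timestamp within each series, every left-hand row inherits the most recent right-hand value. The struct-wrapping branch (when ignoreNulls is False) preserves genuine right-side nulls: a right-hand row carrying a null still yields a non-null struct, so "no right row yet" stays distinguishable from "right row with a null value". A self-contained sketch of the basic fill, with hypothetical column names and data:

# Sketch: forward-fill the latest right-hand value onto left-hand rows,
# mirroring the last-over-window mechanism in __getLastRightRow above.
# Column names and data are hypothetical.
import pyspark.sql.functions as sfn
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
rows = [
    ("AAPL", 1, None),   # left row, no quote seen yet -> stays null
    ("AAPL", 2, 10.0),   # right row carrying a quote
    ("AAPL", 3, None),   # left row -> inherits 10.0
    ("AAPL", 4, 11.0),   # right row with a newer quote
    ("AAPL", 5, None),   # left row -> inherits 11.0
]
df = spark.createDataFrame(rows, ["symbol", "ts", "quote"])

w = (
    Window.partitionBy("symbol")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# ignoreNulls=True lets left rows skip over their own null quote values
filled = df.withColumn("last_quote", sfn.last("quote", True).over(w))
filled.show()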
@@ -780,16 +773,8 @@ def asofJoin(
left_cols = list(set(left_df.columns) - set(self.series_ids))
right_cols = list(set(right_df.columns) - set(right_tsdf.series_ids))

left_prefix = (
""
if ((left_prefix is None) | (left_prefix == ""))
else left_prefix + "_"
)
right_prefix = (
""
if ((right_prefix is None) | (right_prefix == ""))
else right_prefix + "_"
)
left_prefix = left_prefix + "_" if left_prefix else ""
right_prefix = right_prefix + "_" if right_prefix else ""

w = Window.partitionBy(*partition_cols).orderBy(
right_prefix + right_tsdf.ts_col
@@ -858,17 +843,13 @@ def asofJoin(
[right_tsdf.ts_col] + orig_right_col_diff, right_prefix
)

left_nonpartition_cols = list(
set(left_tsdf.df.columns) - set(self.series_ids)
left_columns = list(
set(left_tsdf.df.columns).difference(set(self.partitionCols))
)
right_nonpartition_cols = list(
set(right_tsdf.df.columns) - set(self.series_ids)
right_columns = list(
set(right_tsdf.df.columns).difference(set(self.partitionCols))
)

# For both dataframes get all non-partition columns (including ts_col)
left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
right_columns = [right_tsdf.ts_col] + right_nonpartition_cols

# Union both dataframes, and create a combined TS column
combined_ts_col = "combined_ts"
combined_df = left_tsdf.__addColumnsFromOtherDF(right_columns).__combineTSDF(
@@ -1478,7 +1459,7 @@ def null_safe_equals(col1: Column, col2: Column) -> Column:
)

def state_comparison_fn(a: CT, b: CT) -> Callable[[Column, Column], Column]:
return operator_dict[state_definition](a, b) # type: ignore
return operator_dict[state_definition](a, b)

elif callable(state_definition):
state_comparison_fn = state_definition # type: ignore
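
Dropping the # type: ignore on the dictionary lookup works once operator_dict carries a precise Callable value type; pyspark's Column implements the rich-comparison operators, so functions from Python's operator module apply to Column pairs directly. A sketch of that dispatch pattern follows; the exact keys and values of operator_dict in tempo are an assumption here, not taken from the commit.

# Sketch: map a state-definition string to a Column comparison function.
# The dict contents below are an illustrative assumption.
import operator
from typing import Callable, Dict

from pyspark.sql.column import Column

operator_dict: Dict[str, Callable[[Column, Column], Column]] = {
    "!=": operator.ne,
    "==": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}

def make_comparison(state_definition: str) -> Callable[[Column, Column], Column]:
    if state_definition not in operator_dict:
        raise ValueError(f"Unsupported state definition: {state_definition}")
    return operator_dict[state_definition]

# e.g. make_comparison("!=")(col_a, col_b) yields a boolean Column expression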
4 changes: 2 additions & 2 deletions python/tox.ini
@@ -52,8 +52,8 @@ deps =
flake8
black
commands =
black --check {toxinidir}
flake8
black --check {toxinidir}/tempo
flake8 --config {toxinidir}/.flake8 {toxinidir}/tempo

[testenv:type-check]
description = run type checks
