Polars expressions chain (okube-ai#208)
osoucy authored Jun 11, 2024
1 parent 02cc87f commit b6afc32
Showing 64 changed files with 1,467 additions and 1,652 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_quickstart.yml
@@ -39,7 +39,7 @@ jobs:
        shell: python

      - name: Install Laktory
-        run: make install
+        run: make install-with-dbks

      - name: make dir
        run: |
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* Polars DataFrame extension
* Polars Expressions extension
### Breaking changes
* Refactored column creation inside a transformer node
* Moved laktory Spark dataframe custom functions under a laktory namespace.

## [0.3.3] - 2024-05-30
3 changes: 3 additions & 0 deletions Makefile
@@ -1,6 +1,9 @@
install:
	pip install ./

+install-with-dbks:
+	pip install './[databricks]'

dev:
	pip install -e './[pulumi, polars, spark, databricks, dev, test, azure, aws, gcp]'

8 changes: 4 additions & 4 deletions README.md
@@ -57,8 +57,8 @@ node_brz = models.PipelineNode(
    transformer={
        "nodes": [
            {
-                "spark_func_name": "select",
-                "spark_func_args": [
+                "func_name": "select",
+                "func_args": [
                    "symbol",
                    "timestamp",
                    "open",
@@ -83,8 +83,8 @@ node_slv = models.PipelineNode(
    transformer={
        "nodes": [
            {
-                "spark_func_name": "drop_duplicates",
-                "spark_func_kwargs": {
+                "func_name": "drop_duplicates",
+                "func_kwargs": {
                    "subset": ["timestamp", "symbol"]
                }
            },
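For quick reference, here is a hedged sketch of a transformer spec written with the keys as renamed in this commit (`spark_func_name` / `spark_func_args` / `spark_func_kwargs` become `func_name` / `func_args` / `func_kwargs`); the surrounding pipeline configuration is omitted and the column list is illustrative:

```python
# Illustrative transformer spec using the renamed node keys.
transformer = {
    "nodes": [
        {
            "func_name": "select",
            "func_args": ["symbol", "timestamp", "open", "close", "high", "low", "volume"],
        },
        {
            "func_name": "drop_duplicates",
            "func_kwargs": {"subset": ["timestamp", "symbol"]},
        },
    ]
}
```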
1 change: 1 addition & 0 deletions docs/api/models/transformers/basechainnode.md
@@ -0,0 +1 @@
::: laktory.models.BaseChainNode
9 changes: 9 additions & 0 deletions docs/api/models/transformers/polarschain.md
@@ -0,0 +1,9 @@
::: laktory.models.PolarsChain

--

::: laktory.models.PolarsChainNode

--

::: laktory.models.PolarsChainNodeColumn
8 changes: 8 additions & 0 deletions docs/api/models/transformers/sparkchain.md
@@ -1 +1,9 @@
::: laktory.models.SparkChain

--

::: laktory.models.SparkChainNode

--

::: laktory.models.SparkChainNodeColumn
1 change: 0 additions & 1 deletion docs/api/models/transformers/sparkchainnode.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/models/transformers/sparkfuncarg.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/polars/expressions/poly1.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/polars/expressions/poly2.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/polars/expressions/scaled_power.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/add.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/div.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/mul.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/poly1.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/poly2.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/scaled_power.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/api/spark/functions/sub.md

This file was deleted.

8 changes: 4 additions & 4 deletions docs/concepts/models/pipeline.md
@@ -37,17 +37,17 @@ nodes:
          table_name: brz_stock_prices
        transformer:
          nodes:
-           - spark_func_name: select
-             spark_func_args:
+           - func_name: select
+             func_args:
                - timestamp
                - symbol
                - open
                - close
                - high
                - low
                - volume
-           - spark_func_name: drop_duplicates
-             spark_func_kwargs:
+           - func_name: drop_duplicates
+             func_kwargs:
                subset:
                  - symbol
                  - timestamp
23 changes: 16 additions & 7 deletions docs/concepts/models/transformers.md
@@ -15,7 +15,7 @@ transformation applied to a dataframe. A node declares the Spark function
responsible for the transformation and the arguments to pass to that function.
Each function is expected to output a dataframe and to receive as input the
output of the previous node. As a convenience, a node can also declare a new
-column. In this case, the function is expected to output a column.
+column from a SQL or Spark expression.

For example, consider a simple dataframe with column `x` for which you want to:

@@ -35,16 +35,15 @@ df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 2, 3]}))
sc = models.SparkChain(
    nodes=[
        {
-            "spark_func_name": "withColumnRenamed",
-            "spark_func_args": ["x", "theta"],
+            "func_name": "withColumnRenamed",
+            "func_args": ["x", "theta"],
        },
        {
-            "column": {
+            "with_column": {
                "name": "cos",
                "type": "double",
+                "expr": "F.cos('theta')",
            },
-            "spark_func_name": "cos",
-            "spark_func_args": ["theta"],
        },
        {
-            "spark_func_name": "drop",
@@ -62,4 +61,14 @@ over many of the benefits offered by SparkChain, have a look at these
[Spark vs SQL](https://www.linkedin.com/pulse/sparkling-queries-in-depth-spark-vs-sql-data-pipelines-olivier-soucy-nfyve/)
and
[SparkChain](https://www.linkedin.com/pulse/laktory-sparkchain-serializable-spark-based-data-olivier-soucy-oihxe/)
-blog posts.
+blog posts.
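For readability, here is a hedged sketch of the full example rewritten with the new node syntax; the `drop` arguments and the `execute` call below are assumptions rather than content of this diff, so check the SparkChain API for the exact signature:

```python
import pandas as pd
from laktory import models

# `spark` is assumed to be an active SparkSession, as in the docs snippet above.
df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 2, 3]}))

sc = models.SparkChain(
    nodes=[
        {
            # Rename x -> theta using the renamed keys (func_name / func_args).
            "func_name": "withColumnRenamed",
            "func_args": ["x", "theta"],
        },
        {
            # New-style column declaration: the column is built from a
            # Spark expression instead of a dedicated function node.
            "with_column": {
                "name": "cos",
                "type": "double",
                "expr": "F.cos('theta')",
            },
        },
        {
            # Assumed arguments: drop the intermediate column once used.
            "func_name": "drop",
            "func_args": ["theta"],
        },
    ]
)

# Assumed execution call.
df = sc.execute(df0, spark=spark)
```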


### Polars Chain
??? "API Documentation"
[`laktory.models.SparkChain`][laktory.models.SparkChain]<br>

The Polars Chain is very similar to the Spark Chain: it also defines a series of
core data transformations, except that it uses Polars instead of Spark as its
engine. The supported functions and syntax differ slightly to accommodate
Polars.
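A hypothetical sketch of the Polars counterpart, assuming `PolarsChain` mirrors the node schema shown above; the function names, argument shapes, and expression syntax here are assumptions and should be checked against the PolarsChain API docs:

```python
import polars as pl
from laktory import models

df0 = pl.DataFrame({"x": [1, 2, 2, 3]})

pc = models.PolarsChain(
    nodes=[
        {
            # Polars renames columns with a mapping; passing it as a single
            # positional argument is an assumption.
            "func_name": "rename",
            "func_args": [{"x": "theta"}],
        },
        {
            # Column built from a Polars expression (assumed syntax).
            "with_column": {
                "name": "cos",
                "type": "double",
                "expr": "pl.col('theta').cos()",
            },
        },
    ]
)

# Assumed execution call, mirroring SparkChain.
df = pc.execute(df0)
```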
8 changes: 4 additions & 4 deletions docs/index.md
@@ -60,17 +60,17 @@ sink:
    layer: SILVER
  transformer:
    nodes:
-     - spark_func_name: select
-       spark_func_args:
+     - func_name: select
+       func_args:
          - timestamp
          - symbol
          - open
          - close
          - high
          - low
          - volume
-     - spark_func_name: drop_duplicates
-       spark_func_kwargs:
+     - func_name: drop_duplicates
+       func_kwargs:
          subset:
            - symbol
            - timestamp
2 changes: 1 addition & 1 deletion laktory/cli/_common.py
@@ -58,7 +58,7 @@ def model_post_init(self, __context):

        # Check environment
        if self.env is None:
-            env_names = list(self.stack.envs.keys())
+            env_names = list(self.stack.environments.keys())
            if env_names:
                logger.warn(
                    f"Environment not specified, defaulting to first available ({env_names[0]})"
18 changes: 8 additions & 10 deletions laktory/cli/quickstart_stack.yaml
@@ -49,23 +49,21 @@ resources:
              table_name: slv_stock_prices
            transformer:
              nodes:
-               - column:
+               - with_column:
                    name: created_at
                    type: timestamp
-                   sql_expression: data.created_at
-               - column:
+                   sql_expr: data.created_at
+               - with_column:
                    name: symbol
-                   spark_func_args:
-                     - data.symbol
-                   spark_func_name: coalesce
-               - column:
+                   sql_expr: data.symbol
+               - with_column:
                    name: open
                    type: double
-                   sql_expression: data.open
-               - column:
+                   sql_expr: data.open
+               - with_column:
                    name: close
                    type: double
-                   sql_expression: data.close
+                   sql_expr: data.close

providers:
databricks:
7 changes: 7 additions & 0 deletions laktory/constants.py
@@ -3,6 +3,13 @@
"terraform",
]

SUPPORTED_DFTYPES = [
"POLARS",
"SPARK",
]

DEFAULT_DFTYPE = "SPARK"

SUPPORTED_DATATYPES = [
"_any",
"binary",
5 changes: 1 addition & 4 deletions laktory/models/datasinks/basedatasink.py
@@ -16,8 +16,6 @@ class BaseDataSink(BaseModel):
    Attributes
    ----------
-    dataframe_type:
-        Type of dataframe
    mode:
        Write mode.
        - overwrite: Overwrite existing data
@@ -27,11 +25,10 @@
        - complete: Overwrite for streaming dataframes
    """

-    dataframe_type: Literal["SPARK", "POLARS"] = "SPARK"
    mode: Union[Literal["OVERWRITE", "APPEND", "IGNORE", "ERROR", "COMPLETE"], None] = (
        None
    )
-    _pipeline_node: "PipelineNode" = None
+    _parent: "PipelineNode" = None

# ----------------------------------------------------------------------- #
# Properties #
1 change: 0 additions & 1 deletion laktory/models/datasinks/filedatasink.py
@@ -150,7 +150,6 @@ def as_source(self, as_stream: bool = None) -> FileDataSource:
        source = FileDataSource(
            path=self.path,
            format=self.format,
-            dataframe_type=self.dataframe_type,
        )
        if as_stream:
            source.as_stream = as_stream
33 changes: 25 additions & 8 deletions laktory/models/datasources/basedatasource.py
@@ -5,7 +5,7 @@
from pydantic import Field

from laktory.models.basemodel import BaseModel
-
+from laktory.constants import DEFAULT_DFTYPE
from laktory.spark import SparkDataFrame
from laktory.spark import is_spark_dataframe
from laktory.polars import PolarsDataFrame
@@ -134,7 +134,7 @@ class BaseDataSource(BaseModel):
    as_stream: bool = False
    broadcast: Union[bool, None] = False
    cdc: Union[DataSourceCDC, None] = None
-    dataframe_type: Literal["SPARK", "POLARS"] = "SPARK"
+    dataframe_type: Literal["SPARK", "POLARS"] = DEFAULT_DFTYPE
    drops: Union[list, None] = None
    filter: Union[str, None] = None
    limit: Union[int, None] = None
@@ -143,7 +143,7 @@ def options(self) -> Any:
    sample: Union[DataFrameSample, None] = None
    selects: Union[list[str], dict[str, str], None] = None
    watermark: Union[Watermark, None] = None
-    _pipeline_node: "PipelineNode" = None
+    _parent: "PipelineNode" = None

    @model_validator(mode="after")
    def options(self) -> Any:
@@ -154,9 +154,9 @@ def options(self) -> Any:
        elif is_polars_dataframe(self.mock_df):
            self.dataframe_type = "POLARS"

-        if self.dataframe_type == "SPARK":
+        if self.dftype == "SPARK":
            pass
-        elif self.dataframe_type == "POLARS":
+        elif self.dftype == "POLARS":
            if self.as_stream:
                raise ValueError("Polars DataFrames don't support streaming read.")
            if self.watermark:
@@ -179,11 +179,28 @@ def is_cdc(self) -> bool:
"""If `True` source data is a change data capture (CDC)"""
return self.cdc is not None

@property
def user_dftype(self) -> Union[str, None]:
"""
User-configured dataframe type directly from model or from parent.
"""
if "dataframe_type" in self.__fields_set__:
return self.dataframe_type
if self._parent:
return self._parent.user_dftype
return None

@property
def dftype(self):
if self.user_dftype:
return self.user_dftype
return self.dataframe_type

@property
def is_orchestrator_dlt(self) -> bool:
"""If `True`, data source is used in the context of a DLT pipeline"""
is_orchestrator_dlt = False
if self._pipeline_node and self._pipeline_node.is_orchestrator_dlt:
if self._parent and self._parent.is_orchestrator_dlt:
is_orchestrator_dlt = True
return is_orchestrator_dlt

@@ -207,9 +224,9 @@ def read(self, spark=None) -> AnyDataFrame:
"""
if self.mock_df is not None:
df = self.mock_df
elif self.dataframe_type == "SPARK":
elif self.dftype == "SPARK":
df = self._read_spark(spark=spark)
elif self.dataframe_type == "POLARS":
elif self.dftype == "POLARS":
df = self._read_polars()
else:
raise ValueError(
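The new `user_dftype` / `dftype` properties distinguish a value the user actually set from the field's default, and fall back to the parent node otherwise. A minimal, self-contained sketch of that resolution pattern (my illustration using Pydantic v2's `model_fields_set`; the library code above uses the `__fields_set__` spelling and its own model classes):

```python
from typing import Literal, Optional, Union

from pydantic import BaseModel

DEFAULT_DFTYPE = "SPARK"


class Node(BaseModel):
    dataframe_type: Literal["SPARK", "POLARS"] = DEFAULT_DFTYPE

    # Runtime-only parent reference (private attribute, not part of the schema).
    _parent: Optional["Node"] = None

    @property
    def user_dftype(self) -> Union[str, None]:
        # Report the dataframe type only if the user set it explicitly,
        # otherwise defer to the parent (if any).
        if "dataframe_type" in self.model_fields_set:
            return self.dataframe_type
        if self._parent is not None:
            return self._parent.user_dftype
        return None

    @property
    def dftype(self) -> str:
        return self.user_dftype or self.dataframe_type


parent = Node(dataframe_type="POLARS")
child = Node()           # no explicit dataframe_type on the child
child._parent = parent
print(child.dftype)      # POLARS: inherited from the parent, not the default
print(Node().dftype)     # SPARK: falls back to the default
```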
2 changes: 1 addition & 1 deletion laktory/models/datasources/filedatasource.py
@@ -56,7 +56,7 @@ class FileDataSource(BaseDataSource):
    @model_validator(mode="after")
    def options(self) -> Any:

-        if self.dataframe_type == "SPARK":
+        if self.dftype == "SPARK":
            if self.format in [
                "EXCEL",
            ]:
2 changes: 1 addition & 1 deletion laktory/models/datasources/pipelinenodedatasource.py
@@ -116,7 +116,7 @@ def _read_spark(self, spark) -> SparkDataFrame:
    def _read_polars(self) -> PolarsDataFrame:

        # Read from node output DataFrame (if available)
-        if self.node.output_df:
+        if self.node.output_df is not None:
            logger.info(f"Reading pipeline node {self._id} from output DataFrame")
            df = self.node.output_df

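A side note on this one-line fix (my illustration, not part of the commit): DataFrame objects should be compared against `None` explicitly, because truthiness is unreliable for them; PySpark DataFrames are always truthy, and recent Polars versions raise when a DataFrame is coerced to `bool`:

```python
import polars as pl

df = pl.DataFrame({"x": []})  # an empty output DataFrame

# Explicit check: unambiguous, works whether the DataFrame is empty or not.
if df is not None:
    print("output DataFrame is available")

# Truthiness check: Polars refuses to guess what a DataFrame means as a bool.
try:
    if df:
        print("not reached on recent Polars versions")
except TypeError as exc:
    print(f"ambiguous truth value: {exc}")
```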