add file and database utilities #14

Merged
merged 31 commits into from
Dec 9, 2024
Commits
c24d1cd
wip: add config
PaulKalho Oct 28, 2024
a77f4be
wip: add yaml dependency
PaulKalho Oct 29, 2024
d259855
feat: add validation and loading of config file
PaulKalho Oct 30, 2024
46d2f9e
style: fix linting
PaulKalho Oct 30, 2024
b2d3527
Merge branch 'main' into issue/compute-block-config
PaulKalho Oct 30, 2024
1fbbd39
style: remove line
PaulKalho Oct 30, 2024
f6e7150
docs: fix note
PaulKalho Oct 30, 2024
73c8843
feat: add optional to inputs validation
PaulKalho Nov 4, 2024
f086823
feat: remodel config structure
PaulKalho Nov 4, 2024
8d8574a
feat: ensure env_key is set
PaulKalho Nov 4, 2024
6b56af7
feat: allow multiple envs for input configuration
PaulKalho Nov 7, 2024
8cc2270
docs: update readme
PaulKalho Nov 7, 2024
61738de
style: add better comments
PaulKalho Nov 7, 2024
c543401
feat: add more datatypes for env keys
PaulKalho Nov 9, 2024
771b5b1
feat: add basic validation
PaulKalho Nov 10, 2024
dde0e50
feat: add validation
PaulKalho Nov 11, 2024
1e79770
docs: update readme
PaulKalho Nov 11, 2024
f3553ad
style: rename base class
PaulKalho Nov 11, 2024
bf89361
feat: add input output abstraction to settings
PaulKalho Nov 11, 2024
55dac98
feat: validate in outputs with load_and_validate func
PaulKalho Nov 12, 2024
5e12337
feat: validate on execute and custom validation function
PaulKalho Nov 12, 2024
1799e7d
tests: add and refactor tests
PaulKalho Nov 12, 2024
e9eb8d6
feat: export important functions directly
PaulKalho Nov 12, 2024
ea62a09
feat: add test workflow
PaulKalho Nov 13, 2024
2db72a7
wip: concept
PaulKalho Nov 18, 2024
a9528b9
feat: add basic functionality
PaulKalho Nov 18, 2024
f27235b
feat: implement basic database operations
PaulKalho Nov 19, 2024
a0c0dbf
feat: enable up/downloads to/from s3-buckets
PaulKalho Nov 19, 2024
39d8666
docs: add s3 documentation
PaulKalho Nov 20, 2024
f993771
style: remove solved TODOs
PaulKalho Nov 20, 2024
1f622d4
chore: update major version number
PaulKalho Dec 9, 2024
27 changes: 27 additions & 0 deletions .github/workflows/test.yaml
@@ -0,0 +1,27 @@
name: test
on:
  pull_request:

jobs:
  run-tests:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup python
        uses: actions/setup-python@v5
        with:
          cache: "pip"

      - name: Install dependencies
        run: |
          python -m venv .venv
          source .venv/bin/activate
          pip install --upgrade pip
          pip install -e .

      - name: Run tests
        run: |
          source .venv/bin/activate
          python -m unittest discover -s tests
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ __pycache__/
dist/
build/
venv/
.venv/
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include scystream-sdk/scystream/sdk/spark_jars/*
213 changes: 210 additions & 3 deletions README.md
@@ -8,19 +8,37 @@ You can install the package via pip once it's published:
pip install scystream-sdk
```

## Usage
## Introduction

One of the central concepts of scystream is the so-called **Compute Block**.

A Compute Block describes an independent program that acts as a worker
which will be scheduled by the scystream-core application.
This worker executes a task (e.g. an NLP task, a crawling task).

This SDK aims to provide the helper functions and everything else you need to implement
a custom Compute Block on your own.

Each worker can have multiple entrypoints, each aiming to solve one task.
These entrypoints can be configured from the outside using **Settings**.
Settings are basically ENV variables, which are parsed & validated using pydantic.

You can either set "global" Settings for an entrypoint using the `envs` block,
or "input/output-related" Settings using the `config` block of each input/output.

## Basic Usage of the SDK

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.scheduler import Scheduler


@entrypoint()
def example_task():
    print("Executing example_task...")


@entrypoint()
def another_task(task_name):
    print(f"Executing another_task with task name: {task_name}")

@@ -35,3 +53,192 @@ if __name__ == "__main__":
    main()

```

## Defining and Using Settings

Earlier, we already wrote about **Settings**.
Each Input & Output can be configured using these settings.
There are also Global Settings, referred to as `envs` in the `cbc.yaml`.

Below you can find a simple example of how to define & validate these settings.
To do so, use the `EnvSettings` class.

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.env.settings import EnvSettings

class TextDataInputSettings(EnvSettings):
    TXT_SRC_PATH: str  # no default provided, so it MUST be set manually

class DBDataInputSettings(EnvSettings):
    DATA_TABLE_NAME: str = "nlp_information"
    DB_HOST: str = "time.rwth-aachen.de"
    DB_PORT: str = "1234"

class TopicModellingEntrypointSettings(EnvSettings):
    LANGUAGE: str = "de"

    text_data: TextDataInputSettings
    db_data: DBDataInputSettings

@entrypoint(TopicModellingEntrypointSettings)  # Pass the settings class to the entrypoint
def topic_modelling(settings):  # The settings param is injected automatically, ready to use
    print(f"Running topic modelling, using file: {settings.text_data.TXT_SRC_PATH}")

@entrypoint()
def test_entrypoint():
    print("This entrypoint does not have any configs.")
```

Of course, you can also use your settings in other files/directories.
For that, just import the desired settings class and call the `get_settings()` function.
It will load the configuration correctly.
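
A minimal sketch of what this could look like, assuming `get_settings()` is exposed as a classmethod on your settings class; the module path `my_compute_block.settings` and the function `read_input_text` are illustrative, not part of the SDK:

```python3
# Minimal sketch -- assumes get_settings() is a classmethod on EnvSettings subclasses.
from my_compute_block.settings import TextDataInputSettings  # hypothetical module path


def read_input_text() -> str:
    # Loads and validates the values from the environment (e.g. TXT_SRC_PATH)
    settings = TextDataInputSettings.get_settings()
    with open(settings.TXT_SRC_PATH, "r") as f:
        return f.read()
```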

## Compute Block Config

We expect every repository that will be used within the scystream application
to contain a **Compute Block Config File**, the `cbc.yaml`, in its root directory.
This `cbc.yaml` defines the entrypoints of a Compute Block together with the inputs & outputs
each of them offers, so that the scystream-frontend can understand them.

This is an example `cbc.yaml`:

```yaml
name: "NLP toolbox"
description: "Contains NLP algorithms..."
author: "John Doe"
docker_image: "https://ghcr.io/nlp-toolbox"

entrypoints:
topic_modelling:
description: "Run topic modelling"
envs:
LANGUAGE: "de"
inputs:
text_data:
description: "Text file. Can be uploaded by the user."
type: "file"
config:
TXT_SRC_PATH: null
db_data:
description: "Information in a database"
type: "db_table"
config:
DATA_TABLE_NAME: "nlp_information"
DB_HOST: "time.rwth-aachen.de"
DB_PORT: 1234
outputs:
topic_model:
type: "file"
description: "Topic model file"
config:
OUTPUT_PATH_TOPIC_MODEL: null
run_durations:
type: "db_table"
description: "Table that contains the run durations per day."
config:
RUN_DURATIONS_TABLE_NAME: "run_durations_nlp"

analyze_runtime:
description: "Analyze the runtimes"
inputs:
run_durations:
description: "Table that contains all runtimes and dates"
type: "db_table"
config:
RUN_DURATIONS_TABLE_NAME: "run_durations_nlp"
outputs:
csv_output:
type: "file"
description: "A csv containing statistical information"
config:
CSV_OUTPUT_PATH: "outputs/statistics.csv"
```

### Generating a config

After writing the functionality of your Compute Block (see more below), you can generate
the corresponding `cbc.yaml` using the following function:

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.config import generate_config_from_compute_block, get_compute_block
from pathlib import Path

@entrypoint()
def example_entrypoint():
    print("Example...")

if __name__ == "__main__":
    compute_block = get_compute_block()
    generate_config_from_compute_block(compute_block, Path("cbc.yaml"))
```

This will take all entrypoints and their defined settings and generate a config from them.

> [!NOTE]
> Make sure to edit the generated config and add your user-defined metadata
> (e.g. author, description, docker_image, ...)

### Validating a config

If you want your `cbc.yaml` to be located in a different directory or have a different name, you
have to configure that accordingly:

```python3
from scystream.sdk.config import global_config

if __name__ == "__main__":
    # Set the config_path
    global_config.set_config_path("custom_dir/custom_name.yaml")
```

Of course, you can also write the config completely on your own.

> [!NOTE]
> When using `Scheduler.execute_function("entrypoint")`, the Settings for the
> entrypoint and the config will be validated.
> If the Settings do not match the definition in the yaml, execution will not be possible.
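
For illustration, a minimal sketch of triggering an entrypoint through the Scheduler; the entrypoint name `"topic_modelling"` simply refers to the example defined earlier in this README and is otherwise illustrative:

```python3
from scystream.sdk.scheduler import Scheduler

if __name__ == "__main__":
    # Settings and the cbc.yaml are validated before the entrypoint is executed.
    Scheduler.execute_function("topic_modelling")
```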

To validate the config, you can also use a helper function like this:

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.config import validate_config_with_code

@entrypoint()
def example_entrypoint():
    print("Example...")

if __name__ == "__main__":
    validate_config_with_code()
```

## Development of the SDK

### Installation

1. Create a venv and activate it

```bash
python3 -m venv .venv
source .venv/bin/activate
```

2. Install the package within the venv

> [!NOTE]
> This will also install all dependencies listed in `install_requires` in the setup.py

```bash
pip install -e .
```

3. Develop!

### Tests

To run all the tests, run the following command:

```bash
python3 -m unittest discover -s tests
```

7 changes: 7 additions & 0 deletions scystream/sdk/config/__init__.py
@@ -0,0 +1,7 @@
from .config_loader import \
    validate_config_with_code, load_config
from .compute_block_utils import get_compute_block
from .config_loader import SDKConfig
from scystream.sdk.env.settings import EnvSettings

__all__ = ["validate_config_with_code",
           "load_config", "EnvSettings", "get_compute_block", "SDKConfig"]
72 changes: 72 additions & 0 deletions scystream/sdk/config/compute_block_utils.py
@@ -0,0 +1,72 @@
from typing import Union
from pydantic_core import PydanticUndefinedType
from scystream.sdk.config.models import ComputeBlock, Entrypoint, \
    InputOutputModel
from scystream.sdk.env.settings import InputSettings, \
    OutputSettings
from scystream.sdk.config.entrypoints import get_registered_functions


def _get_pydantic_default_value_or_none(value):
    # Return the field's default, or None if pydantic marks it as undefined.
    if type(value.default) is PydanticUndefinedType:
        return None
    return value.default


def _build_input_output_dict_from_class(
    subject: Union[InputSettings, OutputSettings]
):
    # Collect the defaults of an Input/Output settings class into an InputOutputModel.
    config_dict = {}
    for key, value in subject.model_fields.items():
        config_dict[key] = _get_pydantic_default_value_or_none(value)
    return InputOutputModel(
        type="TODO: SetType",
        description="<to-be-set>",
        config=config_dict
    )


def get_compute_block() -> ComputeBlock:
    """
    Converts Entrypoints & Settings to a ComputeBlock
    """
    entrypoints = {}
    for entrypoint, func in get_registered_functions().items():
        envs = {}
        inputs = {}
        outputs = {}

        if func["settings"]:
            entrypoint_settings_class = func["settings"]
            for key, value in entrypoint_settings_class.model_fields.items():
                # Fields whose default_factory is an Input-/OutputSettings class are
                # treated as inputs/outputs; all other fields become plain envs.
                if (
                    isinstance(value.default_factory, type) and
                    issubclass(value.default_factory, InputSettings)
                ):
                    inputs[key] = _build_input_output_dict_from_class(
                        value.default_factory
                    )
                elif (
                    isinstance(value.default_factory, type) and
                    issubclass(value.default_factory, OutputSettings)
                ):
                    outputs[key] = _build_input_output_dict_from_class(
                        value.default_factory
                    )
                else:
                    envs[key] = _get_pydantic_default_value_or_none(value)

        entrypoints[entrypoint] = Entrypoint(
            description="<tbd>",
            envs=envs if envs != {} else None,
            inputs=inputs if inputs != {} else None,
            outputs=outputs if outputs != {} else None
        )

    return ComputeBlock(
        name="<tbs>",
        description="<tbs>",
        author="<tbs>",
        entrypoints=entrypoints,
        docker_image="<tbs>"
    )