Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V5 #66

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft

V5 #66

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
flake8 --verbose dynamicio
flake8 --verbose tests
pylint -v dynamicio
pylint -v tests
yamllint -v dynamicio
yamllint -v tests

Expand Down
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ omit =
*__init__*

[report]
fail_under = 90
fail_under = 0.4
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
##
## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore

v4/
# User-specific files
*.rsuser
*.suo
Expand Down
20 changes: 1 addition & 19 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,12 @@ repos:
hooks:
- id: pylint
name: pylint
exclude: ^(tests/.*|demo/*)
entry: pylint
language: system
types: [python]
stages: [commit]

- repo: local
hooks:
- id: pydocstyle
name: pydocstyle
exclude: ^(tests/.*|demo/*)
language: system
entry: pydocstyle
stages: [commit]

- repo: local
hooks:
- id: flake8
Expand Down Expand Up @@ -78,13 +70,3 @@ repos:
language: system
pass_filenames: false
stages: [commit]

- repo: local
hooks:
- id: pytest-check
name: pytest-check-demo
entry: python -m pytest demo/tests
exclude: ^(.github|.circleci|docs|.flake8|.gitlint|.pylintrc|.docs.Dockerfile|README.md|Makefile|setup.py)
language: system
pass_filenames: false
stages: [commit]
5 changes: 3 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ extension-pkg-allow-list=
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
# for backward compatibility.)
extension-pkg-whitelist=
extension-pkg-whitelist=pydantic

# Return non-zero exit code if any of these messages/categories are detected,
# even if score is above --fail-under value. Syntax same as enable. Messages
Expand Down Expand Up @@ -149,7 +149,8 @@ disable=raw-checker-failed,
suppressed-message,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead
use-symbolic-message-instead,
R0801

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
Expand Down
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@ check-linting:
@python -m yamllint -v ${CODE_DIR}
@python -m mypy ${CODE_DIR}

check-docstring:
@${VENV_BIN_PATH}/pydocstyle -e --count $(file)

create-jupyter-kernel:
@${VENV_BIN_PATH}/pip install ipykernel
@${VENV_BIN_PATH}/ipython kernel install --user --name=${VIRTUALENV_NAME}

run-tests:
@python -m pytest --cache-clear --cov=${CODE_DIR} ${TESTS}
@python -m pytest --cache-clear --cov=demo/src demo/tests

run-unit-tests:
@python -m pytest -v -m unit ${TESTS}
Expand Down
File renamed without changes.
File renamed without changes.
60 changes: 60 additions & 0 deletions demo/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# flake8: noqa: T201

from pathlib import Path

from pandera import SchemaModel
from pandera.typing import Series

from dynamicio import ParquetResource

DEMO_DIR = Path(__file__).parent

# "[[ ... ]]" markers are injectable placeholders resolved via .inject() below.
# FIX: the filename token read "(unknown)"; the path must carry a
# "[[filename]]" placeholder for inject(filename="bar") to resolve it.
resource = ParquetResource(path=DEMO_DIR / "data/[[directory]]/[[filename]].parquet")
resource = resource.inject(directory="input", filename="bar")
# Read the raw parquet file with no schema validation applied.
df_without_schema = resource.read()

print(df_without_schema)


class OneFilteredColumnSchema(SchemaModel):
    """Pandera schema for the demo dataframe: coerces types and filters columns."""

    column_a: Series[str]
    column_b: Series[str]
    column_c: Series[int]

    class Config:
        coerce = True  # this will coerce column_c to int
        strict = "filter"  # this will filter out column_d from the raw data


# Same resource, but with a pandera schema attached: reading now coerces
# dtypes and filters out columns not declared on the schema.
# FIX: the filename token read "(unknown)"; the path must carry a
# "[[filename]]" placeholder for inject(filename="bar") to resolve it.
resource_with_schema = ParquetResource(
    path=DEMO_DIR / "data/[[directory]]/[[filename]].parquet",
    pa_schema=OneFilteredColumnSchema,
).inject(directory="input", filename="bar")
df_with_schema = resource_with_schema.read()

print(df_with_schema)

# Output:
#    column_a column_b  column_c  column_d
# 0       id1  Label_A    1001.0     999.0
# 1       id2  Label_A    1002.0     998.0
# 2       id3  Label_B    1003.0     997.0
# 3       id4  Label_C    1004.0     996.0
# 4       id5  Label_A    1005.0     995.0
# 5       id6  Label_B    1006.0     994.0
# 6       id7  Label_C    1007.0     993.0
# 7       id8  Label_A    1008.0     992.0
# 8       id9  Label_A    1009.0     991.0
# 9      id10  Label_B    1010.0     990.0
#    column_a column_b  column_c
# 0       id1  Label_A      1001
# 1       id2  Label_A      1002
# 2       id3  Label_B      1003
# 3       id4  Label_C      1004
# 4       id5  Label_A      1005
# 5       id6  Label_B      1006
# 6       id7  Label_C      1007
# 7       id8  Label_A      1008
# 8       id9  Label_A      1009
# 9      id10  Label_B      1010

# NOTE(review): the line below referenced undefined names (`path`,
# `local_test_path_without_testdir`) and raised NameError at import time.
# Kept as a commented-out design note until the API question is settled.
# ParquetResource(path=path, test_path=local_test_path_without_testdir)  # <<- injectable??
1 change: 1 addition & 0 deletions demo/migrations/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
path_tests/**
68 changes: 68 additions & 0 deletions demo/migrations/v4_resource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
---
# V4-style dynamicio resource definitions, used as the "before" input of the
# V4 -> V5 migration demo (see v5_resource.py for the migrated equivalents).
# "[[ ... ]]" markers are placeholders substituted at load time.
# NOTE(review): indentation reconstructed from the key structure; values are
# unchanged. Placeholder naming is intentionally inconsistent across examples
# (TEST_RESOURCES_PATH vs TEST_RESOURCES_DIR_PATH, RESOURCES vs RESOURCES_PATH)
# as they were collected from different projects.

# Parquet resource: S3 file in the cloud, local file in tests.
RESOURCE_B:
  local:
    type: "local"
    local:
      file_path: "[[ TEST_RESOURCES_PATH ]]/data/evaluation/test_predictions_{route}_{outlook}d.parquet"
      file_type: "parquet"
  cloud:
    type: "s3_file"
    s3:
      bucket: "[[ PROJECT_BUCKET]]"
      file_path: "evaluation/test_predictions/[[ RUN_ID ]]/test_predictions_{route}_{outlook}d.parquet"
      file_type: "parquet"
  schema:
    file_path: "[[ RESOURCES_PATH ]]/schemas/predictions.yaml"

# JSON resource whose "cloud" environment is still a local file.
HYBRID_MODEL_CONFIGURATION:
  local:
    type: "local"
    local:
      file_path: "[[ TEST_RESOURCES_DIR_PATH ]]/data/modelling/hybrid_model_configuration.json"
      file_type: "json"
  cloud:
    type: "local"
    local:
      file_path: "[[ RESOURCES_DIR_PATH ]]/hybrid_model_configurations/{days_in_advance}d.json"
      file_type: "json"

# Kafka sink; LOCAL_E2E reuses the LOCAL mapping via a YAML anchor/alias.
VOYAGE_MESSAGES:
  LOCAL: &local_voyage_messages
    type: "local"
    local:
      file_path: "[[ LOCAL_DATA ]]/sink/voyage_messages.json"
      file_type: "json"
      options:
        orient: "records"
  LOCAL_E2E: *local_voyage_messages
  CLOUD:
    type: "kafka"
    kafka:
      kafka_server: "[[ KAFKA_SERVER ]]"
      kafka_topic: "[[ KAFKA_TOPIC ]]"
      options:
        compression_type: "gzip"
        max_in_flight_requests_per_connection: 10
        batch_size: 262144
        request_timeout_ms: 60000  # 60s
        buffer_memory: 134217728  # 128MB
        linger_ms: 3000
  schema:
    file_path: "[[ RESOURCES ]]/schemas/sink/voyage_message.yaml"

# Postgres source in the cloud, local parquet fixture in tests.
FREIGHT_RATES:
  local:
    type: "local"
    local:
      file_path: "[[ TEST_RESOURCES_PATH ]]/data/input/freight_rates.parquet"
      file_type: "parquet"
  cloud:
    type: "postgres"
    postgres:
      db_user: "[[ DB_USER ]]"
      db_password: "[[ DB_PASS ]]"
      db_host: "[[ DB_HOST ]]"
      db_port: "[[ DB_PORT ]]"
      db_name: "[[ DB_NAME ]]"
  schema:
    file_path: "[[ RESOURCES_PATH ]]/schemas/freight_rates.yaml"
107 changes: 107 additions & 0 deletions demo/migrations/v4_schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
---
# V4-style dynamicio column schema, used as the "before" input of the
# V4 -> V5 migration demo (see v5_schema.py for the migrated pandera class).
# NOTE(review): indentation reconstructed from the key structure; values are
# unchanged. Each validation carries an `apply` flag — `apply: false` means
# the check is declared but disabled.
name: cargo_movements_coloads_mapping
columns:
  id:
    type: "object"
    validations: {}
    metrics: []
  col_0:
    type: "int64"
    validations:
      is_in:
        apply: true
        options:
          categorical_values:
            - class_a
            - class_b
            - class_c
          match_all: true
    metrics: []
  col_1:
    type: "int64"
    validations:
      is_in:
        apply: true
        options:
          categorical_values:
            - class_a
            - class_b
            - class_c
          match_all: false
    metrics: []
  col_2:
    type: "int64"
    validations:
      is_greater_than:
        apply: true
        options:
          threshold: 1000
    metrics: []
  col_3:
    type: "int64"
    validations:
      is_between:
        apply: true
        options:
          lower: 0
          upper: 1000
          include_left: false
          include_right: true
    metrics: []
  col_4:
    type: "int64"
    validations:
      has_unique_values:
        apply: true
        options: {}
      has_no_null_values:
        apply: true
        options: {}
    metrics: []
  col_5:
    type: "datetime64[ns]"
    validations: {}
    metrics: []
  col_6:
    type: "int64"
    validations:
      has_acceptable_percentage_of_nulls:
        apply: false
        options:
          threshold: 0.015
    metrics: []
  # NOTE(review): col_7 through col_11 omit the `metrics` key in the original;
  # kept as-is to preserve the source data exactly.
  col_7:
    type: "float64"
    validations:
      is_greater_than_or_equal:
        apply: true
        options:
          threshold: 0
  col_8:
    type: "float64"
    validations:
      is_lower_than_or_equal:
        apply: true
        options:
          threshold: 0
  col_9:
    type: "float64"
    validations:
      is_greater_than:
        apply: true
        options:
          threshold: 0
  col_10:
    type: "float64"
    validations:
      is_lower_than:
        apply: true
        options:
          threshold: 0
  col_11:
    type: "float64"
    validations:
      is_greater_than:
        apply: false  # declared but disabled — must NOT be migrated as a check
        options:
          threshold: 0
32 changes: 32 additions & 0 deletions demo/migrations/v5_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""V5 dynamicio resource definitions migrated from `v4_resource.yaml`."""

# PEP 8: one name per line, sorted, instead of one over-long comma-run.
# All names from the original import are kept (they document the available
# V5 resource types for migrators) even where unused here.
from dynamicio import (
    CsvResource,
    HdfResource,
    JsonResource,
    KafkaResource,
    ParquetResource,
    PostgresResource,
    S3CsvResource,
    S3HdfResource,
    S3JsonResource,
    S3ParquetResource,
)

# v4 RESOURCE_B: cloud s3_file/parquet + local test fixture.
resource_b_resource = S3ParquetResource(
    bucket="[[ PROJECT_BUCKET]]",
    path="evaluation/test_predictions/[[ RUN_ID ]]/test_predictions_{route}_{outlook}d.parquet",
    test_path="[[ TEST_RESOURCES_PATH ]]/data/evaluation/test_predictions_{route}_{outlook}d.parquet",
)


# v4 HYBRID_MODEL_CONFIGURATION: both environments are local JSON files.
hybrid_model_configuration_resource = JsonResource(
    path="[[ RESOURCES_DIR_PATH ]]/hybrid_model_configurations/{days_in_advance}d.json",
    test_path="[[ TEST_RESOURCES_DIR_PATH ]]/data/modelling/hybrid_model_configuration.json",
)


# v4 VOYAGE_MESSAGES: kafka sink in the cloud, local JSON file in tests.
voyage_messages_resource = KafkaResource(
    server="[[ KAFKA_SERVER ]]",
    topic="[[ KAFKA_TOPIC ]]",
    test_path="[[ LOCAL_DATA ]]/sink/voyage_messages.json",
)


# v4 FREIGHT_RATES: postgres source in the cloud, local parquet in tests.
freight_rates_resource = PostgresResource(
    db_host="[[ DB_HOST ]]",
    db_port="[[ DB_PORT ]]",
    db_name="[[ DB_NAME ]]",
    db_user="[[ DB_USER ]]",
    db_password="[[ DB_PASS ]]",
    table_name=None,  # placeholder: supply either table_name or sql_query
    sql_query=...,  # placeholder (Ellipsis): fill in the migrated query
    test_path="[[ TEST_RESOURCES_PATH ]]/data/input/freight_rates.parquet",
)
26 changes: 26 additions & 0 deletions demo/migrations/v5_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from datetime import datetime

import pandera as pa
from pandera import SchemaModel
from pandera.typing import Series


class CargoMovementsColoadsMappingSchema(SchemaModel):
    """V5 pandera schema migrated from `v4_schema.yaml` (cargo_movements_coloads_mapping)."""

    id: Series[str] = pa.Field(nullable=True)
    # v4 `is_in` with match_all=true maps to an isin check.
    col_0: Series[int] = pa.Field(isin=["class_a", "class_b", "class_c"], nullable=True)
    # v4 `is_in` with match_all=false has no direct pandera equivalent; no check.
    col_1: Series[int] = pa.Field(nullable=True)
    col_2: Series[int] = pa.Field(gt=1000, nullable=True)
    col_3: Series[int] = pa.Field(
        in_range={"min_value": 0, "max_value": 1000, "include_min": False, "include_max": True},
        nullable=True,
    )
    # v4 `has_unique_values` + `has_no_null_values`.
    col_4: Series[int] = pa.Field(unique=True, nullable=False)
    col_5: Series[datetime] = pa.Field(nullable=True)
    # v4 `has_acceptable_percentage_of_nulls` had apply=false, so no check.
    col_6: Series[int] = pa.Field(nullable=True)
    col_7: Series[float] = pa.Field(ge=0, nullable=True)
    col_8: Series[float] = pa.Field(le=0, nullable=True)
    col_9: Series[float] = pa.Field(gt=0, nullable=True)
    col_10: Series[float] = pa.Field(lt=0, nullable=True)
    # FIX: v4 `is_greater_than` for col_11 has apply=false (disabled), so the
    # gt=0 check must not be applied; it was previously migrated by mistake.
    col_11: Series[float] = pa.Field(nullable=True)

    class Config:
        coerce = True
        strict = "filter"

Loading