From 57eb7d756c071cc893726b5a321d8ccf47b170b3 Mon Sep 17 00:00:00 2001 From: Christos Hadjinikolis Date: Sun, 15 Jan 2023 22:22:09 +0000 Subject: [PATCH 1/3] scratch: Try coupling vals and logs --- demo/src/io.py | 10 ++++++++-- demo/tests/test_pandera_validations.py | 14 ++++++++++++++ dynamicio/metrics.py | 23 +++++++++++++++-------- requirements.txt | 1 + 4 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 demo/tests/test_pandera_validations.py diff --git a/demo/src/io.py b/demo/src/io.py index c7e0098..d7e1179 100644 --- a/demo/src/io.py +++ b/demo/src/io.py @@ -2,12 +2,18 @@ # pylint: disable=too-few-public-methods __all__ = ["InputIO", "StagedFoo", "StagedBar"] -from sqlalchemy.ext.declarative import declarative_base +from pandera import SchemaModel, String, Float, Field +from pandera.typing import Series from dynamicio import UnifiedIO, WithLocal, WithPostgres, WithS3File from dynamicio.core import SCHEMA_FROM_FILE, DynamicDataIO -Base = declarative_base() + +class Foo(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) class InputIO(UnifiedIO): diff --git a/demo/tests/test_pandera_validations.py b/demo/tests/test_pandera_validations.py new file mode 100644 index 0000000..2ed5c62 --- /dev/null +++ b/demo/tests/test_pandera_validations.py @@ -0,0 +1,14 @@ +import pandas as pd + +from demo.src.io import Foo + + +def test_pandera_validations(): + + # Given + df = pd.read_csv("/Users/chadjinik/Github/Vortexa/dynamicio/demo/tests/data/input/foo.csv") + + # When + Foo.validate(df) + + # Then \ No newline at end of file diff --git a/dynamicio/metrics.py b/dynamicio/metrics.py index 711a9e4..12c8ad2 100644 --- a/dynamicio/metrics.py +++ b/dynamicio/metrics.py @@ -8,6 +8,7 @@ import pandas as pd # type: ignore from magic_logger import logger +from pandera import extensions from pythonjsonlogger import jsonlogger # type: ignore logHandler = logging.StreamHandler(sys.stdout) @@ -35,6 +36,12 @@ def log_metric(dataset: str, column: str, metric: str, value: float): logger.info(json.dumps({"message": "METRIC", "dataset": dataset, "column": column, "metric": metric, "value": float(value)})) +@extensions.register_check_method(statistics=["metric_options"]) +def logging(df: pd.DataFrame, **metric_options: Mapping[str, Any]): + for metric in metric_options["metrics"]: + get_metric(metric)(metric_options["dataset_name"], df, metric_options["column"])() + return True + class Metric: """A base class for implementing metrics classes.""" @@ -86,7 +93,7 @@ def calculate_metric(self) -> Number: Returns: The minimum value of a column. """ - return self.df[self.column].min() + return self.df.min() class Max(Metric): @@ -98,7 +105,7 @@ def calculate_metric(self) -> Number: Returns: The maximum value of a column. """ - return self.df[self.column].max() + return self.df.max() class Mean(Metric): @@ -110,7 +117,7 @@ def calculate_metric(self) -> Number: Returns: The mean value of a column. 
""" - return self.df[self.column].mean() + return self.df.mean() class Std(Metric): @@ -122,7 +129,7 @@ def calculate_metric(self) -> Number: Returns: The standard deviation of a column. """ - return self.df[self.column].std() + return self.df.std() class Variance(Metric): @@ -134,7 +141,7 @@ def calculate_metric(self) -> Number: Returns: The variance of a column. """ - return self.df[self.column].var() + return self.df.var() class Counts(Metric): @@ -146,7 +153,7 @@ def calculate_metric(self) -> int: Returns: The length of a column. """ - return len(self.df[self.column]) + return len(self.df) class UniqueCounts(Metric): @@ -158,7 +165,7 @@ def calculate_metric(self) -> int: Returns: The unique values of a column. """ - return len(self.df[self.column].unique()) + return len(self.df.unique()) class CountsPerLabel(Metric): @@ -170,7 +177,7 @@ def calculate_metric(self) -> Mapping: Returns: The counts per label in a categorical column """ - column_vs_metric_value = self.df[self.column].value_counts().to_dict() + column_vs_metric_value = self.df.value_counts().to_dict() label_vs_metric_value_with_column_prefix = {} for key in column_vs_metric_value.keys(): new_key = self.column + "-" + key diff --git a/requirements.txt b/requirements.txt index 0eae907..5c12363 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ kafka-python~=2.0.2 logzero>=1.7.0 magic-logger>=1.0.2 pandas>=1.2.4 +pandera==0.13.4 psycopg2-binary~=2.9.3 pyarrow>=7.0.0 python-json-logger~=2.0.1 From abe3b2d702eff6924a84fe6bccdf886ae924d382 Mon Sep 17 00:00:00 2001 From: Christos Hadjinikolis Date: Fri, 24 Feb 2023 14:29:15 +0000 Subject: [PATCH 2/3] wip: Demo --- README.md | 81 +++++++-------- .../{input.yaml => staging_input.yaml} | 0 .../{raw.yaml => staging_output.yaml} | 0 .../definitions/transform_input.yaml | 26 +++++ .../{processed.yaml => transform_output.yaml} | 0 demo/src/__init__.py | 19 ++-- demo/src/io.py | 44 --------- demo/src/io/__init__.py | 18 ++++ demo/src/io/schemas.py | 41 ++++++++ demo/src/runners/staging.py | 13 +-- demo/src/runners/transform.py | 12 ++- .../expected/staged_bar.parquet | Bin .../expected/staged_foo.parquet | Bin dynamicio/core.py | 35 +++---- tests/test_core.py | 92 +++++++++--------- tests/test_mixins/test_local_mixins.py | 66 ++++++------- tests/test_mixins/test_mixin_utils.py | 4 +- tests/test_mixins/test_postgres_mixins.py | 24 ++--- tests/test_mixins/test_s3_mixins.py | 60 ++++++------ tests/test_regressions/test_v430.py | 2 +- 20 files changed, 290 insertions(+), 247 deletions(-) rename demo/resources/definitions/{input.yaml => staging_input.yaml} (100%) rename demo/resources/definitions/{raw.yaml => staging_output.yaml} (100%) create mode 100644 demo/resources/definitions/transform_input.yaml rename demo/resources/definitions/{processed.yaml => transform_output.yaml} (100%) delete mode 100644 demo/src/io.py create mode 100644 demo/src/io/__init__.py create mode 100644 demo/src/io/schemas.py rename demo/tests/data/{raw => transform}/expected/staged_bar.parquet (100%) rename demo/tests/data/{raw => transform}/expected/staged_foo.parquet (100%) diff --git a/README.md b/README.md index 1e4efdd..249151c 100644 --- a/README.md +++ b/README.md @@ -965,37 +965,38 @@ logger = logging.getLogger(__name__) def main() -> None: - """The entry point for the Airflow Staging task. + """The entry point for the Airflow Staging task. - Returns: - Void function. - """ - # LOAD DATA - logger.info("Loading data from live sources...") + Returns: + Void function. 
+ """ + # LOAD DATA + logger.info("Loading data from live sources...") - bar_df = InputIO(source_config=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read() - foo_df = InputIO(source_config=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read() + bar_df = InputIO(resource_definition=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read() + foo_df = InputIO(resource_definition=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read() - logger.info("Data successfully loaded from live sources...") + logger.info("Data successfully loaded from live sources...") - # TRANSFORM DATA - logger.info("Apply transformations...") + # TRANSFORM DATA + logger.info("Apply transformations...") - # TODO: Apply your transformations + # TODO: Apply your transformations - logger.info("Transformations applied successfully...") + logger.info("Transformations applied successfully...") - # SINK DATA - logger.info("Begin sinking data to staging area:") - StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df) - StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).write(bar_df) - logger.info("Data staging is complete...") + # SINK DATA + logger.info("Begin sinking data to staging area:") + StagedFoo(resource_definition=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df) + StagedBar(resource_definition=raw_config.get(source_key="STAGED_BAR")).write(bar_df) + logger.info("Data staging is complete...") ``` ### Utilising `asyncio` `Dynamic(i/o)` supports use of `asyncio` to speed up `I/O bound` operations through leveraging multithreading. An example can be found in the second of the two demo tasks, namely, the `transform.py` task. + ```python """Add module docstring....""" import asyncio @@ -1009,35 +1010,35 @@ logger = logging.getLogger(__name__) async def main() -> None: - """The entry point for the Airflow Staging task. + """The entry point for the Airflow Staging task. - Returns: - Void function. - """ - # LOAD DATA - logger.info("Loading data from live sources...") + Returns: + Void function. 
+ """ + # LOAD DATA + logger.info("Loading data from live sources...") - [bar_df, foo_df] = await asyncio.gather( - StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).async_read(), - StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO")).async_read() - ) + [bar_df, foo_df] = await asyncio.gather( + StagedBar(resource_definition=raw_config.get(source_key="STAGED_BAR")).async_read(), + StagedFoo(resource_definition=raw_config.get(source_key="STAGED_FOO")).async_read() + ) - logger.info("Data successfully loaded from live sources...") + logger.info("Data successfully loaded from live sources...") - # TRANSFORM DATA - logger.info("Apply transformations...") + # TRANSFORM DATA + logger.info("Apply transformations...") - # TODO: Apply your transformations + # TODO: Apply your transformations - logger.info("Transformations applied successfully...") + logger.info("Transformations applied successfully...") - # SINK DATA - logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw") - await asyncio.gather( - InputIO(source_config=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df), - InputIO(source_config=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df), - ) - logger.info("Data staging is complete...") + # SINK DATA + logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw") + await asyncio.gather( + InputIO(resource_definition=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df), + InputIO(resource_definition=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df), + ) + logger.info("Data staging is complete...") ``` In short, you simply need to utilise the `async_read()` or the `async_write()` methods instead, plus await and gather your calls. 
diff --git a/demo/resources/definitions/input.yaml b/demo/resources/definitions/staging_input.yaml similarity index 100% rename from demo/resources/definitions/input.yaml rename to demo/resources/definitions/staging_input.yaml diff --git a/demo/resources/definitions/raw.yaml b/demo/resources/definitions/staging_output.yaml similarity index 100% rename from demo/resources/definitions/raw.yaml rename to demo/resources/definitions/staging_output.yaml diff --git a/demo/resources/definitions/transform_input.yaml b/demo/resources/definitions/transform_input.yaml new file mode 100644 index 0000000..51a680c --- /dev/null +++ b/demo/resources/definitions/transform_input.yaml @@ -0,0 +1,26 @@ +--- +STAGED_FOO: + sample: + type: "local" + local: + file_path: "[[ TEST_RESOURCES ]]/data/raw/staged_foo.parquet" + file_type: "parquet" + actual: + type: "s3" + s3: + bucket: "[[ S3_YOUR_OUTPUT_BUCKET ]]" + file_path: "live/data/raw/staged_foo.parquet" + file_type: "parquet" + +STAGED_BAR: + sample: + type: "local" + local: + file_path: "[[ TEST_RESOURCES ]]/data/raw/staged_bar.parquet" + file_type: "parquet" + actual: + type: "s3" + s3: + bucket: "[[ S3_YOUR_OUTPUT_BUCKET ]]" + file_path: "live/data/raw/staged_bar.parquet" + file_type: "parquet" diff --git a/demo/resources/definitions/processed.yaml b/demo/resources/definitions/transform_output.yaml similarity index 100% rename from demo/resources/definitions/processed.yaml rename to demo/resources/definitions/transform_output.yaml diff --git a/demo/src/__init__.py b/demo/src/__init__.py index babc177..189db6b 100644 --- a/demo/src/__init__.py +++ b/demo/src/__init__.py @@ -1,5 +1,5 @@ """Set config IOs.""" -__all__ = ["input_config", "raw_config", "processed_config"] +__all__ = ["staging_input_config", "staging_output_config", "transform_input_config", "transform_output_config"] import logging import os @@ -12,18 +12,23 @@ logging.getLogger("kafka").setLevel(logging.WARNING) -input_config = IOConfig( - path_to_source_yaml=(os.path.join(RESOURCES, "definitions/input.yaml")), +staging_input_config = IOConfig( + path_to_source_yaml=(os.path.join(RESOURCES, "definitions/staging_input.yaml")), env_identifier=ENVIRONMENT, dynamic_vars=environment, ) -raw_config = IOConfig( - path_to_source_yaml=(os.path.join(RESOURCES, "definitions/raw.yaml")), +staging_output_config = IOConfig( + path_to_source_yaml=(os.path.join(RESOURCES, "definitions/staging_output.yaml")), env_identifier=ENVIRONMENT, dynamic_vars=environment, ) -processed_config = IOConfig( - path_to_source_yaml=(os.path.join(RESOURCES, "definitions/processed.yaml")), +transform_input_config = IOConfig( + path_to_source_yaml=(os.path.join(RESOURCES, "definitions/transform_input.yaml")), + env_identifier=ENVIRONMENT, + dynamic_vars=environment, +) +transform_output_config = IOConfig( + path_to_source_yaml=(os.path.join(RESOURCES, "definitions/transform_output.yaml")), env_identifier=ENVIRONMENT, dynamic_vars=environment, ) diff --git a/demo/src/io.py b/demo/src/io.py deleted file mode 100644 index d7e1179..0000000 --- a/demo/src/io.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Responsible for configuring io operations for input data.""" -# pylint: disable=too-few-public-methods -__all__ = ["InputIO", "StagedFoo", "StagedBar"] - -from pandera import SchemaModel, String, Float, Field -from pandera.typing import Series - -from dynamicio import UnifiedIO, WithLocal, WithPostgres, WithS3File -from dynamicio.core import SCHEMA_FROM_FILE, DynamicDataIO - - -class Foo(SchemaModel): - column_a: Series[String] = 
Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) - column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) - column_c: Series[Float] = Field(gt=1000) - column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) - - -class InputIO(UnifiedIO): - """UnifiedIO subclass for V6 data.""" - - schema = SCHEMA_FROM_FILE - - -class StagedFoo(WithS3File, WithLocal, DynamicDataIO): - """UnifiedIO subclass for staged foos.""" - - schema = { - "column_a": "object", - "column_b": "object", - "column_c": "int64", - "column_d": "int64", - } - - -class StagedBar(WithLocal, WithPostgres, DynamicDataIO): - """UnifiedIO subclass for cargo movements volumes data.""" - - schema = { - "column_a": "object", - "column_b": "object", - "column_c": "int64", - "column_d": "int64", - } diff --git a/demo/src/io/__init__.py b/demo/src/io/__init__.py new file mode 100644 index 0000000..889072c --- /dev/null +++ b/demo/src/io/__init__.py @@ -0,0 +1,18 @@ +"""Responsible for configuring io operations for input data.""" +# pylint: disable=too-few-public-methods +__all__ = ["InputIO", "StagedFooIO", "StagedBarIO"] + + +from dynamicio import UnifiedIO, WithS3File, WithLocal, DynamicDataIO, WithPostgres + + +class InputIO(UnifiedIO): + """UnifiedIO subclass for V6 data.""" + + +class StagedFooIO(WithS3File, WithLocal, DynamicDataIO): + """UnifiedIO subclass for staged foos.""" + + +class StagedBarIO(WithLocal, WithPostgres, DynamicDataIO): + """UnifiedIO subclass for cargo movements volumes data.""" diff --git a/demo/src/io/schemas.py b/demo/src/io/schemas.py new file mode 100644 index 0000000..a6bc6b7 --- /dev/null +++ b/demo/src/io/schemas.py @@ -0,0 +1,41 @@ +__all__ = ["Foo", "Bar", "StagedFoo", "StagedBar", "FinalFoo", "FinalBar"] + +from pandera import SchemaModel, String, Float, Field +from pandera.typing import Series + + +class Foo(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) + +class Bar(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) + +class StagedFoo(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], 
"dataset_name": "Foo", "column": "column_a"}) + +class StagedBar(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) + +class FinalFoo(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) + +class FinalBar(SchemaModel): + column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"}) + column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"}) + column_c: Series[Float] = Field(gt=1000) + column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"}) \ No newline at end of file diff --git a/demo/src/runners/staging.py b/demo/src/runners/staging.py index 1327866..7aa48cf 100644 --- a/demo/src/runners/staging.py +++ b/demo/src/runners/staging.py @@ -1,8 +1,9 @@ """Add module docstring....""" import logging -from demo.src import constants, input_config, raw_config -from demo.src.io import InputIO, StagedBar, StagedFoo +from demo.src import constants, staging_input_config, staging_output_config +from demo.src.io import InputIO, StagedFooIO, StagedBarIO +from demo.src.io.schemas import Bar, Foo logger = logging.getLogger(__name__) @@ -16,8 +17,8 @@ def main() -> None: # LOAD DATA logger.info("Loading data from live sources...") - bar_df = InputIO(source_config=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read() - foo_df = InputIO(source_config=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read() + bar_df = InputIO(resource_definition=staging_input_config.get(source_key="BAR"), schema=Bar, apply_schema_validations=True, log_schema_metrics=True).read() + foo_df = InputIO(resource_definition=staging_input_config.get(source_key="FOO"), scheam=Foo, apply_schema_validations=True, log_schema_metrics=True).read() logger.info("Data successfully loaded from live sources...") @@ -30,6 +31,6 @@ def main() -> None: # SINK DATA logger.info("Begin sinking data to staging area:") - StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df) - StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).write(bar_df) + StagedFooIO(resource_definition=staging_output_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df) + StagedBarIO(resource_definition=staging_output_config.get(source_key="STAGED_BAR")).write(bar_df) logger.info("Data staging is complete...") diff --git a/demo/src/runners/transform.py 
b/demo/src/runners/transform.py index 6d9d85a..82d257b 100644 --- a/demo/src/runners/transform.py +++ b/demo/src/runners/transform.py @@ -3,8 +3,9 @@ import logging import demo.src.environment -from demo.src import processed_config, raw_config -from demo.src.io import InputIO, StagedBar, StagedFoo +from demo.src import transform_input_config, transform_output_config +from demo.src.io import InputIO, StagedFooIO, StagedBarIO +from demo.src.io.schemas import FinalFoo, FinalBar, StagedBar, StagedFoo logger = logging.getLogger(__name__) @@ -19,7 +20,8 @@ async def main() -> None: logger.info("Loading data from live sources...") [bar_df, foo_df] = await asyncio.gather( - StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).async_read(), StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO")).async_read() + StagedBarIO(resource_definition=transform_input_config.get(source_key="STAGED_BAR"), schema=StagedBar).async_read(), + StagedFooIO(resource_definition=transform_input_config.get(source_key="STAGED_FOO"), schema=StagedFoo).async_read() ) logger.info("Data successfully loaded from live sources...") @@ -34,7 +36,7 @@ async def main() -> None: # SINK DATA logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw") await asyncio.gather( - InputIO(source_config=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df), - InputIO(source_config=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df), + InputIO(resource_definition=transform_output_config.get(source_key="FINAL_FOO"), schema=FinalFoo, apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df), + InputIO(resource_definition=transform_output_config.get(source_key="FINAL_BAR"), schema=FinalBar, apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df), ) logger.info("Data staging is complete...") diff --git a/demo/tests/data/raw/expected/staged_bar.parquet b/demo/tests/data/transform/expected/staged_bar.parquet similarity index 100% rename from demo/tests/data/raw/expected/staged_bar.parquet rename to demo/tests/data/transform/expected/staged_bar.parquet diff --git a/demo/tests/data/raw/expected/staged_foo.parquet b/demo/tests/data/transform/expected/staged_foo.parquet similarity index 100% rename from demo/tests/data/raw/expected/staged_foo.parquet rename to demo/tests/data/transform/expected/staged_foo.parquet diff --git a/dynamicio/core.py b/dynamicio/core.py index 0bee13f..d7e0bf0 100644 --- a/dynamicio/core.py +++ b/dynamicio/core.py @@ -11,6 +11,7 @@ import pandas as pd # type: ignore import pydantic from magic_logger import logger +from pandera import SchemaModel from dynamicio import validations from dynamicio.config.pydantic import DataframeSchema, IOEnvironment @@ -32,19 +33,19 @@ class DynamicDataIO: >>> ) >>> >>> class IO(WithS3File, WithLocal, DynamicDataIO): - >>> schema = S >>> >>> my_dataset_local_mapping = input_config.get(source_key="MY_DATASET") >>> my_dataset_io = IO(my_dataset_local_mapping) >>> my_dataset_df = my_dataset_io.read() """ - schema: DataframeSchema - sources_config: IOEnvironment + schema: SchemaModel + resource_definition: IOEnvironment def __init__( self, - source_config: IOEnvironment, + resource_definition: IOEnvironment, + schema: SchemaModel, apply_schema_validations: bool = False, log_schema_metrics: bool = False, show_casting_warnings: bool = False, @@ -53,7 
+54,8 @@ def __init__( """Class constructor. Args: - source_config: Configuration to use when reading/writing data from/to a source + resource_definition: Configuration to use when reading/writing data from/to a source + schema: A pandera schema model (includes validations and metrics logging) apply_schema_validations: Applies schema validations on either read() or write() log_schema_metrics: Logs schema metrics on either read() or write() show_casting_warnings: Logs casting warnings on either read() or write() if set to True @@ -62,24 +64,15 @@ def __init__( if type(self) is DynamicDataIO: # pylint: disable=unidiomatic-typecheck raise TypeError("Abstract class DynamicDataIO cannot be used to instantiate an object...") - self.sources_config = source_config + self.resource_definition = resource_definition self.name = self._transform_class_name_to_dataset_name(self.__class__.__name__) + self.schema = schema self.apply_schema_validations = apply_schema_validations self.log_schema_metrics = log_schema_metrics self.show_casting_warnings = show_casting_warnings - self.options = self._get_options(options, source_config.options) - source_name = self.sources_config.data_backend_type - if self.schema is SCHEMA_FROM_FILE: - active_schema = self.sources_config.dynamicio_schema - else: - active_schema = self._schema_from_obj(self) - - if not active_schema: - raise SchemaNotFoundError() - - assert isinstance(active_schema, DataframeSchema) - self.schema = active_schema - self.name = self.schema.name.upper() + self.options = self._get_options(options, resource_definition.options) + source_name = self.resource_definition.data_backend_type + self.name = self.schema.__class__.__name__.upper() self.schema_validations = self.schema.validations self.schema_metrics = self.schema.metrics @@ -146,7 +139,7 @@ def read(self) -> pd.DataFrame: Returns: A pandas dataframe or an iterable. 
""" - source_name = self.sources_config.data_backend_type + source_name = self.resource_definition.data_backend_type df = getattr(self, f"_read_from_{source_name}")() df = self._apply_schema(df) @@ -172,7 +165,7 @@ def write(self, df: pd.DataFrame): Args: df: The data to be written """ - source_name = self.sources_config.data_backend_type + source_name = self.resource_definition.data_backend_type if set(df.columns) != self.schema.column_names: # pylint: disable=E1101 columns = [column for column in df.columns.to_list() if column in self.schema.column_names] df = df[columns] diff --git a/tests/test_core.py b/tests/test_core.py index 9118bcc..d169843 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -53,7 +53,7 @@ def test_abstract_class_dynamic_data_io_cant_be_used_for_object_instantiation(se # When/Then with pytest.raises(TypeError): - DynamicDataIO(source_config=s3_csv_local_config) + DynamicDataIO(resource_definition=s3_csv_local_config) @pytest.mark.unit def test_objects_of_dynamic_data_io_subclasses_cant_be_instantiated_in_the_absence_of_a_non_empty_schema( @@ -72,7 +72,7 @@ def test_objects_of_dynamic_data_io_subclasses_cant_be_instantiated_in_the_absen class AbsentSchemaIO(DynamicDataIO): pass - AbsentSchemaIO(source_config=s3_csv_local_config) + AbsentSchemaIO(resource_definition=s3_csv_local_config) @pytest.mark.unit def test_objects_of_s3io_subclasses_cant_be_instantiated_in_the_presence_of_a_empty_dict_schema( @@ -92,7 +92,7 @@ class EmptySchemaIO(WithS3File, DynamicDataIO): dataset_name = "EmptySchema" schema = {} - EmptySchemaIO(source_config=s3_csv_local_config) + EmptySchemaIO(resource_definition=s3_csv_local_config) @pytest.mark.unit def test_objects_of_dynamic_data_io_subclasses_cant_be_instantiated_in_the_presence_of_a_schema_eq_to_none( @@ -112,7 +112,7 @@ class NoneSchemaIO(WithS3File, DynamicDataIO): dataset_name = "NoneSchema" schema = None - NoneSchemaIO(source_config=s3_csv_local_config) + NoneSchemaIO(resource_definition=s3_csv_local_config) @pytest.mark.unit def test_dynamic_data_io_object_instantiation_is_only_possible_for_subclasses(self): @@ -124,7 +124,7 @@ def test_dynamic_data_io_object_instantiation_is_only_possible_for_subclasses(se ).get(source_key="READ_FROM_S3_CSV") # When - s3_csv_io = ReadS3CsvIO(source_config=s3_csv_local_config) + s3_csv_io = ReadS3CsvIO(resource_definition=s3_csv_local_config) # Then assert isinstance(s3_csv_io, ReadS3CsvIO) and isinstance(s3_csv_io, DynamicDataIO) @@ -152,7 +152,7 @@ def test_subclasses_of_dynamic_data_io_need_to_define_a_static_validate_function class CMVolumesIONoValidationFunction(DynamicDataIO): schema = {"foo": "int64"} - CMVolumesIONoValidationFunction(source_config=s3_csv_local_config) + CMVolumesIONoValidationFunction(resource_definition=s3_csv_local_config) @pytest.mark.unit def test_subclasses_of_dynamic_data_io_need_to_implement_private_reader_for_new_source_types( @@ -167,7 +167,7 @@ def test_subclasses_of_dynamic_data_io_need_to_implement_private_reader_for_new_ # When with pytest.raises(AssertionError): - ReadS3IO(source_config=athena_cloud_config) + ReadS3IO(resource_definition=athena_cloud_config) @pytest.mark.unit def test_key_error_is_thrown_for_missing_schema_if_unified_io_subclass_assigns_schema_from_file_but_file_is_missing( @@ -182,7 +182,7 @@ def test_key_error_is_thrown_for_missing_schema_if_unified_io_subclass_assigns_s # When with pytest.raises(SchemaNotFoundError): - ReadMockS3CsvIO(source_config=read_mock_s3_cloud_config) + 
ReadMockS3CsvIO(resource_definition=read_mock_s3_cloud_config) @pytest.mark.integration def test_schema_validations_are_applied_for_an_io_class_with_a_schema_definition(self, valid_dataframe): @@ -193,7 +193,7 @@ def test_schema_validations_are_applied_for_an_io_class_with_a_schema_definition env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="READ_FROM_S3_CSV") - io_instance = ReadS3CsvIO(source_config=s3_csv_cloud_config) + io_instance = ReadS3CsvIO(resource_definition=s3_csv_cloud_config) # When return_value = io_instance.validate_from_schema(df) @@ -210,7 +210,7 @@ def test_log_metrics_from_schema_are_applied_for_an_io_class_with_a_schema_defin env_identifier="CLOUD", dynamic_vars=constants, ).get(source_key="READ_FROM_S3_CSV") - io_instance = ReadS3CsvIO(source_config=s3_csv_cloud_config) + io_instance = ReadS3CsvIO(resource_definition=s3_csv_cloud_config) # When with caplog.at_level(logging.INFO): @@ -245,7 +245,7 @@ def test_schema_validations_errors_are_thrown_for_each_validation_if_df_does_not # When with pytest.raises(SchemaValidationError): - ReadS3CsvIO(source_config=s3_csv_cloud_config).validate_from_schema(df) + ReadS3CsvIO(resource_definition=s3_csv_cloud_config).validate_from_schema(df) @pytest.mark.integration def test_schema_validations_exception_message_is_a_dict_with_all_violated_validations(self, invalid_dataframe, expected_messages): @@ -259,7 +259,7 @@ def test_schema_validations_exception_message_is_a_dict_with_all_violated_valida # When try: - ReadS3CsvIO(source_config=s3_csv_cloud_config).validate_from_schema(df) + ReadS3CsvIO(resource_definition=s3_csv_cloud_config).validate_from_schema(df) except SchemaValidationError as _exception: # Then assert _exception.message.keys() == expected_messages # pylint: disable=no-member @@ -286,7 +286,7 @@ def test_local_writers_only_write_out_castable_columns_according_to_the_io_schem # 'end_odometer': 'int64', # 'foo_name': 'object', # } - write_s3_io = WriteS3ParquetExternalIO(source_config=s3_parquet_local_config) + write_s3_io = WriteS3ParquetExternalIO(resource_definition=s3_parquet_local_config) write_s3_io.write(input_df) # # Then @@ -314,7 +314,7 @@ def test_schema_validations_are_not_applied_on_read_if_validate_flag_is_false(se # When # ReadS3CsvIO(source_config=s3_csv_cloud_config, apply_schema_validations=False).read() - ReadS3CsvIO(source_config=s3_csv_local_config).read() # False is the default value + ReadS3CsvIO(resource_definition=s3_csv_local_config).read() # False is the default value # Then mock_validate_from_schema.assert_not_called() @@ -330,7 +330,7 @@ def test_schema_validations_are_automatically_applied_on_read_if_validate_flag_i ).get(source_key="READ_FROM_S3_CSV") # When - ReadS3CsvIO(source_config=s3_csv_local_config, apply_schema_validations=True).read() + ReadS3CsvIO(resource_definition=s3_csv_local_config, apply_schema_validations=True).read() # Then mock_validate_from_schema.assert_called() @@ -347,7 +347,7 @@ def test_schema_validations_are_automatically_applied_on_write_if_validate_flag_ ).get(source_key="WRITE_TO_S3_CSV") # When - WriteS3CsvWithSchema(source_config=s3_csv_local_config, apply_schema_validations=True).write(df) + WriteS3CsvWithSchema(resource_definition=s3_csv_local_config, apply_schema_validations=True).write(df) # Then try: @@ -368,7 +368,7 @@ def test_schema_validations_are_not_applied_on_write_if_validate_flag_is_false(s # When # WriteS3CsvWithSchema(source_config=s3_csv_cloud_config, apply_schema_validations=False).write(df) - 
WriteS3CsvWithSchema(source_config=s3_csv_local_config).write(df) # False is the default value + WriteS3CsvWithSchema(resource_definition=s3_csv_local_config).write(df) # False is the default value # Then try: @@ -388,7 +388,7 @@ def test_schema_metrics_are_not_logged_on_read_if_metrics_flag_is_false(self, mo # When # ReadS3CsvIO(source_config=s3_csv_cloud_config, log_schema_metrics=False).read() - ReadS3CsvIO(source_config=s3_csv_local_config).read() # False is the default value + ReadS3CsvIO(resource_definition=s3_csv_local_config).read() # False is the default value # Then mock_log_metrics_from_schema.assert_not_called() @@ -404,7 +404,7 @@ def test_schema_metrics_are_automatically_logged_on_read_if_validate_flag_is_tru ).get(source_key="READ_FROM_S3_CSV") # When - ReadS3CsvIO(source_config=s3_csv_local_config, log_schema_metrics=True).read() + ReadS3CsvIO(resource_definition=s3_csv_local_config, log_schema_metrics=True).read() # Then mock_log_metrics_from_schema.assert_called() @@ -421,7 +421,7 @@ def test_schema_metrics_are_automatically_logged_on_write_if_metrics_flag_is_tru ).get(source_key="WRITE_TO_S3_CSV") # When - WriteS3CsvWithSchema(source_config=s3_csv_local_config, log_schema_metrics=True).write(df) + WriteS3CsvWithSchema(resource_definition=s3_csv_local_config, log_schema_metrics=True).write(df) # Then try: @@ -442,7 +442,7 @@ def test_schema_metrics_are_not_logged_on_write_if_metrics_flag_is_false(self, m # When # WriteS3CsvWithSchema(source_config=s3_csv_cloud_config, log_schema_metrics=False).write(df) - WriteS3CsvWithSchema(source_config=s3_csv_local_config).write(df) # False is the default value + WriteS3CsvWithSchema(resource_definition=s3_csv_local_config).write(df) # False is the default value # Then try: @@ -491,7 +491,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type dynamic_vars=constants, ).get(source_key="S3_PARQUET_WITH_BOOL") - ParquetWithSomeBool(source_config=s3_parquet_with_some_bool_col_local_config).write(df) + ParquetWithSomeBool(resource_definition=s3_parquet_with_some_bool_col_local_config).write(df) # Then try: @@ -553,7 +553,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type dynamic_vars=constants, ).get(source_key="S3_CSV_WITH_BOOL") - CsvWithSomeBool(source_config=s3_csv_with_some_bool_col_local_config).write(df) + CsvWithSomeBool(resource_definition=s3_csv_with_some_bool_col_local_config).write(df) # Then try: @@ -615,7 +615,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type dynamic_vars=constants, ).get(source_key="S3_HDF_WITH_BOOL") - HdfWithSomeBool(source_config=s3_hdf_with_some_bool_col_local_config).write(df) + HdfWithSomeBool(resource_definition=s3_hdf_with_some_bool_col_local_config).write(df) # Then try: @@ -680,7 +680,7 @@ def test__has_valid_dtypes_does_not_attempt_to_convert_object_type_to_other_type dynamic_vars=constants, ).get(source_key="S3_JSON_WITH_BOOL") - JsonWithSomeBool(source_config=s3_json_with_some_bool_col_local_config).write(df) + JsonWithSomeBool(resource_definition=s3_json_with_some_bool_col_local_config).write(df) # Then try: @@ -719,7 +719,7 @@ def test__has_valid_dtypes_throws_columns_data_type_error_when_casting_fails(sel # Then with pytest.raises(ColumnsDataTypeError): - ParquetWithSomeBool(source_config=s3_parquet_with_some_bool_col_local_config).write(df) + ParquetWithSomeBool(resource_definition=s3_parquet_with_some_bool_col_local_config).write(df) @pytest.mark.unit def 
test_a_custom_validate_method_can_be_used_to_override_the_default_abstract_one(self): @@ -733,7 +733,7 @@ def test_a_custom_validate_method_can_be_used_to_override_the_default_abstract_o ).get(source_key="S3_PARQUET_WITH_CUSTOM_VALIDATE") # When - ParquetWithCustomValidate(source_config=s3_parquet_with_some_bool_col_local_config).write(df) + ParquetWithCustomValidate(resource_definition=s3_parquet_with_some_bool_col_local_config).write(df) # Then try: @@ -749,7 +749,7 @@ def test_show_casting_warnings_flag_default_value_prevents_showing_casting_logs( env_identifier="LOCAL", dynamic_vars=constants, ).get(source_key="READ_FROM_S3_CSV") - io_instance = ReadS3DataWithFalseTypes(source_config=s3_csv_cloud_config) # i.e.show_casting_warnings=False + io_instance = ReadS3DataWithFalseTypes(resource_definition=s3_csv_cloud_config) # i.e.show_casting_warnings=False # When with caplog.at_level(logging.INFO): @@ -766,7 +766,7 @@ def test_show_casting_warnings_flag_allows_casting_logs_to_be_printed_if_set_to_ env_identifier="LOCAL", dynamic_vars=constants, ).get(source_key="READ_FROM_S3_CSV") - io_instance = ReadS3DataWithFalseTypes(source_config=s3_csv_cloud_config, show_casting_warnings=True) + io_instance = ReadS3DataWithFalseTypes(resource_definition=s3_csv_cloud_config, show_casting_warnings=True) # When with caplog.at_level(logging.INFO): @@ -786,7 +786,7 @@ def test_options_are_read_from_code(self): ).get(source_key="S3_PARQUET_WITH_OPTIONS_IN_CODE") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config, option_1=False, option_2=True) + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config, option_1=False, option_2=True) # Then assert config_io.options == {"option_1": False, "option_2": True} @@ -801,7 +801,7 @@ def test_options_are_read_from_resource_definition(self): ).get(source_key="S3_PARQUET_WITH_OPTIONS_IN_DEFINITION") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config) + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config) # Then assert config_io.options == {"option_3": False, "option_4": True} @@ -816,7 +816,7 @@ def test_options_are_that_are_read_from_both_resource_definition_and_code_but_wi ).get(source_key="S3_PARQUET_WITH_OPTIONS_IN_DEFINITION") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config, option_1=False, option_2=True) + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config, option_1=False, option_2=True) # Then assert config_io.options == {"option_1": False, "option_2": True, "option_3": False, "option_4": True} @@ -831,7 +831,7 @@ def test_options_from_code_are_prioritized(self): ).get(source_key="S3_PARQUET_WITH_OPTIONS_IN_DEFINITION") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config, option_1=False, option_2=True, option_3=True) # option_3 is conflicting + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config, option_1=False, option_2=True, option_3=True) # option_3 is conflicting # Then assert config_io.options == {"option_1": False, "option_2": True, "option_3": True, "option_4": True} @@ -864,7 +864,7 @@ def test_no_options_at_all_are_provided_with_no_issues(self): ).get(source_key="S3_PARQUET_WITH_OPTIONS_IN_CODE") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config) + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config) # Then assert config_io.options == {} @@ -880,7 +880,7 @@ def test_dataset_name_is_defined_by_io_class_if_schema_from_file_is_not_provided 
).get(source_key="READ_FROM_S3_PARQUET") # When - config_io = ReadS3ParquetIO(source_config=s3_parquet_local_config) + config_io = ReadS3ParquetIO(resource_definition=s3_parquet_local_config) # Then assert config_io.name == "READ_S3_PARQUET_IO" @@ -896,7 +896,7 @@ def test_dataset_name_is_inferred_from_schema_if_schema_from_file_is_provided(se ).get(source_key="READ_FROM_S3_CSV") # When - config_io = ReadS3CsvIO(source_config=s3_read_from_csv_config) + config_io = ReadS3CsvIO(resource_definition=s3_read_from_csv_config) # Then assert config_io.name == "READ_FROM_S3_CSV" @@ -915,7 +915,7 @@ def test_read_is_called_through_async_read(self): # When with patch.object(dynamicio.core.DynamicDataIO, "read") as mock_read: mock_read.return_value = pd.DataFrame.from_records([[1, "name_a"]], columns=["id", "foo_name"]) - asyncio.run(ReadS3CsvIO(source_config=s3_csv_local_config).async_read()) + asyncio.run(ReadS3CsvIO(resource_definition=s3_csv_local_config).async_read()) # Then mock_read.assert_called() @@ -934,7 +934,7 @@ async def test_write_is_called_through_async_write(self): # When with patch.object(dynamicio.core.DynamicDataIO, "write") as mock_write: - await asyncio.gather(WriteS3CsvIO(source_config=s3_csv_local_config).async_write(df)) + await asyncio.gather(WriteS3CsvIO(resource_definition=s3_csv_local_config).async_write(df)) # Then mock_write.assert_called() @@ -954,10 +954,10 @@ def dummy_read(self) -> pd.DataFrame: # pylint: disable=unused-argument async def multi_read(config: Mapping[str, str]) -> Tuple: return await asyncio.gather( - ReadS3CsvIO(source_config=config).async_read(), - ReadS3CsvIO(source_config=config).async_read(), - ReadS3CsvIO(source_config=config).async_read(), - ReadS3CsvIO(source_config=config).async_read(), + ReadS3CsvIO(resource_definition=config).async_read(), + ReadS3CsvIO(resource_definition=config).async_read(), + ReadS3CsvIO(resource_definition=config).async_read(), + ReadS3CsvIO(resource_definition=config).async_read(), ) # When @@ -986,10 +986,10 @@ def dummy_write(self, _df: pd.DataFrame) -> bool: # pylint: disable=unused-argu async def multi_write(config: Mapping[str, str], _df: pd.DataFrame) -> Tuple: return await asyncio.gather( - WriteS3CsvIO(source_config=config).async_write(_df), - WriteS3CsvIO(source_config=config).async_write(_df), - WriteS3CsvIO(source_config=config).async_write(_df), - WriteS3CsvIO(source_config=config).async_write(_df), + WriteS3CsvIO(resource_definition=config).async_write(_df), + WriteS3CsvIO(resource_definition=config).async_write(_df), + WriteS3CsvIO(resource_definition=config).async_write(_df), + WriteS3CsvIO(resource_definition=config).async_write(_df), ) # When diff --git a/tests/test_mixins/test_local_mixins.py b/tests/test_mixins/test_local_mixins.py index 39732b6..cc92f6a 100644 --- a/tests/test_mixins/test_local_mixins.py +++ b/tests/test_mixins/test_local_mixins.py @@ -49,7 +49,7 @@ def test_read_parquet_pandas_reader_will_only_load_columns_in_schema(self, expec ).get(source_key="READ_FROM_S3_PARQUET") # When - s3_parquet_df = ReadS3DataWithLessColumnsIO(source_config=s3_parquet_local_config).read() + s3_parquet_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_parquet_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_parquet_df) @@ -67,7 +67,7 @@ def test_read_json_pandas_reader_will_maintain_columns_order_of_the_original_dat ).get(source_key="READ_FROM_S3_JSON") # When - s3_json_df = ReadS3DataWithLessColumnsAndMessedOrderOfColumnsIO(source_config=s3_json_local_config).read() + 
s3_json_df = ReadS3DataWithLessColumnsAndMessedOrderOfColumnsIO(resource_definition=s3_json_local_config).read() # Then assert s3_json_df.columns.to_list() == ["foo_name", "bar", "bar_type", "a_number", "b_number"] @@ -85,7 +85,7 @@ def test_read_hdf_pandas_reader_will_maintain_columns_order_of_the_original_data ).get(source_key="READ_FROM_S3_HDF") # When - s3_hdf_df = ReadS3DataWithLessColumnsAndMessedOrderOfColumnsIO(source_config=s3_hdf_local_config).read() + s3_hdf_df = ReadS3DataWithLessColumnsAndMessedOrderOfColumnsIO(resource_definition=s3_hdf_local_config).read() # Then assert s3_hdf_df.columns.to_list() == ["foo_name", "bar", "bar_type", "a_number", "b_number"] @@ -101,7 +101,7 @@ def test_read_csv_pandas_reader_will_only_load_columns_in_schema(self, expected_ ).get(source_key="READ_FROM_S3_CSV_ALT") # When - s3_csv_df = ReadS3DataWithLessColumnsIO(source_config=s3_csv_local_config).read() + s3_csv_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_csv_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_csv_df) @@ -117,7 +117,7 @@ def test_read_h5_pandas_reader_will_only_load_columns_in_schema(self, expected_d ).get(source_key="READ_FROM_S3_HDF") # When - s3_hdf_df = ReadS3DataWithLessColumnsIO(source_config=s3_parquet_local_config).read() + s3_hdf_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_parquet_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_hdf_df) @@ -133,7 +133,7 @@ def test_read_json_pandas_reader_will_only_load_columns_in_schema(self, expected ).get(source_key="READ_FROM_S3_JSON") # When - s3_json_df = ReadS3DataWithLessColumnsIO(source_config=s3_json_local_config).read() + s3_json_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_json_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_json_df) @@ -148,7 +148,7 @@ def test_read_json_pandas_reader_will_only_filter_out_columns_not_in_schema(self ).get(source_key="READ_FROM_S3_JSON") # When - s3_json_df = ReadS3DataWithLessColumnsIO(source_config=s3_json_local_config).read() + s3_json_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_json_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_json_df) @@ -163,7 +163,7 @@ def test_read_hdf_pandas_reader_will_only_filter_out_columns_not_in_schema(self, ).get(source_key="READ_FROM_S3_HDF") # When - s3_hdf_df = ReadS3DataWithLessColumnsIO(source_config=s3_hdf_local_config).read() + s3_hdf_df = ReadS3DataWithLessColumnsIO(resource_definition=s3_hdf_local_config).read() # Then assert expected_df_with_less_columns.equals(s3_hdf_df) @@ -180,7 +180,7 @@ def test_local_reader_is_called_for_loading_any_file_when_env_is_set_to_local(se mock__read_from_local.return_value = expected_s3_csv_df # When - ReadS3CsvIO(source_config=s3_csv_local_config).read() + ReadS3CsvIO(resource_definition=s3_csv_local_config).read() # Then mock__read_from_local.assert_called() @@ -195,7 +195,7 @@ def test_a_local_parquet_file_is_loaded_when_io_config_is_initialised_with_local ).get(source_key="READ_FROM_POSTGRES") # When - pg_parquet_df = ReadPostgresIO(source_config=pg_parquet_local_config).read() + pg_parquet_df = ReadPostgresIO(resource_definition=pg_parquet_local_config).read() # Then assert test_df.equals(pg_parquet_df) @@ -210,7 +210,7 @@ def test_a_local_h5_file_is_loaded_when_io_config_is_initialised_with_local_env_ ).get(source_key="READ_FROM_S3_HDF") # When - s3_hdf_df = ReadS3HdfIO(source_config=s3_hdf_local_config).read() + s3_hdf_df = 
ReadS3HdfIO(resource_definition=s3_hdf_local_config).read() # Then assert expected_s3_hdf_df.equals(s3_hdf_df) @@ -226,7 +226,7 @@ def test_a_local_json_file_is_loaded_when_io_config_is_initialised_with_local_en # When options = {"orient": "columns"} - s3_json_df = ReadS3JsonIO(source_config=s3_json_local_config, **options).read() + s3_json_df = ReadS3JsonIO(resource_definition=s3_json_local_config, **options).read() # Then assert expected_s3_json_df.equals(s3_json_df) @@ -241,7 +241,7 @@ def test_a_local_csv_file_is_loaded_when_io_config_is_initialised_with_local_env ).get(source_key="READ_FROM_S3_CSV") # When - s3_csv_df = ReadS3CsvIO(source_config=s3_csv_local_config).read() + s3_csv_df = ReadS3CsvIO(resource_definition=s3_csv_local_config).read() # Then assert expected_s3_csv_df.equals(s3_csv_df) @@ -256,7 +256,7 @@ def test_a_local_parquet_file_is_loaded_when_io_config_is_set_with_local_env_a_p ).get(source_key="READ_FROM_POSTGRES") # When - pg_df = ReadPostgresIO(source_config=pg_local_config, model=ERModel).read() + pg_df = ReadPostgresIO(resource_definition=pg_local_config, model=ERModel).read() # Then assert test_df.equals(pg_df) @@ -274,7 +274,7 @@ def test_local_writer_is_called_for_writing_any_file_when_env_is_set_to_local(se ).get(source_key="WRITE_TO_S3_CSV") # When - WriteS3CsvIO(source_config=s3_csv_local_config).write(df) + WriteS3CsvIO(resource_definition=s3_csv_local_config).write(df) # Then mock__write_to_local.assert_called() @@ -294,7 +294,7 @@ def test_a_df_is_written_locally_as_parquet_when_io_config_is_initialised_with_l ).get(source_key="WRITE_TO_PG_PARQUET") # When - WritePostgresIO(source_config=pg_parquet_local_config).write(df) + WritePostgresIO(resource_definition=pg_parquet_local_config).write(df) # Then try: @@ -316,7 +316,7 @@ def test_a_df_is_written_locally_as_csv_when_io_config_is_initialised_with_local ).get(source_key="WRITE_TO_S3_CSV") # When - WriteS3CsvIO(source_config=s3_csv_local_config).write(df) + WriteS3CsvIO(resource_definition=s3_csv_local_config).write(df) # Then try: @@ -336,7 +336,7 @@ def test_a_df_is_written_locally_as_json_when_io_config_is_initialised_with_loca ).get(source_key="WRITE_TO_KAFKA_JSON") # When - WriteKafkaIO(source_config=kafka_json_local_config).write(df) + WriteKafkaIO(resource_definition=kafka_json_local_config).write(df) # Then try: @@ -358,7 +358,7 @@ def test_a_df_is_written_locally_as_h5_when_io_config_is_initialised_with_local_ ).get(source_key="WRITE_TO_S3_HDF") # When - WriteS3HdfIO(source_config=s3_hdf_local_config).write(df) + WriteS3HdfIO(resource_definition=s3_hdf_local_config).write(df) # Then try: @@ -380,7 +380,7 @@ def test_dynamicio_default_pickle_protocol_is_4( ).get(source_key="WRITE_TO_S3_HDF") # When - WriteS3HdfIO(source_config=s3_hdf_local_config).write(df) + WriteS3HdfIO(resource_definition=s3_hdf_local_config).write(df) # Then try: @@ -402,7 +402,7 @@ def test_dynamicio_default_pickle_protocol_is_bypassed_by_user_input( ).get(source_key="WRITE_TO_S3_HDF") # When - WriteS3HdfIO(source_config=s3_hdf_local_config, protocol=5).write(df) + WriteS3HdfIO(resource_definition=s3_hdf_local_config, protocol=5).write(df) # Then try: @@ -419,7 +419,7 @@ def test_read_resolves_file_path_if_templated_for_some_input_data(self): dynamic_vars=constants, ).get(source_key="TEMPLATED_FILE_PATH") - io_object = TemplatedFile(source_config=config, file_name_to_replace="some_csv_to_read") + io_object = TemplatedFile(resource_definition=config, file_name_to_replace="some_csv_to_read") with patch.object(io_object, 
"_read_csv_file") as mocked__read_csv_file: mocked__read_csv_file.return_value = pd.read_csv(os.path.join(TEST_RESOURCES, "data/input/some_csv_to_read.csv")) @@ -439,7 +439,7 @@ def test_write_resolves_file_path_if_templated_for_some_output_data(self): dynamic_vars=constants, ).get(source_key="TEMPLATED_FILE_PATH") - io_object = TemplatedFile(source_config=config, file_name_to_replace="some_csv_to_read") + io_object = TemplatedFile(resource_definition=config, file_name_to_replace="some_csv_to_read") df = pd.read_csv(os.path.join(TEST_RESOURCES, "data/input/some_csv_to_read.csv")) with patch.object(io_object, "_write_csv_file") as mocked__write_csv_file: @@ -472,7 +472,7 @@ def test_local_writers_only_write_out_castable_columns_according_to_the_io_schem # @staticmethod # def validate(df: pd.DataFrame): # pass - write_s3_io = WriteS3ParquetIO(source_config=s3_parquet_local_config) + write_s3_io = WriteS3ParquetIO(resource_definition=s3_parquet_local_config) write_s3_io.write(input_df) # # Then @@ -504,7 +504,7 @@ def test_local_writers_only_write_out_columns_in_a_provided_io_schema(self): # @staticmethod # def validate(df: pd.DataFrame): # pass - write_s3_io = WriteS3ParquetIO(source_config=s3_parquet_local_config) + write_s3_io = WriteS3ParquetIO(resource_definition=s3_parquet_local_config) write_s3_io.write(input_df) # Then @@ -546,7 +546,7 @@ def test_write_parquet_file_is_called_with_additional_pyarrow_args(self): # When with patch.object(dynamicio.mixins.with_local.pd.DataFrame, "to_parquet") as mocked__to_parquet: - write_s3_io = WriteS3ParquetIO(source_config=s3_parquet_local_config, **to_parquet_kwargs) + write_s3_io = WriteS3ParquetIO(resource_definition=s3_parquet_local_config, **to_parquet_kwargs) write_s3_io.write(input_df) # Then @@ -684,8 +684,8 @@ def test_async_read_does_not_operate_in_parallel_for_hdf_files(self): async def multi_read(config: Mapping[str, str]) -> Tuple: return await asyncio.gather( - AsyncReadS3HdfIO(source_config=config).async_read(), - AsyncReadS3HdfIO(source_config=config).async_read(), + AsyncReadS3HdfIO(resource_definition=config).async_read(), + AsyncReadS3HdfIO(resource_definition=config).async_read(), ) def dummy_read_hdf(*args, **kwargs) -> pd.DataFrame: # pylint: disable=unused-argument @@ -713,7 +713,7 @@ def test_async_write_does_not_operate_in_parallel_for_hdf_files(self): ).get(source_key="WRITE_TO_S3_HDF") async def multi_write(config: Mapping[str, str], _df: pd.DataFrame) -> Tuple: - return await asyncio.gather(WriteS3HdfIO(source_config=config).async_write(_df), WriteS3HdfIO(source_config=config).async_write(_df)) + return await asyncio.gather(WriteS3HdfIO(resource_definition=config).async_write(_df), WriteS3HdfIO(resource_definition=config).async_write(_df)) @dynamicio.mixins.utils.allow_options([*dynamicio.mixins.utils.args_of(pd.DataFrame.to_hdf), *["protocol"]]) def dummy_to_hdf(*args, **kwargs): # pylint: disable=unused-argument @@ -741,7 +741,7 @@ def test_multiple_files_are_loaded_when_batch_local_type_is_used_for_parquet(sel expected_concatenated_df = expected_s3_parquet_df # When - concatenated_df = ReadFromBatchLocalParquet(source_config=parquet_local_batch_config).read() + concatenated_df = ReadFromBatchLocalParquet(resource_definition=parquet_local_batch_config).read() # Then pd.testing.assert_frame_equal(expected_concatenated_df, concatenated_df) @@ -757,7 +757,7 @@ def test_files_that_dont_comply_to_the_provided_file_type_are_ignored(self, expe expected_concatenated_df = expected_s3_parquet_df # When - concatenated_df = 
ReadFromBatchLocalParquet(source_config=parquet_local_batch_config).read() + concatenated_df = ReadFromBatchLocalParquet(resource_definition=parquet_local_batch_config).read() # Then pd.testing.assert_frame_equal(expected_concatenated_df, concatenated_df) @@ -773,7 +773,7 @@ def test_if_hdf_file_is_chosen_then_file_type_is_converted_to_h5_for_filtering(s expected_concatenated_df = expected_s3_parquet_df # When - concatenated_df = ReadFromBatchLocalParquet(source_config=parquet_local_batch_config).read() + concatenated_df = ReadFromBatchLocalParquet(resource_definition=parquet_local_batch_config).read() # Then pd.testing.assert_frame_equal(expected_concatenated_df, concatenated_df) @@ -789,7 +789,7 @@ def test_multiple_files_are_loaded_when_batch_local_type_is_used_for_hdf(self, e expected_concatenated_df = expected_s3_hdf_df # When - concatenated_df = ReadFromBatchLocalHdf(source_config=parquet_local_batch_config).read() + concatenated_df = ReadFromBatchLocalHdf(resource_definition=parquet_local_batch_config).read() # Then pd.testing.assert_frame_equal(expected_concatenated_df, concatenated_df.sort_values(by="id").reset_index(drop=True)) diff --git a/tests/test_mixins/test_mixin_utils.py b/tests/test_mixins/test_mixin_utils.py index c79c224..bd94c8d 100644 --- a/tests/test_mixins/test_mixin_utils.py +++ b/tests/test_mixins/test_mixin_utils.py @@ -137,7 +137,7 @@ def test_when_reading_locally_or_from_s3_invalid_options_are_ignored(self, expec ).get(source_key="READ_FROM_S3_CSV") # When - s3_csv_df = ReadS3CsvIO(source_config=s3_csv_local_config, foo=invalid_option).read() + s3_csv_df = ReadS3CsvIO(resource_definition=s3_csv_local_config, foo=invalid_option).read() # Then assert expected_s3_csv_df.equals(s3_csv_df) @@ -153,7 +153,7 @@ def test_when_reading_locally_or_from_s3_valid_options_are_considered(self, expe ).get(source_key="READ_FROM_S3_CSV") # When - s3_csv_df = ReadS3CsvIO(source_config=s3_csv_local_config, dtype=None).read() + s3_csv_df = ReadS3CsvIO(resource_definition=s3_csv_local_config, dtype=None).read() # Then assert expected_s3_csv_df.equals(s3_csv_df) diff --git a/tests/test_mixins/test_postgres_mixins.py b/tests/test_mixins/test_postgres_mixins.py index 62447db..853b92c 100644 --- a/tests/test_mixins/test_postgres_mixins.py +++ b/tests/test_mixins/test_postgres_mixins.py @@ -28,7 +28,7 @@ def test_when_reading_from_postgres_with_env_as_cloud_get_table_columns_returns_ ).get(source_key="READ_FROM_POSTGRES") # When - columns = ReadPostgresIO(source_config=pg_cloud_config)._get_table_columns(ERModel) # pylint: disable=protected-access + columns = ReadPostgresIO(resource_definition=pg_cloud_config)._get_table_columns(ERModel) # pylint: disable=protected-access # Then assert columns == expected_columns @@ -44,7 +44,7 @@ def test_read_from_postgres_is_called_for_loading_a_table_with_columns_with_env_ ).get(source_key="READ_FROM_POSTGRES") # When - ReadPostgresIO(source_config=postgres_cloud_config).read() + ReadPostgresIO(resource_definition=postgres_cloud_config).read() # Then mock__read_from_postgres.assert_called() @@ -61,7 +61,7 @@ def test_write_to_postgres_is_called_for_uploading_a_table_with_columns_with_env ).get(source_key="WRITE_TO_PG_PARQUET") # When - WritePostgresIO(source_config=postgres_cloud_config).write(df) + WritePostgresIO(resource_definition=postgres_cloud_config).write(df) # Then mock__write_to_postgres.assert_called() @@ -80,7 +80,7 @@ def test_write_to_postgres_is_called_with_truncate_and_append_option(self, mock_ ) # When - write_config = 
WritePostgresIO(source_config=postgres_cloud_config, truncate_and_append=True) + write_config = WritePostgresIO(resource_definition=postgres_cloud_config, truncate_and_append=True) write_config.write(df) @@ -102,7 +102,7 @@ def test_read_from_postgres_by_implicitly_generating_datamodel_from_schema(self, ).get(source_key="READ_FROM_POSTGRES") # When / Then - ReadPostgresIO(source_config=postgres_cloud_config).read() + ReadPostgresIO(resource_definition=postgres_cloud_config).read() mock__read_from_postgres.assert_called() @pytest.mark.unit @@ -116,7 +116,7 @@ def test_read_from_postgres_with_query(self, mock__read_database): ).get(source_key="READ_FROM_POSTGRES") # When - ReadPostgresIO(source_config=postgres_cloud_config, sql_query="SELECT * FROM example").read() + ReadPostgresIO(resource_definition=postgres_cloud_config, sql_query="SELECT * FROM example").read() # Then mock__read_database.assert_called_with(ANY, "SELECT * FROM example") @@ -132,7 +132,7 @@ def test_read_from_postgres_with_query_in_options(self, mock__read_database): ).get(source_key="READ_FROM_POSTGRES_WITH_QUERY_IN_OPTIONS") # When - ReadPostgresIO(source_config=postgres_cloud_config).read() + ReadPostgresIO(resource_definition=postgres_cloud_config).read() # Then mock__read_database.assert_called_with(ANY, "SELECT * FROM table_name_from_yaml_options") @@ -148,7 +148,7 @@ def test_read_from_postgres_with_query_and_options(self, mock__read_sql): ).get(source_key="READ_FROM_POSTGRES") # When - ReadPostgresIO(source_config=postgres_cloud_config, sql_query="SELECT * FROM example", parse_dates=["date"], wrong_arg="whatever").read() + ReadPostgresIO(resource_definition=postgres_cloud_config, sql_query="SELECT * FROM example", parse_dates=["date"], wrong_arg="whatever").read() # Then mock__read_sql.assert_called_with(sql="SELECT * FROM example", con=ANY, parse_dates=["date"]) @@ -165,7 +165,7 @@ def test_generate_model_from_schema_returns_model(self): # When schema = postgres_cloud_config.dynamicio_schema schema_name = postgres_cloud_config.dynamicio_schema.name - model = ReadPostgresIO(source_config=postgres_cloud_config)._generate_model_from_schema(schema) + model = ReadPostgresIO(resource_definition=postgres_cloud_config)._generate_model_from_schema(schema) # Then assert len(model.__table__.columns) == len(schema.columns) and model.__tablename__ == schema_name @@ -181,8 +181,8 @@ def test_get_table_columns_from_generated_model_returns_valid_list_of_columns(se # When schema = pg_cloud_config.dynamicio_schema - model = ReadPostgresIO(source_config=pg_cloud_config)._generate_model_from_schema(schema) # pylint: disable=protected-access - columns = ReadPostgresIO(source_config=pg_cloud_config)._get_table_columns(model) # pylint: disable=protected-access + model = ReadPostgresIO(resource_definition=pg_cloud_config)._generate_model_from_schema(schema) # pylint: disable=protected-access + columns = ReadPostgresIO(resource_definition=pg_cloud_config)._get_table_columns(model) # pylint: disable=protected-access # Then assert isinstance(model.__table__.columns, ImmutableColumnCollection) @@ -208,7 +208,7 @@ def test_to_check_if_dataframe_has_valid_data_types(self): ) # When - is_valid = WriteExtendedPostgresIO(source_config=postgres_cloud_config, show_casting_warnings=True)._has_valid_dtypes(df) + is_valid = WriteExtendedPostgresIO(resource_definition=postgres_cloud_config, show_casting_warnings=True)._has_valid_dtypes(df) # Then assert is_valid is True diff --git a/tests/test_mixins/test_s3_mixins.py 
b/tests/test_mixins/test_s3_mixins.py index 4a98897..acef3f9 100644 --- a/tests/test_mixins/test_s3_mixins.py +++ b/tests/test_mixins/test_s3_mixins.py @@ -53,7 +53,7 @@ def test_read_resolves_file_path_if_templated(self): ) as mock_s3_reader: with open(file_path, "r") as file: # pylint: disable=unspecified-encoding mock_s3_reader.return_value = file - io_obj = TemplatedFile(source_config=config, file_name_to_replace="some_csv_to_read") + io_obj = TemplatedFile(resource_definition=config, file_name_to_replace="some_csv_to_read") final_schema = io_obj.schema io_obj.read() @@ -72,7 +72,7 @@ def test_write_resolves_file_path_if_templated(self): # When with patch.object(dynamicio.mixins.with_local.WithLocal, "_write_csv_file") as mock__write_csv_file: df = pd.read_csv(os.path.join(TEST_RESOURCES, "data/input/some_csv_to_read.csv")) - TemplatedFile(source_config=config, file_name_to_replace="some_csv_to_read").write(df) + TemplatedFile(resource_definition=config, file_name_to_replace="some_csv_to_read").write(df) # Then args, _ = mock__write_csv_file.call_args @@ -90,7 +90,7 @@ def test_read_from_s3_file_is_called_for_loading_a_file_with_env_as_cloud_s3(sel ).get(source_key="READ_FROM_S3_CSV") # When - ReadS3CsvIO(source_config=s3_csv_cloud_config).read() + ReadS3CsvIO(resource_definition=s3_csv_cloud_config).read() # Then mock__read_from_s3_file.assert_called() @@ -112,7 +112,7 @@ def test_s3_reader_is_not_called_for_loading_a_parquet_with_env_as_cloud_s3_and_ ) as mock_read_parquet_file: with open(file_path, "r") as file: # pylint: disable=unspecified-encoding mock_s3_reader.return_value = file - ReadS3ParquetIO(source_config=s3_parquet_cloud_config, no_disk_space=True).read() + ReadS3ParquetIO(resource_definition=s3_parquet_cloud_config, no_disk_space=True).read() # Then mock_s3_reader.assert_not_called() @@ -135,7 +135,7 @@ def mock_download_fobj(s3_bucket, s3_key, target_file): shutil.copyfileobj(fin, target_file) mock__boto3_client.download_fileobj.side_effect = mock_download_fobj - loaded_hdf_pd = ReadS3HdfIO(source_config=s3_hdf_cloud_config, no_disk_space=True).read() + loaded_hdf_pd = ReadS3HdfIO(resource_definition=s3_hdf_cloud_config, no_disk_space=True).read() # Then pd.testing.assert_frame_equal(loaded_hdf_pd, expected_s3_hdf_df) @@ -153,7 +153,7 @@ def test_s3_reader_is_not_called_for_loading_a_json_with_env_as_cloud_s3_and_typ with patch.object(dynamicio.mixins.with_s3.WithS3File, "_s3_reader") as mock__s3_reader, patch.object( dynamicio.mixins.with_s3.WithS3File, "_read_json_file" ) as mock__read_json_file: - ReadS3JsonIO(source_config=s3_json_cloud_config, no_disk_space=True).read() + ReadS3JsonIO(resource_definition=s3_json_cloud_config, no_disk_space=True).read() # Then mock__s3_reader.assert_not_called() @@ -172,7 +172,7 @@ def test_s3_reader_is_not_called_for_loading_a_csv_with_env_as_cloud_s3_and_type with patch.object(dynamicio.mixins.with_s3.WithS3File, "_s3_reader") as mock__s3_reader, patch.object( dynamicio.mixins.with_s3.WithS3File, "_read_csv_file" ) as mock__read_csv_file: - ReadS3CsvIO(source_config=s3_csv_cloud_config, no_disk_space=True).read() + ReadS3CsvIO(resource_definition=s3_csv_cloud_config, no_disk_space=True).read() # Then mock__s3_reader.assert_not_called() @@ -232,7 +232,7 @@ def test_s3_writers_only_validate_schema_prior_writing_out_the_dataframe(self): ) as mock__write_parquet_file: with NamedTemporaryFile(delete=False) as temp_file: mock__s3_writer.return_value = temp_file - WriteS3ParquetIO(source_config=s3_parquet_cloud_config).write(input_df) + 
WriteS3ParquetIO(resource_definition=s3_parquet_cloud_config).write(input_df) # Then mock__apply_schema.assert_called() @@ -252,7 +252,7 @@ def test_columns_data_type_error_exception_is_not_generated_if_column_dtypes_can dynamicio.mixins.with_s3.WithS3File, "_s3_named_file_reader" ): mock__read_parquet_file.return_value = expected_s3_parquet_df - ReadS3ParquetWithDifferentCastableDTypeIO(source_config=s3_parquet_cloud_config).read() + ReadS3ParquetWithDifferentCastableDTypeIO(resource_definition=s3_parquet_cloud_config).read() assert True, "No exception was raised" @@ -286,7 +286,7 @@ def test_columns_data_type_error_exception_is_generated_if_column_dtypes_dont_ma # When/Then with pytest.raises(ColumnsDataTypeError): - ReadS3ParquetWithDifferentNonCastableDTypeIO(source_config=s3_parquet_cloud_config).read() + ReadS3ParquetWithDifferentNonCastableDTypeIO(resource_definition=s3_parquet_cloud_config).read() moc__read_parquet_file.assert_called() @pytest.mark.unit @@ -304,7 +304,7 @@ def test_read_parquet_file_is_called_while_s3_reader_is_not_for_loading_a_parque with patch.object(dynamicio.mixins.with_s3.WithS3File, "_s3_reader") as mock__s3_reader, patch.object( dynamicio.mixins.with_local.WithLocal, "_read_parquet_file" ) as mock__read_parquet_file: - ReadS3ParquetIO(source_config=s3_parquet_cloud_config, no_disk_space=True).read() + ReadS3ParquetIO(resource_definition=s3_parquet_cloud_config, no_disk_space=True).read() # Then mock__s3_reader.assert_not_called() @@ -323,7 +323,7 @@ def test_s3_writer_is_called_for_writing_a_file_with_env_is_set_to_cloud_s3(self ).get(source_key="WRITE_TO_S3_JSON") # When - ReadS3HdfIO(source_config=s3_json_local_config).write(df) + ReadS3HdfIO(resource_definition=s3_json_local_config).write(df) # Then mock__write_to_s3_file.assert_called() @@ -345,7 +345,7 @@ def test_write_parquet_file_is_called_for_writing_a_parquet_with_env_as_cloud_s3 ) as mock__write_parquet_file: with NamedTemporaryFile(delete=False) as temp_file: mock__s3_writer.return_value = temp_file - WriteS3ParquetIO(source_config=s3_parquet_local_config).write(df) + WriteS3ParquetIO(resource_definition=s3_parquet_local_config).write(df) # Then mock__write_parquet_file.assert_called() @@ -367,7 +367,7 @@ def test_write_csv_file_is_called_for_writing_a_parquet_with_env_as_cloud_s3_and ) as mock__write_csv_file: with NamedTemporaryFile(delete=False) as temp_file: mock__s3_writer.return_value = temp_file - WriteS3CsvIO(source_config=s3_csv_local_config).write(df) + WriteS3CsvIO(resource_definition=s3_csv_local_config).write(df) # Then mock__write_csv_file.assert_called() @@ -389,7 +389,7 @@ def test_write_json_file_is_called_for_writing_a_parquet_with_env_as_cloud_s3_an ) as mock__write_json_file: with NamedTemporaryFile(delete=False) as temp_file: mock__s3_writer.return_value = temp_file - WriteS3JsonIO(source_config=s3_json_local_config).write(df) + WriteS3JsonIO(resource_definition=s3_json_local_config).write(df) # Then mock__write_json_file.assert_called() @@ -408,7 +408,7 @@ def test_write_hdf_file_is_called_for_writing_a_parquet_with_env_as_cloud_s3_and with patch.object(dynamicio.mixins.with_s3.WithS3File, "_s3_writer") as mock__s3_writer: with NamedTemporaryFile(delete=False) as temp_file: mock__s3_writer.return_value = temp_file - WriteS3HdfIO(source_config=s3_hdf_local_config).write(df) + WriteS3HdfIO(resource_definition=s3_hdf_local_config).write(df) # Then assert os.stat(temp_file.name).st_size == 1064192, "Confirm that the output file size did not change" @@ -459,7 +459,7 @@ def 
test_ValueError_is_raised_if_partition_cols_missing_from_options_when_upload # When / Then with pytest.raises(ValueError): - WriteS3ParquetIO(source_config=s3_parquet_cloud_config).write(input_df) + WriteS3ParquetIO(resource_definition=s3_parquet_cloud_config).write(input_df) @pytest.mark.unit def test_error_is_raised_if_file_type_not_parquet_when_uploading(self, tmp_path): @@ -501,7 +501,7 @@ def test_read_from_s3_path_prefix_is_called_for_loading_a_path_prefix_with_env_a ).get(source_key="READ_FROM_S3_PATH_PREFIX_CSV") # When - ReadS3CsvIO(source_config=s3_csv_cloud_config).read() + ReadS3CsvIO(resource_definition=s3_csv_cloud_config).read() # Then mock__read_from_s3_path_prefix.assert_called() @@ -519,7 +519,7 @@ def test_write_to_s3_path_prefix_is_called_for_uploading_to_a_path_prefix_with_e ).get(source_key="WRITE_TO_S3_PATH_PREFIX_PARQUET") # When - WriteS3ParquetIO(source_config=s3_parquet_cloud_config).write(input_df) + WriteS3ParquetIO(resource_definition=s3_parquet_cloud_config).write(input_df) # Then mock__write_to_s3_path_prefix.assert_called() @@ -538,7 +538,7 @@ def test_awscli_runner_is_called_with_correct_s3_path_and_aws_command_when_uploa # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: - WriteS3ParquetIO(source_config=s3_parquet_cloud_config, partition_cols="col_2").write(input_df) + WriteS3ParquetIO(resource_definition=s3_parquet_cloud_config, partition_cols="col_2").write(input_df) # Then mocked__awscli_runner.assert_called_with("s3", "sync", "temp", "s3://mock-bucket/mock-key", "--acl", "bucket-owner-full-control", "--only-show-errors", "--exact-timestamps") @@ -557,7 +557,7 @@ def test_awscli_runner_is_called_with_correct_s3_path_and_aws_command_when_loadi # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: - ReadS3HdfIO(source_config=s3_hdf_cloud_config).read() + ReadS3HdfIO(resource_definition=s3_hdf_cloud_config).read() # Then mocked__awscli_runner.assert_called_with("s3", "sync", "s3://mock-bucket/mock-key", "temp", "--acl", "bucket-owner-full-control", "--only-show-errors", "--exact-timestamps") @@ -577,7 +577,7 @@ def test__read_hdf_file_is_called_with_correct_local_file_path_when_loading_a_pa # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - read_obj = ReadS3HdfIO(source_config=s3_hdf_cloud_config) + read_obj = ReadS3HdfIO(resource_definition=s3_hdf_cloud_config) actual_schema = read_obj.schema read_obj.read() @@ -606,7 +606,7 @@ def test__read_parquet_file_is_called_with_correct_local_file_path_when_loading_ # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - read_obj = ReadS3ParquetIO(source_config=s3_parquet_cloud_config) + read_obj = ReadS3ParquetIO(resource_definition=s3_parquet_cloud_config) actual_schema = read_obj.schema read_obj.read() @@ -635,7 +635,7 @@ def test_read_parquet_file_is_called_while_awscli_runner_is_not_for_loading_a_pa with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mock__awscli_runner, patch.object( dynamicio.mixins.with_local.WithLocal, "_read_parquet_file" ) as mock__read_parquet_file: - ReadS3ParquetIO(source_config=s3_parquet_cloud_config, no_disk_space=True).read() + ReadS3ParquetIO(resource_definition=s3_parquet_cloud_config, no_disk_space=True).read() # Then mock__read_parquet_file.assert_called() @@ -654,7 +654,7 @@ def 
test__read_parquet_file_can_read_directory_of_parquet_files_loading_only_nec # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - df = ReadS3ParquetWithLessColumnsIO(source_config=s3_parquet_cloud_config).read() + df = ReadS3ParquetWithLessColumnsIO(resource_definition=s3_parquet_cloud_config).read() # Then assert df.shape == (15, 2) and df.columns.tolist() == ["id", "foo_name"] @@ -672,7 +672,7 @@ def test__read_parquet_file_can_filter_out_rows_using_appropriate_options(self, # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - df = ReadS3ParquetIO(source_config=s3_parquet_cloud_config, filters=[[("foo_name", "==", "name_a")]]).read() + df = ReadS3ParquetIO(resource_definition=s3_parquet_cloud_config, filters=[[("foo_name", "==", "name_a")]]).read() # Then assert df.shape == (8, 3) and df.columns.tolist() == ["id", "foo_name", "bar"] and df.foo_name.unique() == ["name_a"] @@ -695,7 +695,7 @@ def test__read_csv_file_is_called_with_correct_local_file_path_when_loading_a_pa # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - read_obj = ReadS3ParquetIO(source_config=s3_csv_cloud_config) + read_obj = ReadS3ParquetIO(resource_definition=s3_csv_cloud_config) actual_schema = read_obj.schema read_obj.read() @@ -727,7 +727,7 @@ def test__read_json_file_is_called_with_correct_local_file_path_when_loading_a_p # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - read_obj = ReadS3ParquetIO(source_config=s3_csv_cloud_config) + read_obj = ReadS3ParquetIO(resource_definition=s3_csv_cloud_config) actual_schema = read_obj.schema read_obj.read() @@ -759,7 +759,7 @@ def test_a_concatenated_hdf_file_is_returned_with_schema_columns_when_loading_a_ # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_Value = True - h5_df = ReadS3HdfIO(source_config=s3_hdf_cloud_config).read() + h5_df = ReadS3HdfIO(resource_definition=s3_hdf_cloud_config).read() # Then pd.testing.assert_frame_equal( @@ -825,7 +825,7 @@ def test__read_parquet_file_can_read_directory_of_parquet_files_containing_empty # When with patch.object(dynamicio.mixins.with_s3, "awscli_runner") as mocked__awscli_runner: mocked__awscli_runner.return_value = True - df = ReadS3ParquetWEmptyFilesIO(source_config=s3_parquet_cloud_config).read() + df = ReadS3ParquetWEmptyFilesIO(resource_definition=s3_parquet_cloud_config).read() # Then assert df.shape == (10, 2) and df.columns.tolist() == ["id", "bar"] diff --git a/tests/test_regressions/test_v430.py b/tests/test_regressions/test_v430.py index e0d1d11..336b7e5 100644 --- a/tests/test_regressions/test_v430.py +++ b/tests/test_regressions/test_v430.py @@ -17,7 +17,7 @@ def test_missing_validations_and_metrics(regressions_resources_dir, regressions_ env_identifier="LOCAL", dynamic_vars=regressions_constants_module, ) - io_instance = IO(source_config=input_config.get(source_key="PRODUCTS"), apply_schema_validations=True, log_schema_metrics=True) + io_instance = IO(resource_definition=input_config.get(source_key="PRODUCTS"), apply_schema_validations=True, log_schema_metrics=True) # When data = io_instance.read() From 609b459c61f6ad7c15baf79fbc0765ac33366359 Mon Sep 17 00:00:00 2001 From: Christos Hadjinikolis Date: 
Sat, 25 Feb 2023 17:58:26 +0000
Subject: [PATCH 3/3] wip

---
 demo/src/runners/staging.py | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/demo/src/runners/staging.py b/demo/src/runners/staging.py
index 7aa48cf..6aa13f7 100644
--- a/demo/src/runners/staging.py
+++ b/demo/src/runners/staging.py
@@ -17,8 +17,13 @@ def main() -> None:
     # LOAD DATA
     logger.info("Loading data from live sources...")
 
-    bar_df = InputIO(resource_definition=staging_input_config.get(source_key="BAR"), schema=Bar, apply_schema_validations=True, log_schema_metrics=True).read()
-    foo_df = InputIO(resource_definition=staging_input_config.get(source_key="FOO"), scheam=Foo, apply_schema_validations=True, log_schema_metrics=True).read()
+    bar_df = UnifiedIO(
+        resource_definition=staging_input_config.get(source_key="BAR"),
+        schema=Bar,
+        apply_schema_validations=True,
+        log_schema_metrics=True,
+    ).read()
+    foo_df = InputIO(resource_definition=staging_input_config.get(source_key="FOO"), schema=Foo, apply_schema_validations=True, log_schema_metrics=True).read()
 
     logger.info("Data successfully loaded from live sources...")
 
@@ -31,6 +36,15 @@ def main() -> None:
 
     # SINK DATA
     logger.info("Begin sinking data to staging area:")
-    StagedFooIO(resource_definition=staging_output_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
-    StagedBarIO(resource_definition=staging_output_config.get(source_key="STAGED_BAR")).write(bar_df)
+    UnifiedIO(resource_definition=staging_output_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
+    UnifiedIO(resource_definition=staging_output_config.get(source_key="STAGED_BAR")).write(bar_df)
     logger.info("Data staging is complete...")
+
+
+
+    # 1. Use UnifiedIO as the IO class for loading and writing data
+    # 2. Introduce a pandera schema as an extra field in UnifiedIO's parameters
+    # 3. Write a script to generate a pandera schema from a dataframe (and propose logging and validations too); see the sketch below
+    # 4. Write a script to migrate from yaml schemas to pandera schemas
+    # 5. Add validations that pandera's standard checks do not cover, once we identify what is missing
+    # 6. Confirm everything works
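
A minimal sketch for TODO item 3 above, assuming pandera's built-in schema inference is an acceptable starting point (to_script() and to_yaml() require the pandera[io] extras) and using a purely hypothetical sample-file path:

    # Sketch only: infer a baseline pandera schema from a sample dataframe and emit it
    # as Python source that can be hand-tuned before being added to the demo's schema module.
    import pandas as pd
    import pandera as pa

    # Hypothetical sample file; any representative dataframe would do.
    sample_df = pd.read_parquet("demo/tests/data/input/foo.parquet")

    # infer_schema returns a DataFrameSchema with inferred dtypes and basic value checks.
    inferred_schema = pa.infer_schema(sample_df)

    # Render the schema as a Python module; to_yaml() is the YAML equivalent,
    # which may help with the migration described in item 4.
    print(inferred_schema.to_script())

Note that infer_schema only captures dtypes and simple value bounds, so the logging metadata (item 2) and any custom checks (item 5) would still need to be added by hand.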