Skip to content

Commit

Permalink
Merge pull request #8 from great-expectations/f/core-727/checkpoint_o…
Browse files Browse the repository at this point in the history
…perator_unit_tests

Unit tests for GXValidateCheckpointOperator
  • Loading branch information
joshua-stauffer authored Jan 8, 2025
2 parents 985f6d4 + b9e3d30 commit 6622bcb
Showing 1 changed file with 127 additions and 3 deletions.
130 changes: 127 additions & 3 deletions tests/unit/test_validate_checkpoint_operator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import json
from typing import Literal
from unittest.mock import Mock
import pytest
from great_expectations import ValidationDefinition
from great_expectations import Checkpoint
from pytest_mock import MockerFixture

from great_expectations_provider.operators.validate_checkpoint import (
GXValidateCheckpointOperator,
)
from great_expectations.data_context import AbstractDataContext
from great_expectations.data_context import AbstractDataContext, FileDataContext
import pandas as pd
from great_expectations import ExpectationSuite
from great_expectations.expectations import ExpectColumnValuesToBeInSet


class TestValidateCheckpointOperator:
def test_validate_dataframe(self):
def test_validate_dataframe(self) -> None:
# arrange
column_name = "col_A"

Expand Down Expand Up @@ -55,7 +59,7 @@ def configure_checkpoint(context: AbstractDataContext) -> Checkpoint:
df = pd.DataFrame({column_name: ["a", "b", "c"]})

validate_cloud_checkpoint = GXValidateCheckpointOperator(
task_id="validate_cloud_checkpoint",
task_id="validate_checkpoint",
configure_checkpoint=configure_checkpoint,
batch_parameters={"dataframe": df},
)
Expand All @@ -66,3 +70,123 @@ def configure_checkpoint(context: AbstractDataContext) -> Checkpoint:
# assert
assert serialized_result["success"]
json.dumps(serialized_result) # result must be json serializable

def test_context_type_ephemeral(self, mocker: MockerFixture) -> None:
"""Expect that param context_type creates an EphemeralDataContext."""
# arrange
context_type: Literal["ephemeral"] = "ephemeral"
configure_checkpoint = Mock()
mock_gx = Mock()
mocker.patch.dict("sys.modules", {"great_expectations": mock_gx})
validate_checkpoint = GXValidateCheckpointOperator(
task_id="validate_checkpoint",
configure_checkpoint=configure_checkpoint,
context_type=context_type,
)

# act
validate_checkpoint.execute(context={})

# assert
mock_gx.get_context.assert_called_once_with(mode=context_type)

def test_context_type_cloud(self, mocker: MockerFixture) -> None:
"""Expect that param context_type creates a CloudDataContext."""
# arrange
context_type: Literal["cloud"] = "cloud"
configure_checkpoint = Mock()
mock_gx = Mock()
mocker.patch.dict("sys.modules", {"great_expectations": mock_gx})
validate_checkpoint = GXValidateCheckpointOperator(
task_id="validate_checkpoint_success",
configure_checkpoint=configure_checkpoint,
context_type=context_type,
)

# act
validate_checkpoint.execute(context={})

# assert
mock_gx.get_context.assert_called_once_with(mode=context_type)

def test_context_type_filesystem(self, mocker: MockerFixture) -> None:
"""Expect that param context_type defers creation of data context to user."""
# arrange
context_type: Literal["file"] = "file"
configure_checkpoint = Mock()
configure_file_data_context = Mock()
mock_gx = Mock()
mocker.patch.dict("sys.modules", {"great_expectations": mock_gx})
validate_checkpoint = GXValidateCheckpointOperator(
task_id="validate_checkpoint_success",
configure_checkpoint=configure_checkpoint,
context_type=context_type,
configure_file_data_context=configure_file_data_context,
)

# act
validate_checkpoint.execute(context={})

# assert
mock_gx.get_context.assert_not_called()

def test_user_configured_context_is_passed_to_configure_checkpoint(self) -> None:
"""Expect that if a user configures a file context, it gets passed to the configure_checkpoint function."""
# arrange
context_type: Literal["file"] = "file"
configure_checkpoint = Mock()
configure_file_data_context = Mock()
validate_checkpoint = GXValidateCheckpointOperator(
task_id="validate_checkpoint_success",
configure_checkpoint=configure_checkpoint,
context_type=context_type,
configure_file_data_context=configure_file_data_context,
)

# act
validate_checkpoint.execute(context={})

# assert
configure_checkpoint.assert_called_once_with(
configure_file_data_context.return_value
)

def test_context_type_filesystem_requires_configure_file_data_context(self) -> None:
"""Expect that param context_type requires the configure_file_data_context parameter."""
# arrange
context_type: Literal["file"] = "file"
configure_checkpoint = Mock()

# act/assert
with pytest.raises(ValueError, match="configure_file_data_context"):
GXValidateCheckpointOperator(
task_id="validate_checkpoint_success",
configure_checkpoint=configure_checkpoint,
context_type=context_type,
configure_file_data_context=None, # must be defined
)

def test_batch_parameters(self) -> None:
"""Expect that param batch_parameters is passed to Checkpoint.run. This
also confirms that we run the Checkpoint returned by configure_checkpoint."""
# arrange
mock_checkpoint = Mock()
configure_checkpoint = Mock()
configure_checkpoint.return_value = mock_checkpoint
batch_parameters = {
"year": "2024",
"month": "01",
"day": "01",
}
validate_checkpoint = GXValidateCheckpointOperator(
task_id="validate_checkpoint",
configure_checkpoint=configure_checkpoint,
batch_parameters=batch_parameters,
context_type="ephemeral",
)

# act
validate_checkpoint.execute(context={})

# assert
mock_checkpoint.run.assert_called_once_with(batch_parameters=batch_parameters)

0 comments on commit 6622bcb

Please sign in to comment.