Skip to content

Commit

Permalink
Add Athena Connection Support (#91)
Browse files Browse the repository at this point in the history
* Add Athena Connection Support

Add an Athena URI builder to make_connection_string(),
assuming for now that Athena is the only connection when an AWS
connection type is given. This is an incorrect assumption, but we
currently do not have asks for other use cases, and figuring out how
to differentiate these may be a non-trivial issue.

Signed-off-by: Benji Lampel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Use params instead of kwargs to pass Athena-specific parameters. Not sure if this is correct use of params...

Signed-off-by: Benji Lampel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add Athena Connection Support

Add an Athena URI builder to make_connection_string(),
assuming for now that Athena is the only connection when an AWS
connection type is given. This is an incorrect assumption, but we
currently do not have asks for other use cases, and figuring out how
to differentiate these may be a non-trivial issue.

Signed-off-by: Benji Lampel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Use params instead of kwargs to pass Athena-specific parameters. Not sure if this is correct use of params...

Signed-off-by: Benji Lampel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix flake8 errors

Signed-off-by: Benji Lampel <[email protected]>

* Fix tests

Signed-off-by: Benji Lampel <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix flake8 and black issues

Signed-off-by: Benji Lampel <[email protected]>

---------

Signed-off-by: Benji Lampel <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
denimalpaca and pre-commit-ci[bot] authored Mar 22, 2023
1 parent 9b63b1f commit 9291e1f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
18 changes: 17 additions & 1 deletion great_expectations_provider/operators/great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def __init__(
self.datasource: Optional[Datasource] = None
self.batch_request: Optional[BatchRequestBase] = None
self.schema = schema
self.kwargs = kwargs

if self.is_dataframe and self.query_to_validate:
raise ValueError(
Expand Down Expand Up @@ -288,7 +289,22 @@ def make_connection_configuration(self) -> Dict[str, str]:
uri_string = f"{self.conn.host}{self.schema}"
elif conn_type == "sqlite":
uri_string = f"sqlite:///{self.conn.host}"
# TODO: Add Athena and Trino support if possible
elif conn_type == "aws":
# TODO: Check which AWS resource is being used based on the hook. This is difficult because
# we don't have access to a specific hook.
athena_db = self.schema or self.params.get("database")
s3_path = self.params.get("s3_path")
region = self.params.get("region")
if not s3_path:
raise ValueError("No s3_path given in params.")
if not region:
raise ValueError("No region given in params.")
if athena_db:
uri_string = f"awsathena+rest://@athena.{region}.amazonaws.com/{athena_db}?s3_staging_dir={s3_path}"
else:
uri_string = f"awsathena+rest://@athena.{region}.amazonaws.com/?s3_staging_dir={s3_path}"
# TODO: Add other AWS sources here as needed
# TODO: Add Trino support (if possible)
else:
raise ValueError(f"Conn type: {conn_type} is not supported.")
return {"connection_string": uri_string}
Expand Down
42 changes: 42 additions & 0 deletions tests/operators/test_great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,48 @@ def test_great_expectations_operator__make_connection_string_sqlite():
assert operator.make_connection_configuration() == test_conn_conf


def test_great_expectations_operator__make_connection_string_athena_with_db():
    """Check the Athena connection string built for an ``aws`` connection with a database.

    The expected URI contains ``athena_db``, which matches the schema prefix
    of the ``data_asset_name`` passed to the operator. The region and the S3
    staging directory are supplied through ``params``.
    """
    athena_params = {"region": "us-east-1", "s3_path": "bucket/path/to/staging/dir"}
    operator = GreatExpectationsOperator(
        task_id="task_id",
        data_context_config=in_memory_data_context_config,
        data_asset_name="athena_db.table_name",
        conn_id="aws_default",
        expectation_suite_name="suite",
        params=athena_params,
    )

    # Assign the connection on the operator directly instead of resolving
    # "aws_default" through a lookup.
    operator.conn = Connection(conn_id="aws_default", conn_type="aws", host="host")
    operator.conn_type = operator.conn.conn_type

    expected_uri = "awsathena+rest://@athena.us-east-1.amazonaws.com/athena_db?s3_staging_dir=bucket/path/to/staging/dir"  # noqa
    assert operator.make_connection_configuration() == {"connection_string": expected_uri}


def test_great_expectations_operator__make_connection_string_athena_without_db():
    """Check the Athena connection string built for an ``aws`` connection without a database.

    ``data_asset_name`` carries no schema prefix here and ``params`` holds no
    ``database`` entry, so the expected URI has an empty path segment before
    the ``s3_staging_dir`` query parameter.
    """
    athena_params = {"region": "us-east-1", "s3_path": "bucket/path/to/staging/dir"}
    operator = GreatExpectationsOperator(
        task_id="task_id",
        data_context_config=in_memory_data_context_config,
        data_asset_name="table_name",
        conn_id="aws_default",
        expectation_suite_name="suite",
        params=athena_params,
    )

    # Assign the connection on the operator directly instead of resolving
    # "aws_default" through a lookup.
    operator.conn = Connection(conn_id="aws_default", conn_type="aws", host="host")
    operator.conn_type = operator.conn.conn_type

    expected_uri = "awsathena+rest://@athena.us-east-1.amazonaws.com/?s3_staging_dir=bucket/path/to/staging/dir"  # noqa
    assert operator.make_connection_configuration() == {"connection_string": expected_uri}


def test_great_expectations_operator__make_connection_string_schema_parameter(mocker):
test_conn_conf = {
"url": URL.create(
Expand Down

0 comments on commit 9291e1f

Please sign in to comment.