re-order tests
sh-rp committed Nov 25, 2024
1 parent 0eb6f58 commit c06525b
Showing 1 changed file with 74 additions and 65 deletions.
tests/load/test_read_interfaces.py: 139 changes (74 additions & 65 deletions)
@@ -52,6 +52,15 @@ def _expected_chunk_count(p: Pipeline) -> List[int]:
@pytest.fixture(scope="session")
def populated_pipeline(request) -> Any:
    """fixture that returns a pipeline object populated with the example data"""

    # ensure ibis is not installed
    import subprocess

    try:
        subprocess.check_call(["pip", "uninstall", "ibis-framework"])
    except subprocess.CalledProcessError:
        pass

    destination_config = cast(DestinationTestConfiguration, request.param)

    if (
@@ -254,71 +263,6 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
    assert set(ids) == set(range(total_records))


@pytest.mark.no_load
@pytest.mark.essential
@pytest.mark.parametrize(
    "populated_pipeline",
    configs,
    indirect=True,
    ids=lambda x: x.name,
)
def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
    # NOTE: we could generalize this with a context for certain deps
    import subprocess

    subprocess.check_call(
        ["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"]
    )

    from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS

    # check correct error if not supported
    if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS:
        with pytest.raises(NotImplementedError):
            populated_pipeline._dataset().ibis()
        return

    total_records = _total_records(populated_pipeline)
    ibis_connection = populated_pipeline._dataset().ibis()

    map_i = lambda x: x
    if populated_pipeline.destination.destination_type == "dlt.destinations.snowflake":
        map_i = lambda x: x.upper()

    dataset_name = map_i(populated_pipeline.dataset_name)
    table_like_statement = None
    table_name_prefix = ""
    addtional_tables = []

    # clickhouse has no datasets, but table prefixes and a sentinel table
    if populated_pipeline.destination.destination_type == "dlt.destinations.clickhouse":
        table_like_statement = dataset_name + "."
        table_name_prefix = dataset_name + "___"
        dataset_name = None
        addtional_tables = ["dlt_sentinel_table"]

    add_table_prefix = lambda x: table_name_prefix + x

    # just do a basic check to see whether ibis can connect
    assert set(ibis_connection.list_tables(database=dataset_name, like=table_like_statement)) == {
        add_table_prefix(map_i(x))
        for x in (
            [
                "_dlt_loads",
                "_dlt_pipeline_state",
                "_dlt_version",
                "double_items",
                "items",
                "items__children",
            ]
            + addtional_tables
        )
    }

    items_table = ibis_connection.table(add_table_prefix(map_i("items")), database=dataset_name)
    assert items_table.count().to_pandas() == total_records


@pytest.mark.no_load
@pytest.mark.essential
@pytest.mark.parametrize(
@@ -551,6 +495,71 @@ def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None:
    # assert table.schema.field("di_decimal").type.precision == expected_decimal_precision_di


@pytest.mark.no_load
@pytest.mark.essential
@pytest.mark.parametrize(
    "populated_pipeline",
    configs,
    indirect=True,
    ids=lambda x: x.name,
)
def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
    # NOTE: we could generalize this with a context for certain deps
    import subprocess

    subprocess.check_call(
        ["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"]
    )

    from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS

    # check correct error if not supported
    if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS:
        with pytest.raises(NotImplementedError):
            populated_pipeline._dataset().ibis()
        return

    total_records = _total_records(populated_pipeline)
    ibis_connection = populated_pipeline._dataset().ibis()

    map_i = lambda x: x
    if populated_pipeline.destination.destination_type == "dlt.destinations.snowflake":
        map_i = lambda x: x.upper()

    dataset_name = map_i(populated_pipeline.dataset_name)
    table_like_statement = None
    table_name_prefix = ""
    addtional_tables = []

    # clickhouse has no datasets, but table prefixes and a sentinel table
    if populated_pipeline.destination.destination_type == "dlt.destinations.clickhouse":
        table_like_statement = dataset_name + "."
        table_name_prefix = dataset_name + "___"
        dataset_name = None
        addtional_tables = ["dlt_sentinel_table"]

    add_table_prefix = lambda x: table_name_prefix + x

    # just do a basic check to see whether ibis can connect
    assert set(ibis_connection.list_tables(database=dataset_name, like=table_like_statement)) == {
        add_table_prefix(map_i(x))
        for x in (
            [
                "_dlt_loads",
                "_dlt_pipeline_state",
                "_dlt_version",
                "double_items",
                "items",
                "items__children",
            ]
            + addtional_tables
        )
    }

    items_table = ibis_connection.table(add_table_prefix(map_i("items")), database=dataset_name)
    assert items_table.count().to_pandas() == total_records


@pytest.mark.no_load
@pytest.mark.essential
@pytest.mark.parametrize(

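A note on the NOTE inside test_ibis_dataset_access ("we could generalize this with a context for certain deps"): the populated_pipeline fixture now pip-uninstalls ibis-framework up front, and the test pip-installs it again via subprocess. Below is a minimal sketch of what such a dependency context could look like; the helper name installed_dep, the use of sys.executable, and the -y flag are illustrative assumptions, not part of this commit.

import subprocess
import sys
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def installed_dep(package: str) -> Iterator[None]:
    # Hypothetical helper (not in this commit): install a package for the
    # duration of a block, then uninstall it so later tests see a clean environment.
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    try:
        yield
    finally:
        subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "-y", package])


# usage sketch inside a test:
# with installed_dep("ibis-framework[duckdb]"):
#     from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS
#     ...

Running pip as sys.executable -m pip targets the interpreter that runs the tests, and -y skips the confirmation prompt that a plain pip uninstall would otherwise wait for in a non-interactive run.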