Skip to content

Commit

Permalink
Update some load tests
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed May 3, 2024
1 parent 5b01e08 commit 47bb759
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
17 changes: 9 additions & 8 deletions tests/load/test_dummy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def test_spool_job_failed_exception_init() -> None:
package_info = load.load_storage.get_load_package_info(load_id)
assert package_info.state == "aborted"
# both failed - we wait till the current loop is completed and then raise
assert len(package_info.jobs["failed_jobs"]) == 2
assert len(package_info.jobs["failed_jobs"]) == 1
assert len(package_info.jobs["started_jobs"]) == 0
# load id was never committed
complete_load.assert_not_called()
Expand All @@ -232,7 +232,7 @@ def test_spool_job_failed_exception_complete() -> None:
package_info = load.load_storage.get_load_package_info(load_id)
assert package_info.state == "aborted"
# both failed - we wait till the current loop is completed and then raise
assert len(package_info.jobs["failed_jobs"]) == 2
assert len(package_info.jobs["failed_jobs"]) == 1
assert len(package_info.jobs["started_jobs"]) == 0


Expand All @@ -254,8 +254,8 @@ def test_spool_job_retry_spool_new() -> None:
with ThreadPoolExecutor() as pool:
load.pool = pool
jobs_count, jobs = load.spool_new_jobs(load_id, schema)
assert jobs_count == 2
assert len(jobs) == 2
assert jobs_count == 1
assert len(jobs) == 1


def test_spool_job_retry_started() -> None:
Expand Down Expand Up @@ -315,11 +315,11 @@ def test_try_retrieve_job() -> None:
load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES)
load.pool = ThreadPoolExecutor()
jobs_count, jobs = load.spool_new_jobs(load_id, schema)
assert jobs_count == 2
assert jobs_count == 1
# now jobs are known
with load.destination.client(schema, load.initial_client_config) as c:
job_count, jobs = load.retrieve_jobs(c, load_id)
assert job_count == 2
assert job_count == 1
for j in jobs:
assert j.state() == "running"

Expand Down Expand Up @@ -636,6 +636,7 @@ def test_init_client_truncate_tables() -> None:
assert update_stored_schema.call_args[1]["only_tables"] == {
"_dlt_loads",
"_dlt_version",
"_dlt_pipeline_state",
}
assert initialize_storage.call_count == 2
# initialize storage is called twice, we deselected all tables to truncate
Expand Down Expand Up @@ -697,7 +698,7 @@ def test_init_client_truncate_tables() -> None:
# we use staging dataset
assert update_stored_schema.call_count == 2
# 4 tables to update in main dataset
assert len(update_stored_schema.call_args_list[0].kwargs["only_tables"]) == 4
assert len(update_stored_schema.call_args_list[0].kwargs["only_tables"]) == 5
assert (
"event_user" in update_stored_schema.call_args_list[0].kwargs["only_tables"]
)
Expand Down Expand Up @@ -726,7 +727,7 @@ def test_init_client_truncate_tables() -> None:
initialize_storage.call_args_list[1].kwargs["truncate_tables"]
) == len(bot_chain)
# migrate only tables for which we have jobs
assert len(update_stored_schema.call_args_list[0].kwargs["only_tables"]) == 4
assert len(update_stored_schema.call_args_list[0].kwargs["only_tables"]) == 5
# print(initialize_storage.call_args_list)
# print(update_stored_schema.call_args_list)

Expand Down
8 changes: 7 additions & 1 deletion tests/load/test_job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dlt.common.schema.typing import (
LOADS_TABLE_NAME,
VERSION_TABLE_NAME,
STATE_TABLE_NAME,
TWriteDisposition,
TTableSchema,
)
Expand Down Expand Up @@ -85,7 +86,12 @@ def test_get_update_basic_schema(client: SqlJobClientBase) -> None:
schema = client.schema
schema_update = client.update_stored_schema()
# expect dlt tables in schema update
assert set(schema_update.keys()) == {VERSION_TABLE_NAME, LOADS_TABLE_NAME, "event_slot"}
assert set(schema_update.keys()) == {
VERSION_TABLE_NAME,
LOADS_TABLE_NAME,
STATE_TABLE_NAME,
"event_slot",
}
# event_bot and event_user are not present because they have no columns
# check is event slot has variant
assert schema_update["event_slot"]["columns"]["value"]["variant"] is True
Expand Down

0 comments on commit 47bb759

Please sign in to comment.