Skip to content

Commit

Permalink
Extend test suite for pipeline load info schema
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 5, 2024
1 parent 63695d7 commit 82c7f7b
Showing 1 changed file with 90 additions and 39 deletions.
129 changes: 90 additions & 39 deletions tests/pipeline/test_pipeline_load_info.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,115 @@
import json
import dlt
import faker
import random

from tests.common.utils import json_case_path


# Static user rows served by ``users_source``; ids are assigned sequentially from 1.
data = [
    {"id": user_id, "name": user_name}
    for user_id, user_name in enumerate(("Alice", "Bob"), start=1)
]

# Seed Faker's shared random generator with a value drawn from [0, 10000]
# before building the generator, so each test session produces different
# fake rows.
faker.Faker.seed(random.randint(0, 10000))
# Module-wide fake-data generator using the "de_DE" locale.
fakes = faker.Faker(locale="de_DE")


@dlt.source
def users_source():
    """Source exposing the static ``data`` rows as the ``users_resource`` resource."""
    # ``data`` is wrapped in a list, so the resource is built over a single item
    # (the whole list of rows) rather than over the rows one by one.
    users = dlt.resource([data], name="users_resource")
    return users


@dlt.source
def taxi_demand_source():
    """Source with two resources of fake data: ``locations`` and ``demand_map``.

    Each resource yields ten rows generated with the module-level ``fakes``
    generator.
    """

    @dlt.resource(primary_key="city")
    def locations():
        """Yield ten fake location rows (id, address, city)."""
        for row_id in range(10):
            # Keep address before city so the fake generator is consumed in
            # the same order on every row.
            location = {
                "id": row_id,
                "address": fakes.address(),
                "city": fakes.city(),
            }
            yield location

    @dlt.resource(primary_key="id")
    def demand_map():
        """Yield ten fake demand rows (id, city, demand in [0, 10000])."""
        for row_id in range(10):
            demand_row = {
                "id": row_id,
                "city": fakes.city(),
                "demand": fakes.random.randint(0, 10000),
            }
            yield demand_row

    resources = [locations, demand_map]
    return resources


def test_pipeline_load_info_metrics_schema_is_not_chaning() -> None:
    """Check that the schema holding pipeline step infos keeps a single version hash.

    The test runs ``users_source`` and ``taxi_demand_source`` through a duckdb
    pipeline and loads the resulting load / normalize / extract info objects
    into the ``_load_info``, ``_normalize_info`` and ``_extract_info`` tables
    using the explicit ``nice_load_info_schema`` schema. The schema's
    ``version_hash`` is collected after the runs and must stay the same, both
    in the first round and when the whole procedure is repeated.

    NOTE(review): this body is the post-commit side of the diff. The
    removed-side lines (old ``load_info`` / ``*_version_hash`` / ``event_data``
    statements and their unterminated ``assert (`` blocks) were dropped because
    they left the function unparseable.
    """
    schema = dlt.Schema(name="nice_load_info_schema")
    pipeline = dlt.pipeline(
        pipeline_name="quick_start",
        destination="duckdb",
        dataset_name="mydata",
    )

    users_load_info = pipeline.run(
        users_source(),
        table_name="users",
        primary_key="id",
    )
    # This first load of the load info does not pass ``schema``.
    pipeline.run([users_load_info], table_name="_load_info")

    # A set collapses equal hashes, so its size is the number of distinct
    # schema versions observed.
    schema_hashset = set()
    pipeline.run([users_load_info], table_name="_load_info", schema=schema)

    pipeline.run(
        [pipeline.last_trace.last_normalize_info], table_name="_normalize_info", schema=schema
    )

    pipeline.run([pipeline.last_trace.last_extract_info], table_name="_extract_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    taxi_load_info = pipeline.run(
        taxi_demand_source(),
        table_name="taxi_demands",
        primary_key="id",
        write_disposition="replace",
    )

    pipeline.run([taxi_load_info], table_name="_load_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    pipeline.run(
        [pipeline.last_trace.last_normalize_info], table_name="_normalize_info", schema=schema
    )
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    pipeline.run([pipeline.last_trace.last_extract_info], table_name="_extract_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    assert len(schema_hashset) == 1

    # Second round: run both sources again and reload every info object; the
    # hash set is not reset, so any new schema version would grow it past 1.
    users_load_info = pipeline.run(
        users_source(),
        table_name="users",
        primary_key="id",
    )
    taxi_load_info = pipeline.run(
        taxi_demand_source(),
        table_name="taxi_demands",
        primary_key="id",
        write_disposition="replace",
    )

    pipeline.run([users_load_info], table_name="_load_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    pipeline.run([taxi_load_info], table_name="_load_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    pipeline.run(
        [pipeline.last_trace.last_normalize_info], table_name="_normalize_info", schema=schema
    )
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    pipeline.run([pipeline.last_trace.last_extract_info], table_name="_extract_info", schema=schema)
    schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

    assert len(schema_hashset) == 1

0 comments on commit 82c7f7b

Please sign in to comment.