Skip to content

Commit

Permalink
Add tests for data profile scan
Browse files Browse the repository at this point in the history
  • Loading branch information
syou6162 committed Nov 3, 2024
1 parent 7f80a97 commit 38f1e8c
Showing 1 changed file with 155 additions and 0 deletions.
155 changes: 155 additions & 0 deletions tests/functional/test_data_profile_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import json
import pytest
from unittest.mock import patch
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.tests.util import run_dbt, get_connection, relation_from_name

# Value used for the ``location`` key of the ``data_profile_scan`` model config.
SCAN_LOCATION = "us-central1"
# Value used for the ``scan_id`` key of the ``data_profile_scan`` model config.
SCAN_ID = "bigquery_data_profile_scan_test"
# Name of the single model each test class builds; also used to derive the
# ``.sql`` / ``.yml`` file names and to look the relation up after the run.
MODEL_NAME = "test_model"

# User-defined labels applied through the ``+labels`` project config; the
# tests compare the keys of this mapping against the created table's labels.
ORIGINAL_LABELS = {
"my_label_key": "my_label_value",
}

# Model body: a table materialization producing two rows (``id``, ``date_hour``).
SQL_CONTENT = """
{{
config(
materialized="table"
)
}}
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
"""

# Minimal schema file that only declares the model by name.
YAML_CONTENT = f"""version: 2
models:
- name: {MODEL_NAME}
"""


class TestDataProfileScan:
    """Run a model that has ``data_profile_scan`` configured.

    The Dataplex ``DataScanServiceClient`` is replaced with a mock, so the
    test only checks that the scan is created and run exactly once and that
    the resulting table carries the expected label keys.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        """Return project config with custom labels and profile-scan settings."""
        # NOTE(review): "enabled" is deliberately not set here (it was left
        # commented out as `"enabled": True`); presumably the scan is enabled
        # by default -- confirm against the adapter implementation.
        profile_scan_settings = {
            "location": SCAN_LOCATION,
            "scan_id": SCAN_ID,
            "sampling_percent": 10,
        }
        return {
            "models": {
                "+labels": ORIGINAL_LABELS,
                "+data_profile_scan": profile_scan_settings,
            },
        }

    @pytest.fixture(scope="class")
    def models(self):
        """Return the model SQL file and its schema YAML, keyed by file name."""
        sql_file = f"{MODEL_NAME}.sql"
        yml_file = f"{MODEL_NAME}.yml"
        return {sql_file: SQL_CONTENT, yml_file: YAML_CONTENT}

    def test_create_data_profile_scan(self, project):
        """Check the scan is created and run once and the labels are set."""
        patch_target = "dbt.adapters.bigquery.impl.dataplex_v1.DataScanServiceClient"
        with patch(patch_target) as client_cls:
            scan_client = client_cls.return_value

            run_results = run_dbt()
            assert len(run_results) == 1

            scan_client.create_data_scan.assert_called_once()
            scan_client.run_data_scan.assert_called_once()

        bq_adapter = project.adapter
        model_relation: BigQueryRelation = relation_from_name(bq_adapter, MODEL_NAME)
        with get_connection(bq_adapter) as connection:
            bq_client = connection.handle
            table_ref = bq_adapter.connections.get_bq_table(
                model_relation.database, model_relation.schema, model_relation.table
            )
            bq_table = bq_client.get_table(table_ref)

            # Exactly the three scan-related label keys plus the user-defined
            # ones are expected on the table.
            expected_label_keys = {
                "dataplex-dp-published-scan",
                "dataplex-dp-published-project",
                "dataplex-dp-published-location",
            } | set(ORIGINAL_LABELS)
            assert set(bq_table.labels) == expected_label_keys


class TestDataProfileScanWithoutProfileScanSetting:
    """Run a model with no ``data_profile_scan`` config at all.

    With the Dataplex ``DataScanServiceClient`` mocked, the test checks that
    no scan is created or run and that the table ends up without labels.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the model SQL file and its schema YAML, keyed by file name."""
        sql_file = f"{MODEL_NAME}.sql"
        yml_file = f"{MODEL_NAME}.yml"
        return {sql_file: SQL_CONTENT, yml_file: YAML_CONTENT}

    def test_create_data_profile_scan(self, project):
        """Check that no scan call is made and the table has no labels."""
        patch_target = "dbt.adapters.bigquery.impl.dataplex_v1.DataScanServiceClient"
        with patch(patch_target) as client_cls:
            scan_client = client_cls.return_value

            run_results = run_dbt()
            assert len(run_results) == 1

            scan_client.create_data_scan.assert_not_called()
            scan_client.run_data_scan.assert_not_called()

        bq_adapter = project.adapter
        model_relation: BigQueryRelation = relation_from_name(bq_adapter, MODEL_NAME)
        with get_connection(bq_adapter) as connection:
            bq_client = connection.handle
            table_ref = bq_adapter.connections.get_bq_table(
                model_relation.database, model_relation.schema, model_relation.table
            )
            bq_table = bq_client.get_table(table_ref)

            # No labels are configured for this class, so none are expected.
            assert set(bq_table.labels) == set()


class TestDataProfileScanDisabledProfileScanSetting:
    """Run a model whose ``data_profile_scan`` config has ``enabled: False``.

    With the Dataplex ``DataScanServiceClient`` mocked, the test checks that
    no scan is created or run and that the table ends up without labels.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        """Return project config with the profile scan explicitly disabled."""
        disabled_scan_settings = {
            "location": SCAN_LOCATION,
            "scan_id": SCAN_ID,
            "enabled": False,
        }
        return {
            "models": {
                "+data_profile_scan": disabled_scan_settings,
            },
        }

    @pytest.fixture(scope="class")
    def models(self):
        """Return the model SQL file and its schema YAML, keyed by file name."""
        sql_file = f"{MODEL_NAME}.sql"
        yml_file = f"{MODEL_NAME}.yml"
        return {sql_file: SQL_CONTENT, yml_file: YAML_CONTENT}

    def test_create_data_profile_scan(self, project):
        """Check that no scan call is made and the table has no labels."""
        patch_target = "dbt.adapters.bigquery.impl.dataplex_v1.DataScanServiceClient"
        with patch(patch_target) as client_cls:
            scan_client = client_cls.return_value

            run_results = run_dbt()
            assert len(run_results) == 1

            scan_client.create_data_scan.assert_not_called()
            scan_client.run_data_scan.assert_not_called()

        bq_adapter = project.adapter
        model_relation: BigQueryRelation = relation_from_name(bq_adapter, MODEL_NAME)
        with get_connection(bq_adapter) as connection:
            bq_client = connection.handle
            table_ref = bq_adapter.connections.get_bq_table(
                model_relation.database, model_relation.schema, model_relation.table
            )
            bq_table = bq_client.get_table(table_ref)

            # This config sets no "+labels", so no label keys are expected.
            assert set(bq_table.labels) == set()

0 comments on commit 38f1e8c

Please sign in to comment.