Feature: Add support for Data Profiling Scan #1392

Open · wants to merge 15 commits into base: main
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-083421.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Support Data Profiling in dbt
time: 2024-11-04T08:34:21.596015+09:00
custom:
  Author: syou6162
  Issue: "1330"
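
For orientation, the module added below reads a `data_profile_scan` block from the model config. A minimal sketch of the dict shape the adapter ends up consuming, with key names taken from the code below and all values hypothetical:

```python
# Hypothetical model config as seen by the adapter; only "location" is
# required, everything else falls back to defaults in dataplex.py.
model_config = {
    "database": "my-gcp-project",  # becomes project_id
    "schema": "analytics",         # becomes dataset_id
    "name": "orders",              # becomes table_id
    "config": {
        "data_profile_scan": {
            "location": "us-central1",
            "sampling_percent": 10.0,         # optional
            "row_filter": "status = 'done'",  # optional
            "cron": "0 3 * * *",              # optional; omit to run on demand
            "enabled": True,                  # False deletes an existing scan
        }
    },
}
```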
170 changes: 170 additions & 0 deletions dbt/adapters/bigquery/dataplex.py
@@ -0,0 +1,170 @@
from dataclasses import dataclass
import hashlib
from typing import Optional

from google.cloud import dataplex_v1
from google.protobuf import field_mask_pb2

from dbt.adapters.bigquery.connections import BigQueryConnectionManager


@dataclass
class DataProfileScanSetting:
    location: str
    scan_id: Optional[str]

    project_id: str
    dataset_id: str
    table_id: str

    sampling_percent: Optional[float]
    row_filter: Optional[str]
    cron: Optional[str]

    def parent(self) -> str:
        return f"projects/{self.project_id}/locations/{self.location}"

    def data_scan_name(self) -> str:
        return f"{self.parent()}/dataScans/{self.scan_id}"


class DataProfileScan:
    def __init__(self, connections: BigQueryConnectionManager):
        self.connections = connections

    # BigQuery only surfaces Data Profile Scan results for a table when the
    # `dataplex-dp-published-*` labels are assigned to it, so merge them into
    # the table's existing labels.
    def _update_labels_with_data_profile_scan_labels(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        location: str,
        scan_id: str,
    ):
        table = self.connections.get_bq_table(project_id, dataset_id, table_id)
        original_labels = table.labels
        profile_scan_labels = {
            "dataplex-dp-published-scan": scan_id,
            "dataplex-dp-published-project": project_id,
            "dataplex-dp-published-location": location,
        }
        table.labels = {**original_labels, **profile_scan_labels}
        self.connections.get_thread_connection().handle.update_table(table, ["labels"])

    # scan_id must be unique within the project and no longer than 63 characters,
    # so generate an id that meets both constraints.
    def _generate_unique_scan_id(self, dataset_id: str, table_id: str) -> str:
        md5 = hashlib.md5(f"{dataset_id}_{table_id}".encode("utf-8")).hexdigest()
        return f"dbt-{table_id.replace('_', '-')}-{md5}"[:63]

    def _create_or_update_data_profile_scan(
        self,
        client: dataplex_v1.DataScanServiceClient,
        scan_setting: DataProfileScanSetting,
    ):
        data_profile_spec = dataplex_v1.DataProfileSpec(
            sampling_percent=scan_setting.sampling_percent,
            row_filter=scan_setting.row_filter,
        )
        display_name = (
            f"Data Profile Scan for {scan_setting.table_id} in {scan_setting.dataset_id}"
        )
        description = f"This is a Data Profile Scan for {scan_setting.project_id}.{scan_setting.dataset_id}.{scan_setting.table_id}. Created by dbt."
        labels = {
            "managed_by": "dbt",
        }

        if scan_setting.cron:
            trigger = dataplex_v1.Trigger(
                schedule=dataplex_v1.Trigger.Schedule(cron=scan_setting.cron)
            )
        else:
            trigger = dataplex_v1.Trigger(on_demand=dataplex_v1.Trigger.OnDemand())
        execution_spec = dataplex_v1.DataScan.ExecutionSpec(trigger=trigger)

        # Create the scan if none with this name exists yet; otherwise update it in place.
        if all(
            scan.name != scan_setting.data_scan_name()
            for scan in client.list_data_scans(parent=scan_setting.parent())
        ):
            data_scan = dataplex_v1.DataScan(
                data=dataplex_v1.DataSource(
                    resource=f"//bigquery.googleapis.com/projects/{scan_setting.project_id}/datasets/{scan_setting.dataset_id}/tables/{scan_setting.table_id}"
                ),
                data_profile_spec=data_profile_spec,
                execution_spec=execution_spec,
                display_name=display_name,
                description=description,
                labels=labels,
            )
            request = dataplex_v1.CreateDataScanRequest(
                parent=scan_setting.parent(),
                data_scan_id=scan_setting.scan_id,
                data_scan=data_scan,
            )
            client.create_data_scan(request=request).result()
        else:
            request = dataplex_v1.GetDataScanRequest(
                name=scan_setting.data_scan_name(),
            )
            data_scan = client.get_data_scan(request=request)

            data_scan.data_profile_spec = data_profile_spec
            data_scan.execution_spec = execution_spec
            data_scan.display_name = display_name
            data_scan.description = description
            data_scan.labels = labels

            update_mask = field_mask_pb2.FieldMask(
                paths=[
                    "data_profile_spec",
                    "execution_spec",
                    "display_name",
                    "description",
                    "labels",
                ]
            )
            request = dataplex_v1.UpdateDataScanRequest(
                data_scan=data_scan,
                update_mask=update_mask,
            )
            client.update_data_scan(request=request).result()

    def create_or_update_data_profile_scan(self, config):
        project_id = config.get("database")
        dataset_id = config.get("schema")
        table_id = config.get("name")

        data_profile_config = config.get("config", {}).get("data_profile_scan", {})

        # Skip if data_profile_scan is not configured
        if not data_profile_config:
            return None

        client = dataplex_v1.DataScanServiceClient()
        scan_setting = DataProfileScanSetting(
            location=data_profile_config["location"],
            scan_id=data_profile_config.get(
                "scan_id", self._generate_unique_scan_id(dataset_id, table_id)
            ),
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
            sampling_percent=data_profile_config.get("sampling_percent", None),
            row_filter=data_profile_config.get("row_filter", None),
            cron=data_profile_config.get("cron", None),
        )

        # If the scan is disabled, delete any existing scan and stop here.
        if not data_profile_config.get("enabled", True):
            client.delete_data_scan(name=scan_setting.data_scan_name())
            return None

        self._create_or_update_data_profile_scan(client, scan_setting)

        # On-demand scans (no cron) are triggered immediately after each dbt run.
        if not scan_setting.cron:
            client.run_data_scan(
                request=dataplex_v1.RunDataScanRequest(name=scan_setting.data_scan_name())
            )

        self._update_labels_with_data_profile_scan_labels(
            project_id, dataset_id, table_id, scan_setting.location, scan_setting.scan_id
        )
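
One constraint worth noting in `_generate_unique_scan_id`: an md5 hexdigest is 32 characters, and the `dbt-` prefix plus the joining hyphen take 5 more, so any `table_id` longer than 26 characters pushes the ID past the 63-character cap and the hash suffix gets truncated, weakening the uniqueness guarantee. A runnable sketch of the behavior (table names are hypothetical):

```python
import hashlib

def generate_scan_id(dataset_id: str, table_id: str) -> str:
    # Mirrors _generate_unique_scan_id above: "dbt-" + hyphenated table id
    # + md5 of "<dataset>_<table>", capped at Dataplex's 63-char limit.
    md5 = hashlib.md5(f"{dataset_id}_{table_id}".encode("utf-8")).hexdigest()
    return f"dbt-{table_id.replace('_', '-')}-{md5}"[:63]

print(len(generate_scan_id("analytics", "orders")))  # 43: full hash kept
print(len(generate_scan_id("analytics", "a_very_long_table_name_over_26_chars")))  # 63: hash truncated
```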
5 changes: 5 additions & 0 deletions dbt/adapters/bigquery/impl.py
@@ -55,6 +55,7 @@

from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
from dbt.adapters.bigquery.dataplex import DataProfileScan
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.python_submissions import (
    ClusterDataprocHelper,
@@ -969,3 +970,7 @@ def validate_sql(self, sql: str) -> AdapterResponse:
        :param str sql: The sql to validate
        """
        return self.connections.dry_run(sql)

    @available
    def create_or_update_data_profile_scan(self, config):
        DataProfileScan(self.connections).create_or_update_data_profile_scan(config)
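
The `@available` decorator is dbt's mechanism for exposing an adapter method to the Jinja context, which is what lets the materialization macros below call `adapter.create_or_update_data_profile_scan(model)`. A minimal sketch of the pattern, assuming the `available` export from `dbt.adapters.base` (the class here is a stand-in for illustration, not the real adapter):

```python
from dbt.adapters.base import available

class SketchAdapter:  # stand-in; the actual change is on BigQueryAdapter
    @available
    def create_or_update_data_profile_scan(self, config):
        # Delegating to a dedicated module keeps the adapter class thin.
        ...
```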
@@ -170,6 +170,7 @@
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.create_or_update_data_profile_scan(model) %}

{%- if tmp_relation_exists -%}
{{ adapter.drop_relation(tmp_relation) }}
1 change: 1 addition & 0 deletions dbt/include/bigquery/macros/materializations/table.sql
@@ -40,6 +40,7 @@
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.create_or_update_data_profile_scan(model) %}

{{ return({'relations': [target_relation]}) }}

1 change: 1 addition & 0 deletions setup.py
@@ -56,6 +56,7 @@ def _dbt_bigquery_version() -> str:
"google-cloud-bigquery[pandas]>=3.0,<4.0",
"google-cloud-storage~=2.4",
"google-cloud-dataproc~=5.0",
"google-cloud-dataplex~=2.3",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"google-api-core>=2.11.0",