URL encode dataset names to support multibyte characters (#1198)

In projects containing models with names like the following, dataset creation fails and an error occurs during execution:
```txt
└── dbt
    └── my_project
        └── models
            ├── 日本語名モデル.sql
            └── 日本語名モデル.yml
```

```
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/datasets/__init__.py", line 78, in _sanitize_uri
    raise ValueError("Dataset URI must only consist of ASCII characters")
ValueError: Dataset URI must only consist of ASCII characters
```

To support model names with multibyte characters, it might be good to URL-encode the names.
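
As a quick illustration of the idea (a hedged, stdlib-only sketch using the model name from the tree above, not the patched Cosmos code itself), `urllib.parse.quote` turns the multibyte name into an ASCII-safe, percent-encoded form:

```python
import urllib.parse

# The multibyte model name from the example project above.
model_name = "日本語名モデル"

# Percent-encode it (UTF-8 bytes -> %XX escapes) so a dataset URI built
# from it consists only of ASCII characters.
encoded = urllib.parse.quote(model_name)
print(encoded)  # %E6%97%A5%E6%9C%AC%E8%AA%9E%E5%90%8D%E3%83%A2%E3%83%87%E3%83%AB
assert encoded.isascii()
```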

closes: #1197

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
t0momi219 and tatiana authored Sep 26, 2024
1 parent 56ff6dd commit e0a9fd3
Showing 4 changed files with 35 additions and 2 deletions.
3 changes: 2 additions & 1 deletion cosmos/operators/local.py
```diff
@@ -3,6 +3,7 @@
 import json
 import os
 import tempfile
+import urllib.parse
 import warnings
 from abc import ABC, abstractmethod
 from functools import cached_property
```

```diff
@@ -449,7 +450,7 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
         uris = []
         for completed in self.openlineage_events_completes:
             for output in getattr(completed, source):
-                dataset_uri = output.namespace + "/" + output.name
+                dataset_uri = output.namespace + "/" + urllib.parse.quote(output.name)
                 uris.append(dataset_uri)
         self.log.debug("URIs to be converted to Dataset: %s", uris)

```
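
To see why the quoting matters downstream, here is a hedged repro sketch (it assumes an Airflow version that validates Dataset URIs, as in the traceback above; the host, port, and schema are illustrative placeholders):

```python
import urllib.parse

from airflow.datasets import Dataset

# Unencoded URI: rejected by Airflow's ASCII-only URI validation.
raw_uri = "postgres://0.0.0.0:5432/postgres.public.日本語名モデル"
try:
    Dataset(uri=raw_uri)
except ValueError as err:
    print(err)  # Dataset URI must only consist of ASCII characters

# Quoting just the model name yields an ASCII URI that Airflow accepts.
encoded_uri = "postgres://0.0.0.0:5432/postgres.public." + urllib.parse.quote("日本語名モデル")
dataset = Dataset(uri=encoded_uri)
```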
2 changes: 2 additions & 0 deletions dev/dags/dbt/simple/models/ｍｕｌｔｉｂｙｔｅ.sql
```diff
@@ -0,0 +1,2 @@
+select
+'TEST_FOR_MULTIBYTE_CHARCTERS'
```
2 changes: 1 addition & 1 deletion docs/configuration/render-config.rst
```diff
@@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use

 The ``RenderConfig`` class takes the following arguments:

-- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies <lineage.html>`_.
+- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies <lineage.html>`_. If a model in the project has a name containing multibyte characters, the dataset name will be URL-encoded.
 - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior <testing-behavior.html>`_ section.
 - ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
 - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
```
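
As a hedged usage sketch (not part of the commit; the DAG id and connection details are placeholders), a downstream DAG that schedules on a dataset emitted for a multibyte-named model references the percent-encoded URI:

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset

# Percent-encoded form of the model name 日本語名モデル; host/port/schema are illustrative.
model_dataset = Dataset(
    "postgres://0.0.0.0:5432/postgres.public.%E6%97%A5%E6%9C%AC%E8%AA%9E%E5%90%8D%E3%83%A2%E3%83%87%E3%83%AB"
)

with DAG("downstream_of_multibyte_model", start_date=datetime(2024, 1, 1), schedule=[model_dataset]):
    ...  # tasks that consume the model's output go here
```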
30 changes: 30 additions & 0 deletions tests/operators/test_local.py
```diff
@@ -489,6 +489,36 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
     assert run_operator.outlets == []


+@pytest.mark.skipif(
+    version.parse(airflow_version) < version.parse("2.4")
+    or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
+    reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
+)
+@pytest.mark.integration
+def test_run_operator_dataset_url_encoded_names(caplog):
+    from airflow.datasets import Dataset
+
+    with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+        run_operator = DbtRunLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=Path(__file__).parent.parent.parent / "dev/dags/dbt/simple",
+            task_id="run",
+            dbt_cmd_flags=["--models", "ｍｕｌｔｉｂｙｔｅ"],
+            install_deps=True,
+            append_env=True,
+        )
+        run_operator
+
+    run_test_dag(dag)
+
+    assert run_operator.outlets == [
+        Dataset(
+            uri="postgres://0.0.0.0:5432/postgres.public.%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85",
+            extra=None,
+        )
+    ]
+
+
 @pytest.mark.integration
 def test_run_operator_caches_partial_parsing(caplog, tmp_path):
     caplog.set_level(logging.DEBUG)
```
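
To make the expected URI in the assertion above easier to verify by eye, a small stdlib-only check (not part of the commit) shows that the percent-encoded segment decodes back to the full-width model name:

```python
import urllib.parse

encoded = "%EF%BD%8D%EF%BD%95%EF%BD%8C%EF%BD%94%EF%BD%89%EF%BD%82%EF%BD%99%EF%BD%94%EF%BD%85"
# Decodes to the full-width-character name of the test model.
assert urllib.parse.unquote(encoded) == "ｍｕｌｔｉｂｙｔｅ"
```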
