Skip to content

Commit a993db2

Browse files
authored
feat: Add helper for OpenLineage version check (#47897)
1 parent f01f8ea commit a993db2

File tree

2 files changed

+386
-0
lines changed
  • providers/common/compat

2 files changed

+386
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import functools
21+
import logging
22+
from importlib import metadata
23+
24+
from packaging.version import Version
25+
26+
log = logging.getLogger(__name__)
27+
28+
29+
def require_openlineage_version(
30+
provider_min_version: str | None = None, client_min_version: str | None = None
31+
):
32+
"""
33+
Enforce minimum version requirements for OpenLineage provider or client.
34+
35+
Some providers, such as Snowflake and DBT Cloud, do not require an OpenLineage provider but may
36+
offer optional features that depend on it. These features are generally available starting
37+
from a specific version of the OpenLineage provider or client. This decorator helps ensure compatibility,
38+
preventing import errors and providing clear logs about version requirements.
39+
40+
Args:
41+
provider_min_version: Optional minimum version requirement for apache-airflow-providers-openlineage
42+
client_min_version: Optional minimum version requirement for openlineage-python
43+
44+
Raises:
45+
ValueError: If neither `provider_min_version` nor `client_min_version` is provided.
46+
TypeError: If the decorator is used without parentheses (e.g., `@require_openlineage_version`).
47+
"""
48+
err_msg = (
49+
"`require_openlineage_version` decorator must be used with at least one argument: "
50+
"'provider_min_version' or 'client_min_version', "
51+
'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
52+
)
53+
# Detect if decorator is mistakenly used without arguments
54+
if callable(provider_min_version) and client_min_version is None:
55+
raise TypeError(err_msg)
56+
57+
# Ensure at least one argument is provided
58+
if provider_min_version is None and client_min_version is None:
59+
raise ValueError(err_msg)
60+
61+
def decorator(func):
62+
@functools.wraps(func)
63+
def wrapper(*args, **kwargs):
64+
if provider_min_version:
65+
try:
66+
provider_version: str = metadata.version("apache-airflow-providers-openlineage")
67+
except metadata.PackageNotFoundError:
68+
try:
69+
from airflow.providers.openlineage import __version__ as provider_version
70+
except (ImportError, AttributeError, ModuleNotFoundError):
71+
log.info(
72+
"OpenLineage provider not found or has no version, skipping function `%s` execution",
73+
func.__name__,
74+
)
75+
return None
76+
77+
if provider_version and Version(provider_version) < Version(provider_min_version):
78+
log.info(
79+
"OpenLineage provider version `%s` is lower than required `%s`, skipping function `%s` execution",
80+
provider_version,
81+
provider_min_version,
82+
func.__name__,
83+
)
84+
return None
85+
86+
if client_min_version:
87+
try:
88+
client_version: str = metadata.version("openlineage-python")
89+
except metadata.PackageNotFoundError:
90+
log.info("OpenLineage client not found, skipping function `%s` execution", func.__name__)
91+
return None
92+
93+
if client_version and Version(client_version) < Version(client_min_version):
94+
log.info(
95+
"OpenLineage client version `%s` is lower than required `%s`, skipping function `%s` execution",
96+
client_version,
97+
client_min_version,
98+
func.__name__,
99+
)
100+
return None
101+
102+
return func(*args, **kwargs)
103+
104+
return wrapper
105+
106+
return decorator
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import logging
21+
import sys
22+
import types
23+
from importlib import metadata
24+
from unittest.mock import patch
25+
26+
import pytest
27+
28+
from airflow.providers.common.compat.openlineage.check import require_openlineage_version
29+
30+
31+
def _mock_version(package):
32+
if package == "apache-airflow-providers-openlineage":
33+
return "1.0.0"
34+
if package == "openlineage-python":
35+
return "1.0.0"
36+
raise Exception("Unexpected package")
37+
38+
39+
def test_decorator_without_arguments():
40+
with pytest.raises(TypeError) as excinfo:
41+
42+
@require_openlineage_version # used without parentheses
43+
def dummy():
44+
return "result"
45+
46+
expected_error = (
47+
"`require_openlineage_version` decorator must be used with at least one argument: "
48+
"'provider_min_version' or 'client_min_version', "
49+
'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
50+
)
51+
assert str(excinfo.value) == expected_error
52+
53+
54+
def test_decorator_without_arguments_with_parentheses():
55+
with pytest.raises(ValueError) as excinfo:
56+
57+
@require_openlineage_version()
58+
def dummy():
59+
return "result"
60+
61+
expected_error = (
62+
"`require_openlineage_version` decorator must be used with at least one argument: "
63+
"'provider_min_version' or 'client_min_version', "
64+
'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
65+
)
66+
assert str(excinfo.value) == expected_error
67+
68+
69+
def test_no_arguments_provided():
70+
with pytest.raises(ValueError) as excinfo:
71+
require_openlineage_version()
72+
expected_error = (
73+
"`require_openlineage_version` decorator must be used with at least one argument: "
74+
"'provider_min_version' or 'client_min_version', "
75+
'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
76+
)
77+
assert str(excinfo.value) == expected_error
78+
79+
80+
@pytest.mark.parametrize("provider_min_version", ("1.0.0", "0.9", "0", "0.9.9", "1.0.0.dev0", "1.0.0rc1"))
81+
@patch("importlib.metadata.version", side_effect=_mock_version)
82+
def test_provider_version_sufficient(mock_version, caplog, provider_min_version):
83+
@require_openlineage_version(provider_min_version=provider_min_version)
84+
def dummy():
85+
return "result"
86+
87+
caplog.set_level(logging.INFO)
88+
result = dummy()
89+
assert result == "result"
90+
# No log messages about skipping should be emitted.
91+
assert "skipping function" not in caplog.text
92+
93+
94+
@pytest.mark.parametrize("provider_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
95+
@patch("importlib.metadata.version", side_effect=_mock_version)
96+
def test_provider_version_insufficient(mock_version, caplog, provider_min_version):
97+
@require_openlineage_version(provider_min_version=provider_min_version)
98+
def dummy():
99+
return "result"
100+
101+
caplog.set_level(logging.INFO)
102+
result = dummy()
103+
assert result is None
104+
105+
expected_log = (
106+
f"OpenLineage provider version `1.0.0` is lower than required `{provider_min_version}`, "
107+
"skipping function `dummy` execution"
108+
)
109+
assert expected_log in caplog.text
110+
111+
112+
def test_provider_not_found(caplog):
113+
def fake_version(package):
114+
if package == "apache-airflow-providers-openlineage":
115+
raise metadata.PackageNotFoundError
116+
raise Exception("Unexpected package")
117+
118+
with patch("importlib.metadata.version", side_effect=fake_version):
119+
# Simulate that the fallback import returns a module without __version__
120+
dummy_module = types.ModuleType("airflow.providers.openlineage")
121+
with patch.dict(sys.modules, {"airflow.providers.openlineage": dummy_module}):
122+
123+
@require_openlineage_version(provider_min_version="1.0.0")
124+
def dummy():
125+
return "result"
126+
127+
caplog.set_level(logging.INFO)
128+
result = dummy()
129+
assert result is None
130+
131+
expected_log = (
132+
"OpenLineage provider not found or has no version, skipping function `dummy` execution"
133+
)
134+
assert expected_log in caplog.text
135+
136+
137+
def test_provider_fallback_import(caplog):
138+
def fake_version(package):
139+
if package == "apache-airflow-providers-openlineage":
140+
raise metadata.PackageNotFoundError
141+
raise Exception("Unexpected package")
142+
143+
with patch("importlib.metadata.version", side_effect=fake_version):
144+
# Simulate a module with a sufficient __version__
145+
dummy_module = types.ModuleType("airflow.providers.openlineage")
146+
dummy_module.__version__ = "1.2.0"
147+
with patch.dict(sys.modules, {"airflow.providers.openlineage": dummy_module}):
148+
149+
@require_openlineage_version(provider_min_version="1.0.0")
150+
def dummy():
151+
return "result"
152+
153+
caplog.set_level(logging.INFO)
154+
result = dummy()
155+
assert result == "result"
156+
assert "skipping function" not in caplog.text
157+
158+
159+
@pytest.mark.parametrize("client_min_version", ("1.0.0", "0.9", "0", "0.9.9", "1.0.0.dev0", "1.0.0rc1"))
160+
@patch("importlib.metadata.version", side_effect=_mock_version)
161+
def test_client_version_sufficient(mock_version, caplog, client_min_version):
162+
@require_openlineage_version(client_min_version=client_min_version)
163+
def dummy():
164+
return "result"
165+
166+
caplog.set_level(logging.INFO)
167+
result = dummy()
168+
assert result == "result"
169+
# No log messages about skipping should be emitted.
170+
assert "skipping function" not in caplog.text
171+
172+
173+
@pytest.mark.parametrize("client_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
174+
@patch("importlib.metadata.version", side_effect=_mock_version)
175+
def test_client_version_insufficient(mock_version, caplog, client_min_version):
176+
@require_openlineage_version(client_min_version=client_min_version)
177+
def dummy():
178+
return "result"
179+
180+
caplog.set_level(logging.INFO)
181+
result = dummy()
182+
assert result is None
183+
184+
expected_log = (
185+
f"OpenLineage client version `1.0.0` is lower than required `{client_min_version}`, "
186+
"skipping function `dummy` execution"
187+
)
188+
assert expected_log in caplog.text
189+
190+
191+
def test_client_version_not_found(caplog):
192+
def fake_version(package):
193+
if package == "openlineage-python":
194+
raise metadata.PackageNotFoundError
195+
raise Exception("Unexpected package")
196+
197+
with patch("importlib.metadata.version", side_effect=fake_version):
198+
199+
@require_openlineage_version(client_min_version="1.0.0")
200+
def dummy():
201+
return "result"
202+
203+
caplog.set_level(logging.INFO)
204+
result = dummy()
205+
assert result is None
206+
expected_log = "OpenLineage client not found, skipping function `dummy` execution"
207+
assert expected_log in caplog.text
208+
209+
210+
@pytest.mark.parametrize("client_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
211+
@patch("importlib.metadata.version", side_effect=_mock_version)
212+
def test_client_version_insufficient_when_both_passed(mock_version, caplog, client_min_version):
213+
@require_openlineage_version(provider_min_version="1.0.0", client_min_version=client_min_version)
214+
def dummy():
215+
return "result"
216+
217+
caplog.set_level(logging.INFO)
218+
result = dummy()
219+
assert result is None
220+
221+
expected_log = (
222+
f"OpenLineage client version `1.0.0` is lower than required `{client_min_version}`, "
223+
"skipping function `dummy` execution"
224+
)
225+
assert expected_log in caplog.text
226+
227+
228+
@pytest.mark.parametrize("provider_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
229+
@patch("importlib.metadata.version", side_effect=_mock_version)
230+
def test_provider_version_insufficient_when_both_passed(mock_version, caplog, provider_min_version):
231+
@require_openlineage_version(provider_min_version=provider_min_version, client_min_version="1.0.0")
232+
def dummy():
233+
return "result"
234+
235+
caplog.set_level(logging.INFO)
236+
result = dummy()
237+
assert result is None
238+
239+
expected_log = (
240+
f"OpenLineage provider version `1.0.0` is lower than required `{provider_min_version}`, "
241+
"skipping function `dummy` execution"
242+
)
243+
assert expected_log in caplog.text
244+
245+
246+
@pytest.mark.parametrize("client_min_version", ("1.0.0", "0.9", "0", "0.9.9", "1.0.0.dev0", "1.0.0rc1"))
247+
@pytest.mark.parametrize("provider_min_version", ("1.0.0", "0.9", "0", "0.9.9", "1.0.0.dev0", "1.0.0rc1"))
248+
@patch("importlib.metadata.version", side_effect=_mock_version)
249+
def test_both_versions_sufficient(mock_version, caplog, provider_min_version, client_min_version):
250+
@require_openlineage_version(
251+
provider_min_version=provider_min_version, client_min_version=client_min_version
252+
)
253+
def dummy():
254+
return "result"
255+
256+
caplog.set_level(logging.INFO)
257+
result = dummy()
258+
assert result == "result"
259+
assert "skipping function" not in caplog.text
260+
261+
262+
@pytest.mark.parametrize("client_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
263+
@pytest.mark.parametrize("provider_min_version", ("1.1.0", "1.0.1.dev0", "1.0.1rc1", "2", "1.1"))
264+
@patch("importlib.metadata.version", side_effect=_mock_version)
265+
def test_both_versions_insufficient(mock_version, caplog, provider_min_version, client_min_version):
266+
@require_openlineage_version(
267+
provider_min_version=provider_min_version, client_min_version=client_min_version
268+
)
269+
def dummy():
270+
return "result"
271+
272+
caplog.set_level(logging.INFO)
273+
result = dummy()
274+
assert result is None
275+
276+
expected_log = (
277+
f"OpenLineage provider version `1.0.0` is lower than required `{provider_min_version}`, "
278+
"skipping function `dummy` execution"
279+
)
280+
assert expected_log in caplog.text

0 commit comments

Comments
 (0)