Skip to content

Commit ccba19f

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents a6e9e66 + b3546b3 commit ccba19f

File tree

5 files changed

+333
-3
lines changed

5 files changed

+333
-3
lines changed

doc/changes/unreleased.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
# Unreleased
2+
3+
## Features
4+
5+
* #42: Optionally wait until SLC is deployed to all nodes in the database cluster
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import re
2+
import exasol.bucketfs as bfs # type: ignore
3+
import pyexasol # type: ignore
4+
5+
from datetime import datetime, timedelta
6+
from typing import Callable, List
7+
from tenacity import Retrying
8+
from tenacity.wait import wait_fixed
9+
from tenacity.stop import stop_after_delay
10+
11+
from exasol.python_extension_common.deployment.language_container_validator import (
12+
temp_schema
13+
)
14+
15+
MANIFEST_FILE = "exasol-manifest.json"
16+
17+
18+
def _udf_name(schema: str | None, name: str) -> str:
    """Return a quoted, timestamped UDF script name, optionally qualified
    with the given schema."""
    seconds = datetime.now().timestamp()
    quoted = '"{}_manifest_{:.0f}"'.format(name, seconds)
    if not schema:
        return quoted
    return '"{}".{}'.format(schema, quoted)
22+
23+
24+
class ExtractException(Exception):
    """
    Raised when the expected file MANIFEST_FILE could not be detected on
    all nodes of the database cluster.
    """
29+
30+
31+
class ExtractValidator:
    """
    Validates that a given archive (e.g. tgz) has been extracted on
    all nodes of an Exasol database cluster by checking if MANIFEST_FILE
    exists.

    The specified timeout applies to the max. total duration of both phases:
    P1) creating the UDF script and P2) checking if the UDF in the SLC can
    be executed and finds the extracted MANIFEST_FILE on each node.

    If a callback is specified then this function will be called multiple
    times while detecting the MANIFEST_FILE on the nodes.

    The callback is called with two arguments: the total number of nodes in
    the database cluster as returned by nproc() and a list of the IDs of the
    pending nodes on which the MANIFEST_FILE could not be found, yet.
    """
    def __init__(self,
                 pyexasol_connection: pyexasol.ExaConnection,
                 timeout: timedelta,
                 interval: timedelta = timedelta(seconds=10),
                 callback: Callable[[int, List[int]], None] | None = None,
                 ) -> None:
        self._pyexasol_conn = pyexasol_connection
        self._timeout = timeout
        self._interval = interval
        # Substitute a no-op so the callback can be invoked unconditionally.
        self._callback = callback if callback else lambda x, y: None

    def _create_manifest_udf_with_retry(self, language_alias: str, udf_name: str) -> None:
        """Create the manifest-checking UDF, retrying with the configured
        interval until self._timeout expires; the last error is re-raised."""
        for attempt in Retrying(
                wait=wait_fixed(self._interval),
                stop=stop_after_delay(self._timeout),
                reraise=True):
            with attempt:
                self._create_manifest_udf(language_alias, udf_name)

    def _create_manifest_udf(self, language_alias: str, udf_name: str) -> None:
        """
        Create a UDF script that emits, for each node, whether the file
        given as argument exists on that node.

        The SQL statements "ALTER SESSION SET SCRIPT_LANGUAGES" and "ALTER
        SYSTEM SET SCRIPT_LANGUAGES" do not check whether the specified
        BucketFS path exists and has permissions allowing it to be accessed
        by UDFs. Instead, only a later statement "CREATE SCRIPT" will fail
        with an error message. Hence we need to use a retry here, as well.
        """
        self._pyexasol_conn.execute(
            f"""
            CREATE OR REPLACE {language_alias} SET SCRIPT
            {udf_name}(my_path VARCHAR(256))
            EMITS (node INTEGER, manifest BOOL) AS
            import os
            def run(ctx):
                ctx.emit(exa.meta.node_id, os.path.isfile(ctx.my_path))
            /
            """
        )

    def _check_all_nodes_with_retry(self, udf_name: str, nproc: int, manifest: str, timeout: timedelta) -> None:
        """Poll the UDF until all nodes report the manifest or the given
        timeout (the remainder of the total budget) expires."""
        for attempt in Retrying(
                wait=wait_fixed(self._interval),
                stop=stop_after_delay(timeout),
                reraise=True):
            with attempt:
                self._check_all_nodes(udf_name, nproc, manifest)

    def _check_all_nodes(self, udf_name: str, nproc: int, manifest: str) -> None:
        """
        Call the UDF once per node, report progress to the callback, and
        raise an ExtractException if the manifest is still missing on any
        node.
        """
        result = self._pyexasol_conn.execute(
            # fix: the manifest path is a SQL string literal and must be
            # enclosed in single quotes.
            f"""
            SELECT {udf_name}('{manifest}')
            FROM VALUES BETWEEN 1 AND {nproc} t(i) GROUP BY i
            """
        ).fetchall()
        # Each row is (node_id, manifest_found).
        pending = [node for node, found in result if not found]
        self._callback(nproc, pending)
        if pending:
            raise ExtractException(
                f"{len(pending)} of {nproc} nodes are still pending."
                f" IDs: {pending}")

    def verify_all_nodes(self, schema: str, language_alias: str, bfs_archive_path: bfs.path.PathLike) -> None:
        """
        Verify if the given bfs_archive_path was extracted on all nodes
        successfully.

        :param schema: schema in which to create the temporary UDF script.
        :param language_alias: language alias of the SLC under test.
        :param bfs_archive_path: BucketFS path of the uploaded archive.
        :raises ExtractException: if after the configured timeout there are
            still nodes pending, for which the extraction could not be
            verified, yet.
        """
        manifest = f"{bfs_archive_path.as_udf_path()}/{MANIFEST_FILE}"
        # NOTE(review): pyexasol's fetchone() usually returns a whole row
        # (a list), not a scalar - confirm whether this needs fetchone()[0].
        nproc = self._pyexasol_conn.execute("SELECT nproc()").fetchone()
        udf_name = _udf_name(schema, language_alias)
        start = datetime.now()
        try:
            self._create_manifest_udf_with_retry(language_alias, udf_name)
            # Phase 1 may have consumed part of the total budget; only the
            # remainder is available for checking the nodes.
            elapsed = datetime.now() - start
            self._check_all_nodes_with_retry(udf_name, nproc, manifest, self._timeout - elapsed)
        finally:
            # Remove the temporary UDF script even in case of failure.
            self._pyexasol_conn.execute(f"DROP SCRIPT IF EXISTS {udf_name}")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "exasol-python-extension-common"
3-
version = "0.3.1"
3+
version = "0.4.0"
44
description = "A collection of common utilities for Exasol extensions."
55
packages = [ {include = "exasol"}, ]
66
authors = ["Mikhail Beck <[email protected]>"]
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import contextlib
2+
import logging
3+
import pytest
4+
import re
5+
import exasol.bucketfs as bfs # type: ignore
6+
from pyexasol import ExaConnection
7+
8+
from typing import Any, Dict, List
9+
from unittest.mock import Mock, call, patch
10+
from datetime import timedelta
11+
12+
from exasol.python_extension_common.deployment.extract_validator import (
13+
ExtractValidator,
14+
ExtractException,
15+
_udf_name,
16+
)
17+
from tenacity import RetryError
18+
19+
LOG = logging.getLogger(__name__)
20+
21+
22+
def bucket_path(path: str):
    """Create a BucketPath for *path* inside a mounted test bucket."""
    api = bfs.MountedBucket("svc", "bkt")
    return bfs.path.BucketPath(path, bucket_api=api)
25+
26+
27+
@pytest.fixture
def archive_bucket_path():
    """BucketFS path of a sample archive inside the test bucket."""
    return bucket_path("/folder/a.tgz")
30+
31+
32+
class ConnectionMock:
    """
    Simulates a pyexasol connection.

    The constructor takes a dict mapping regular expressions to result
    rows. execute() matches the first line of the SQL statement against
    these expressions (case-insensitively, first match wins) and makes the
    associated rows available via fetchone() / fetchall(). A callable
    value is invoked to produce the rows, which allows simulating errors
    or varying results per call.
    """
    def __init__(self, spec: Dict[str, Any]):
        self.spec = spec
        self.values = iter(())

    def _get_values(self, first_line: str):
        for pattern, rows in self.spec.items():
            if re.match(pattern, first_line, re.IGNORECASE):
                if callable(rows):
                    return rows()
                return rows
        # No pattern matched; warn so unexpected statements show up in logs.
        LOG.warning(f"ConnectionMock.execute() called with '{first_line[:40]}...'")
        return ()

    def execute(self, *args, **kwargs):
        if args:
            statement = args[0]
        else:
            statement = kwargs["query"]
        first_line = statement.strip().splitlines()[0]
        self.values = iter(self._get_values(first_line))
        return self

    def fetchone(self):
        return next(self.values)

    def fetchall(self):
        return list(self.values)
55+
56+
57+
class Simulator:
    """
    Bundles an ExtractValidator wired to a ConnectionMock together with
    mocks for the UDF results, the CREATE SCRIPT statement, and the
    progress callback.
    """
    def __init__(self, nodes: int, udf_results: List[List[any]],
                 create_script=()):
        self.create_script = create_script
        self.nodes = nodes
        # Each call to the simulated UDF yields the next entry of udf_results.
        self.udf = Mock(side_effect=udf_results)
        self.callback = Mock(side_effect=self._callback)

    def _callback(self, n, pending):
        # Log progress for easier debugging of the retry simulation.
        LOG.debug(f"{len(pending)} of {n} nodes pending: {pending}")

    @property
    def testee(self):
        """A fresh ExtractValidator under test, rebuilt on each access."""
        mocked_connection = ConnectionMock({
            r"CREATE .* SCRIPT": self.create_script,
            r"(CREATE|DROP) ": (),
            r"SELECT nproc\(\)": [ self.nodes ],
            r'SELECT .*_manifest_': self.udf,
        })
        return ExtractValidator(
            pyexasol_connection=Mock(execute=mocked_connection.execute),
            timeout=timedelta(seconds=10),
            interval=timedelta(seconds=1),
            callback=self.callback,
        )
82+
83+
84+
@contextlib.contextmanager
def mock_tenacity_wait(*wait_lists: List[int|float], max: int = 1000):
    """
    This context mocks internals of library ``tenacity`` in order to
    simulate waiting for timeouts in ``tenacity.Retrying()``. All specified
    durations are interpreted as numbers of seconds which can be floats.

    A test case may provide multiple lists of waiting periods to cover
    multiple consecutive retry phases in the class under test, see
    ``ExtractValidator`` for example.

        mock_tenacity_wait([1, 2], [3, 4], max=100)

    After all wait lists are exhausted, i.e. the mock simulated waiting for
    the specified periods, the mock will constantly simulate
    ``time.monotonic()`` to return the specified max time, typically making
    tenacity detect a timeout.

    Internally the mock needs to prefix each list of waiting periods with
    two additional entries [0, 0] which are used by ``tenacity.Retrying()``
    to initialize its start times in ``BaseRetrying.begin()`` and
    ``RetryCallState.__init__()``, see
    https://github.com/jd/tenacity/blob/main/tenacity/__init__.py.
    """
    def clock_readings():
        for waits in wait_lists:
            yield from [0, 0] + waits

    readings = clock_readings()

    def fake_monotonic():
        # Fall back to `max` once all simulated periods are consumed.
        return next(readings, max)

    with patch("tenacity.time.sleep"):
        with patch("tenacity.time.monotonic", side_effect=fake_monotonic):
            yield
122+
123+
124+
@pytest.mark.parametrize(
    "schema, expected",
    [
        (None, r'"alias_manifest_[0-9]+"'),
        ("schema", r'"schema"\."alias_manifest_[0-9]+"'),
    ])
def test_udf_name(schema, expected):
    """The UDF name is quoted, timestamped, and schema-qualified on request."""
    actual = _udf_name(schema, "alias")
    assert re.match(expected, actual)
132+
133+
134+
def test_create_script_failure(archive_bucket_path):
    """If creating the UDF script keeps failing until the timeout, the
    last error is re-raised."""
    create_script = Mock(side_effect=Exception("failed to create UDF script"))
    sim = Simulator(nodes=4, udf_results=[], create_script=create_script)
    with pytest.raises(Exception, match="failed to create UDF script"):
        with mock_tenacity_wait([1]):
            # fix: verify_all_nodes(schema, language_alias, ...) - the
            # schema comes first, then the language alias.
            sim.testee.verify_all_nodes("schema", "alias", archive_bucket_path)
140+
141+
142+
def test_failure(archive_bucket_path):
    """A node staying pending until the timeout raises an ExtractException
    naming the pending node."""
    sim = Simulator(
        nodes=4,
        udf_results=[
            [[1, False]],
            [[1, False]],
            [[1, False]],
        ])
    with pytest.raises(ExtractException) as ex:
        with mock_tenacity_wait([1], [2, 4]):
            # fix: schema first, then language alias
            sim.testee.verify_all_nodes("schema", "alias", archive_bucket_path)
    assert "1 of 4 nodes are still pending. IDs: [1]" == str(ex.value)
154+
155+
156+
def test_success(archive_bucket_path):
    """All nodes eventually report the manifest; the callback observes the
    shrinking list of pending nodes."""
    sim = Simulator(
        nodes=4,
        udf_results=[
            [[1, False], [2, False]],
            [[1, True ], [2, False]],
            [[1, True ], [2, True ]],
        ])
    with mock_tenacity_wait([1], [2, 4]):
        # fix: schema first, then language alias
        sim.testee.verify_all_nodes("schema", "alias", archive_bucket_path)
    assert sim.callback.call_args_list == [
        call(4, [1, 2]),
        call(4, [2]),
        call(4, []),
    ]
171+
172+
173+
def test_reduced_timeout(archive_bucket_path):
    """
    This test simulates a retry being required for creating the UDF
    script, hence already eating up part of the total timeout.

    The test then verifies the remaining part of the total timeout for
    actual calls to the UDF being too short for successfully detecting the
    manifest on all nodes.
    """
    create_script = Mock(side_effect=[Exception("failure"), ()])
    udf_results = [
        [[1, False], [2, False]],
        [[1, True ], [2, False]],
        [[1, True ], [2, True ]],
    ]
    sim = Simulator(
        nodes=4,
        udf_results=udf_results,
        create_script=create_script,
    )
    with pytest.raises(ExtractException) as ex:
        with mock_tenacity_wait([1], [2, 4]):
            # fix: schema first, then language alias
            sim.testee.verify_all_nodes("schema", "alias", archive_bucket_path)
    assert "1 of 4 nodes are still pending. IDs: [2]" == str(ex.value)

version.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
# Do not edit this file manually!
66
# If you need to change the version, do so in the project.toml, e.g. by using `poetry version X.Y.Z`.
77
MAJOR = 0
8-
MINOR = 3
9-
PATCH = 1
8+
MINOR = 4
9+
PATCH = 0
1010
VERSION = f"{MAJOR}.{MINOR}.{PATCH}"

0 commit comments

Comments
 (0)