Skip to content

Commit c6c27ea

Browse files
ckunkitkilias
andauthored
#42 2 integrate extract validator into language container deployer (#46)
* Integrated ExtractValidator into LanguageContainerDeployer * Added file language_container_validator.py to provide backwards compatibility * Fix until manifest is deployed with standard SLCs see #48 * Fixed SQL syntax error * Fixed integration test * Fixed indention for UDF python script Co-authored-by: Torsten Kilias <[email protected]>
1 parent 1f5b920 commit c6c27ea

File tree

8 files changed

+124
-218
lines changed

8 files changed

+124
-218
lines changed

exasol/python_extension_common/connections/pyexasol_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def open_pyexasol_connection(
5353
pat=saas_token)
5454
else:
5555
raise ValueError('Incomplete parameter list. '
56-
'Please either provide the parameters [dns, db_user, db_pass] '
56+
'Please either provide the parameters [dsn, db_user, db_pass] '
5757
'for an On-Prem database or [saas_url, saas_account_id, '
5858
'saas_database_id or saas_database_name, saas_token] '
5959
'for a SaaS database.')

exasol/python_extension_common/deployment/extract_validator.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33
import pyexasol # type: ignore
44

55
from datetime import datetime, timedelta
6+
from textwrap import dedent
67
from typing import Callable, List
78
from tenacity import Retrying
89
from tenacity.wait import wait_fixed
910
from tenacity.stop import stop_after_delay
1011

11-
from exasol.python_extension_common.deployment.language_container_validator import (
12-
temp_schema
13-
)
14-
1512
MANIFEST_FILE = "exasol-manifest.json"
1613

1714

@@ -74,8 +71,7 @@ def _create_manifest_udf(self, language_alias: str, udf_name: str):
7471
Much more a later statement "CREATE SCRIPT" will fail with an error
7572
message. Hence we need to use a retry here, as well.
7673
"""
77-
self._pyexasol_conn.execute(
78-
f"""
74+
self._pyexasol_conn.execute(dedent(f"""
7975
CREATE OR REPLACE {language_alias} SET SCRIPT
8076
{udf_name}(my_path VARCHAR(256))
8177
EMITS (node INTEGER, manifest BOOL) AS
@@ -84,7 +80,7 @@ def run(ctx):
8480
ctx.emit(exa.meta.node_id, os.path.isfile(ctx.my_path))
8581
/
8682
"""
87-
)
83+
))
8884

8985
def _check_all_nodes_with_retry(self, udf_name: str, nproc: int, manifest: str, timeout: timedelta):
9086
for attempt in Retrying(
@@ -97,7 +93,7 @@ def _check_all_nodes_with_retry(self, udf_name: str, nproc: int, manifest: str,
9793
def _check_all_nodes(self, udf_name: str, nproc: int, manifest: str):
9894
result = self._pyexasol_conn.execute(
9995
f"""
100-
SELECT {udf_name}({manifest})
96+
SELECT {udf_name}('{manifest}')
10197
FROM VALUES BETWEEN 1 AND {nproc} t(i) GROUP BY i
10298
"""
10399
).fetchall()
@@ -117,8 +113,9 @@ def verify_all_nodes(self, schema: str, language_alias: str, bfs_archive_path: b
117113
still nodes pending, for which the extraction could not be verified,
118114
yet.
119115
"""
120-
manifest = f"{bfs_archive_path.as_udf_path()}/{MANIFEST_FILE}"
121-
nproc = self._pyexasol_conn.execute("SELECT nproc()").fetchone()
116+
# manifest = f"{bfs_archive_path.as_udf_path()}/{MANIFEST_FILE}"
117+
manifest = "/exaudf/exaudfclient_py3"
118+
nproc = self._pyexasol_conn.execute("SELECT nproc()").fetchone()[0]
122119
udf_name = _udf_name(schema, language_alias)
123120
start = datetime.now()
124121
try:

exasol/python_extension_common/deployment/language_container_deployer.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from textwrap import dedent
33
from typing import List, Optional, Dict
44
from pathlib import Path, PurePosixPath
5+
from datetime import timedelta
56
import logging
67
import tempfile
78
import ssl
@@ -10,9 +11,8 @@
1011
import exasol.bucketfs as bfs # type: ignore
1112
from exasol.saas.client.api_access import (get_connection_params, get_database_id) # type: ignore
1213

13-
from exasol.python_extension_common.deployment.language_container_validator import (
14-
wait_language_container, temp_schema
15-
)
14+
from exasol.python_extension_common.deployment.temp_schema import temp_schema
15+
from exasol.python_extension_common.deployment.extract_validator import ExtractValidator
1616

1717

1818
logger = logging.getLogger(__name__)
@@ -92,11 +92,21 @@ class LanguageContainerDeployer:
9292
def __init__(self,
9393
pyexasol_connection: pyexasol.ExaConnection,
9494
language_alias: str,
95-
bucketfs_path: bfs.path.PathLike) -> None:
95+
bucketfs_path: bfs.path.PathLike,
96+
extract_validator: ExtractValidator|None = None,
97+
) -> None:
9698

9799
self._bucketfs_path = bucketfs_path
98100
self._language_alias = language_alias
99101
self._pyexasol_conn = pyexasol_connection
102+
if extract_validator:
103+
self._extract_validator = extract_validator
104+
else:
105+
self._extract_validator = ExtractValidator(
106+
pyexasol_connection,
107+
timeout=timedelta(minutes=5),
108+
interval=timedelta(seconds=10),
109+
)
100110
logger.debug("Init %s", LanguageContainerDeployer.__name__)
101111

102112
def download_and_run(self, url: str,
@@ -124,6 +134,9 @@ def download_and_run(self, url: str,
124134
self.run(Path(tmp_file.name), bucket_file_path, alter_system, allow_override,
125135
wait_for_completion)
126136

137+
def _upload_path(self, bucket_file_path: str|None) -> bfs.path.PathLike:
138+
return self._bucketfs_path / bucket_file_path
139+
127140
def run(self, container_file: Optional[Path] = None,
128141
bucket_file_path: Optional[str] = None,
129142
alter_system: bool = True,
@@ -162,10 +175,12 @@ def run(self, container_file: Optional[Path] = None,
162175
self.activate_container(bucket_file_path, LanguageActivationLevel.Session,
163176
allow_override)
164177

165-
# Maybe wait until the container becomes operational.
178+
# Optionally wait until the container is extracted on all nodes of the
179+
# database cluster.
166180
if container_file and wait_for_completion:
167181
with temp_schema(self._pyexasol_conn) as schema:
168-
wait_language_container(self._pyexasol_conn, self._language_alias, schema)
182+
self._extract_validator.verify_all_nodes(
183+
schema, self._language_alias, self._upload_path(bucket_file_path))
169184

170185
if not alter_system:
171186
message = dedent(f"""
@@ -180,8 +195,7 @@ def run(self, container_file: Optional[Path] = None,
180195
""")
181196
print(message)
182197

183-
def upload_container(self, container_file: Path,
184-
bucket_file_path: Optional[str] = None) -> None:
198+
def upload_container(self, container_file: Path, bucket_file_path: Optional[str] = None) -> None:
185199
"""
186200
Upload the language container to the BucketFS.
187201
@@ -192,8 +206,7 @@ def upload_container(self, container_file: Path,
192206
raise RuntimeError(f"Container file {container_file} "
193207
f"is not a file.")
194208
with open(container_file, "br") as f:
195-
file_path = self._bucketfs_path / bucket_file_path
196-
file_path.write(f)
209+
self._upload_path(bucket_file_path).write(f)
197210
logging.debug("Container is uploaded to bucketfs")
198211

199212
def activate_container(self, bucket_file_path: str,
@@ -333,7 +346,7 @@ def create(cls,
333346
path=path_in_bucket)
334347
else:
335348
raise ValueError('Incomplete parameter list. '
336-
'Please either provide the parameters [dns, db_user, '
349+
'Please either provide the parameters [dsn, db_user, '
337350
'db_password, bucketfs_host, bucketfs_port, bucketfs_name, '
338351
'bucket, bucketfs_user, bucketfs_password] for an On-Prem '
339352
'database or [saas_url, saas_account_id, saas_database_id, '
Lines changed: 3 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -1,149 +1,4 @@
1-
from __future__ import annotations
2-
from typing import Generator
3-
from datetime import timedelta
4-
import random
5-
import string
6-
from contextlib import contextmanager
7-
from textwrap import dedent
1+
# This file is only for providing backwards compatibility.
2+
# As function temp_schema() has been moved to file temp_schema.py.
83

9-
from tenacity import retry
10-
from tenacity.wait import wait_fixed
11-
from tenacity.stop import stop_after_delay, stop_after_attempt
12-
13-
import pyexasol # type: ignore
14-
15-
_DUMMY_UDF_NAME = 'DUMMY_UDF'
16-
17-
18-
def _get_test_udf_name(schema: str | None) -> str:
19-
20-
if schema:
21-
return f'"{schema}"."{_DUMMY_UDF_NAME}"'
22-
return f'"{_DUMMY_UDF_NAME}"'
23-
24-
25-
def _create_dummy_udf(conn: pyexasol.ExaConnection, language_alias: str,
26-
schema: str | None) -> None:
27-
28-
# The dummy UDF returns the ID of the node it is running at.
29-
udf_name = _get_test_udf_name(schema)
30-
sql = dedent(f"""
31-
CREATE OR REPLACE {language_alias} SET SCRIPT {udf_name}(i DECIMAL(10, 0))
32-
RETURNS DECIMAL(10, 0) AS
33-
34-
def run(ctx):
35-
return exa.meta.node_id
36-
/
37-
""")
38-
conn.execute(sql)
39-
40-
41-
def _call_dummy_udf(conn: pyexasol.ExaConnection, schema: str | None) -> None:
42-
43-
# First, need to find out the number of nodes.
44-
sql = "SELECT NPROC();"
45-
nproc = conn.execute(sql).fetchval()
46-
47-
# This query should run the dummy udf on all nodes.
48-
udf_name = _get_test_udf_name(schema)
49-
sql = dedent(f"""
50-
SELECT {udf_name}(i) FROM VALUES BETWEEN 1 AND {nproc} t(i)
51-
GROUP BY i;
52-
""")
53-
# The expected result is a collection of distinct node IDs from 0 to nproc - 1.
54-
result = conn.execute(sql).fetchall()
55-
set_result = {row[0] for row in result}
56-
57-
assert set_result == set(range(nproc))
58-
59-
60-
def _delete_dummy_udf(conn: pyexasol.ExaConnection, schema: str | None) -> None:
61-
62-
udf_name = _get_test_udf_name(schema)
63-
sql = dedent(f"""
64-
DROP SCRIPT IF EXISTS {udf_name};
65-
""")
66-
conn.execute(sql)
67-
68-
69-
@retry(reraise=True, stop=stop_after_attempt(3))
70-
def _create_random_schema(conn: pyexasol.ExaConnection, schema_name_length: int) -> str:
71-
72-
schema = ''.join(random.choice(string.ascii_letters)
73-
for _ in range(schema_name_length))
74-
sql = f'CREATE SCHEMA "{schema}";'
75-
conn.execute(query=sql)
76-
return schema
77-
78-
79-
def _delete_schema(conn: pyexasol.ExaConnection, schema: str) -> None:
80-
81-
sql = f'DROP SCHEMA IF EXISTS "{schema}" CASCADE;'
82-
conn.execute(query=sql)
83-
84-
85-
def validate_language_container(conn: pyexasol.ExaConnection,
86-
language_alias: str,
87-
schema: str | None = None
88-
) -> None:
89-
"""
90-
Runs a test to check if a language container has been installed and is now
91-
operational. Will raise an exception if this is not the case.
92-
93-
conn - pyexasol connection. The language container must be activated either
94-
at the SYSTEM level or at the SESSION associated with this connection.
95-
language_alias - Language alias of the language container.
96-
schema - The schema to run the tests in. If not specified the current schema
97-
is assumed.
98-
"""
99-
try:
100-
_create_dummy_udf(conn, language_alias, schema)
101-
_call_dummy_udf(conn, schema)
102-
finally:
103-
_delete_dummy_udf(conn, schema)
104-
105-
106-
def wait_language_container(conn: pyexasol.ExaConnection,
107-
language_alias: str,
108-
schema: str | None = None,
109-
timeout: timedelta = timedelta(minutes=5),
110-
interval: timedelta = timedelta(seconds=5),
111-
) -> None:
112-
"""
113-
Keeps calling validate_language_container until it succeeds or the timeout expires.
114-
115-
conn - pyexasol connection. The language container must be activated either
116-
at the SYSTEM level or at the SESSION associated with this connection.
117-
language_alias - Language alias of the language container.
118-
schema - The schema to run the tests in. If not specified the current schema
119-
is assumed.
120-
timeout - Will give up after this timeout expires. The last exception thrown
121-
by the validate_language_container will be re-raised.
122-
interval - The calls to validate_language_container are spaced by this time
123-
interval.
124-
"""
125-
@retry(reraise=True, wait=wait_fixed(interval), stop=stop_after_delay(timeout))
126-
def repeat_validate_language_container():
127-
validate_language_container(conn, language_alias, schema)
128-
129-
repeat_validate_language_container()
130-
131-
132-
@contextmanager
133-
def temp_schema(conn: pyexasol.ExaConnection,
134-
schema_name_length: int = 20
135-
) -> Generator[str, None, None]:
136-
"""
137-
A context manager for running an operation in a newly created temporary schema.
138-
The schema will be deleted after the operation is competed. Note, that all objects
139-
created in this schema will be deleted with it. Returns the name of the created schema.
140-
141-
conn - pyexasol connection.
142-
schema_name_length - Number of characters in the temporary schema name.
143-
"""
144-
schema = ''
145-
try:
146-
schema = _create_random_schema(conn, schema_name_length)
147-
yield schema
148-
finally:
149-
_delete_schema(conn, schema)
4+
from exasol.python_extension_common.deployment.temp_schema import temp_schema
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import pyexasol # type: ignore
2+
import random
3+
import string
4+
from contextlib import contextmanager
5+
6+
from typing import Generator
7+
from tenacity import retry
8+
from tenacity.stop import stop_after_attempt
9+
10+
@retry(reraise=True, stop=stop_after_attempt(3))
11+
def _create_random_schema(conn: pyexasol.ExaConnection, schema_name_length: int) -> str:
12+
13+
schema = ''.join(random.choice(string.ascii_letters)
14+
for _ in range(schema_name_length))
15+
sql = f'CREATE SCHEMA "{schema}";'
16+
conn.execute(query=sql)
17+
return schema
18+
19+
20+
def _delete_schema(conn: pyexasol.ExaConnection, schema: str) -> None:
21+
sql = f'DROP SCHEMA IF EXISTS "{schema}" CASCADE;'
22+
23+
24+
@contextmanager
25+
def temp_schema(conn: pyexasol.ExaConnection,
26+
schema_name_length: int = 20
27+
) -> Generator[str, None, None]:
28+
"""
29+
A context manager for running an operation in a newly created temporary schema.
30+
The schema will be deleted after the operation is competed. Note, that all objects
31+
created in this schema will be deleted with it. Returns the name of the created schema.
32+
33+
conn - pyexasol connection.
34+
schema_name_length - Number of characters in the temporary schema name.
35+
"""
36+
schema = ''
37+
try:
38+
schema = _create_random_schema(conn, schema_name_length)
39+
yield schema
40+
finally:
41+
_delete_schema(conn, schema)

test/unit/deployment/test_extract_validator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def execute(self, *args, **kwargs):
4848
return self
4949

5050
def fetchone(self):
51-
return next(self.values)
51+
return [ next(self.values) ]
5252

5353
def fetchall(self):
5454
return [ v for v in self.values ]

0 commit comments

Comments
 (0)