From 99509628e369594695ca5f4c8569beca65e2c6c0 Mon Sep 17 00:00:00 2001
From: Nick Engelhardt
Date: Thu, 13 Jun 2024 22:15:11 +0200
Subject: [PATCH 1/4] Emphasize the importance of JAVA_HOME

---
 README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 326c88b..f3d1078 100644
--- a/README.md
+++ b/README.md
@@ -4,9 +4,10 @@ These jobs are using _PySpark_ to process larger volumes of data and are suppose
 
 ## Pre-requisites
 Please make sure you have the following installed and can run them
-* Python (3.11 or later), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
+* Python (3.11.x); you can use, for example, [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
 * [Poetry](https://python-poetry.org/docs/#installation)
 * Java (11)
+  * To run PySpark, it's important that the environment variable `JAVA_HOME` is set correctly; check via `echo $JAVA_HOME`
 
 ## Install all dependencies
 ```bash
@@ -49,7 +50,7 @@ OR
 scripts\install_choco.ps1
 scripts\install.bat
 
-# For local laptop setup ensure that Java 11 with Spark 3.5.1 is available. More details in README-LOCAL.md
+# For local laptop setup, ensure that Java 11 with Spark 3.5.1 is available.
 ```
 
 More: https://python-poetry.org/docs/cli/#build

From c0a46dd429d027d924838a5f768ee2511b1fa45e Mon Sep 17 00:00:00 2001
From: Nick Engelhardt
Date: Thu, 13 Jun 2024 22:16:02 +0200
Subject: [PATCH 2/4] Extract SparkSession setup to conftest

This way we won't have ALL tests failing just because a SparkSession
could not be set up.
---
 tests/integration/__init__.py                  |  3 ---
 tests/integration/conftest.py                  |  7 +++++++
 tests/integration/test_distance_transformer.py | 14 +++++++-------
 tests/integration/test_ingest.py               |  3 +--
 tests/integration/test_word_count.py           |  3 +--
 5 files changed, 16 insertions(+), 14 deletions(-)
 create mode 100644 tests/integration/conftest.py

diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
index 09e672b..e69de29 100644
--- a/tests/integration/__init__.py
+++ b/tests/integration/__init__.py
@@ -1,3 +0,0 @@
-from pyspark.sql import SparkSession
-
-SPARK = SparkSession.builder.appName("IntegrationTests").getOrCreate()
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..c2018ae
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,7 @@
+import pytest
+from pyspark.sql import SparkSession
+
+
+@pytest.fixture(scope="session")
+def SPARK():
+    return SparkSession.builder.appName("IntegrationTests").getOrCreate()
diff --git a/tests/integration/test_distance_transformer.py b/tests/integration/test_distance_transformer.py
index 2105fa5..e812618 100644
--- a/tests/integration/test_distance_transformer.py
+++ b/tests/integration/test_distance_transformer.py
@@ -3,10 +3,10 @@
 from typing import Tuple
 
 import pytest
+from pyspark.sql import SparkSession
 from pyspark.sql.types import StructField, DoubleType
 
 from data_transformations.citibike import distance_transformer
-from tests.integration import SPARK
 
 BASE_COLUMNS = [
     "tripduration",
@@ -81,8 +81,8 @@
 ]
 
 
-def test_should_maintain_all_data_it_reads() -> None:
-    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
+def test_should_maintain_all_data_it_reads(SPARK) -> None:
+    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders(SPARK)
     given_dataframe = SPARK.read.parquet(given_ingest_folder)
 
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -97,7 +97,7 @@ def test_should_maintain_all_data_it_reads() -> None:
 
 
 @pytest.mark.skip
-def test_should_add_distance_column_with_calculated_distance() -> None:
-    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
+def test_should_add_distance_column_with_calculated_distance(SPARK) -> None:
+    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders(SPARK)
 
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -117,10 +117,10 @@ def test_should_add_distance_column_with_calculated_distance() -> None:
     assert expected_dataframe.collect() == actual_dataframe.collect()
 
 
-def __create_ingest_and_transform_folders() -> Tuple[str, str]:
+def __create_ingest_and_transform_folders(spark: SparkSession) -> Tuple[str, str]:
     base_path = tempfile.mkdtemp()
     ingest_folder = "%s%singest" % (base_path, os.path.sep)
     transform_folder = "%s%stransform" % (base_path, os.path.sep)
-    ingest_dataframe = SPARK.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
+    ingest_dataframe = spark.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
     ingest_dataframe.write.parquet(ingest_folder, mode='overwrite')
     return ingest_folder, transform_folder
diff --git a/tests/integration/test_ingest.py b/tests/integration/test_ingest.py
index 03df25d..366be44 100644
--- a/tests/integration/test_ingest.py
+++ b/tests/integration/test_ingest.py
@@ -4,10 +4,9 @@
 from typing import Tuple, List
 
 from data_transformations.citibike import ingest
-from tests.integration import SPARK
 
 
-def test_should_sanitize_column_names() -> None:
+def test_should_sanitize_column_names(SPARK) -> None:
     given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
     input_csv_path = given_ingest_folder + 'input.csv'
     csv_content = [
diff --git a/tests/integration/test_word_count.py b/tests/integration/test_word_count.py
index 49c04c1..8ff15a0 100644
--- a/tests/integration/test_word_count.py
+++ b/tests/integration/test_word_count.py
@@ -5,7 +5,6 @@
 import pytest
 
 from data_transformations.wordcount import word_count_transformer
-from tests.integration import SPARK
 
 
 def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
@@ -20,7 +19,7 @@ def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
 
 
 @pytest.mark.skip
-def test_should_tokenize_words_and_count_them() -> None:
+def test_should_tokenize_words_and_count_them(SPARK) -> None:
     lines = [
         "In my younger and more vulnerable years my father gave me some advice that I've been "
         "turning over in my mind ever since. \"Whenever you feel like criticising any one,\""

From 06b3046fb0929a9c9f5002134e6f69ca2823666f Mon Sep 17 00:00:00 2001
From: Nick Engelhardt
Date: Thu, 13 Jun 2024 22:22:21 +0200
Subject: [PATCH 3/4] Add tests to validate spark environment prereqs

---
 README.md                              |  1 +
 .../test_validate_spark_environment.py | 23 +++++++++++++++++++++++
 2 files changed, 24 insertions(+)
 create mode 100644 tests/integration/test_validate_spark_environment.py

diff --git a/README.md b/README.md
index f3d1078..d0fbac5 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,7 @@ Please make sure you have the following installed and can run them
 * [Poetry](https://python-poetry.org/docs/#installation)
 * Java (11)
   * To run PySpark, it's important that the environment variable `JAVA_HOME` is set correctly; check via `echo $JAVA_HOME`
+  * [test_validate_spark_environment.py](/tests/integration/test_validate_spark_environment.py) will help you verify that your environment is set up correctly
 
 ## Install all dependencies
 ```bash
diff --git a/tests/integration/test_validate_spark_environment.py b/tests/integration/test_validate_spark_environment.py
new file mode 100644
index 0000000..2292355
--- /dev/null
+++ b/tests/integration/test_validate_spark_environment.py
@@ -0,0 +1,23 @@
+import os
+import subprocess
+
+
+def test_java_home_is_set():
+    java_home = os.environ.get("JAVA_HOME")
+    assert java_home is not None, "Environment variable 'JAVA_HOME' is not set but is required by pySpark to work."
+
+
+def test_java_version_is_greater_or_equal_11():
+    java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode("utf-8")
+    print(f"\n`java -version` returned\n{java_version}")
+    version_number = java_version.splitlines()[0].split('"')[1].strip('"')
+    print(f"Assuming {version_number=} is the version to check.")
+    version_identifier_at_idx_0, version_identifier_at_idx_1, _ = version_number.split('.')
+    if eval(version_identifier_at_idx_0) == 1:
+        print(
+            f"Found {version_identifier_at_idx_0=}, this is not the major version. Using {version_identifier_at_idx_1=} to check version requirements.")
+        actual_major_version = eval(version_identifier_at_idx_1)
+    else:
+        actual_major_version = eval(version_identifier_at_idx_0)
+    expected_major_version = 11
+    assert actual_major_version >= expected_major_version, f"Major version {actual_major_version} is not recent enough, we need at least version {expected_major_version}."
From 2e5684759d869bb80ada067867cc963ddf9032cb Mon Sep 17 00:00:00 2001
From: Nick Engelhardt
Date: Tue, 18 Jun 2024 08:58:03 +0200
Subject: [PATCH 4/4] Parse version string using a regex

---
 .../test_validate_spark_environment.py | 30 ++++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/tests/integration/test_validate_spark_environment.py b/tests/integration/test_validate_spark_environment.py
index 2292355..edb1754 100644
--- a/tests/integration/test_validate_spark_environment.py
+++ b/tests/integration/test_validate_spark_environment.py
@@ -1,6 +1,9 @@
 import os
+import re
 import subprocess
 
+import pytest
+
 
 def test_java_home_is_set():
     java_home = os.environ.get("JAVA_HOME")
@@ -8,16 +11,23 @@ def test_java_home_is_set():
 
 
 def test_java_version_is_greater_or_equal_11():
-    java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode("utf-8")
-    print(f"\n`java -version` returned\n{java_version}")
-    version_number = java_version.splitlines()[0].split('"')[1].strip('"')
+    version_regex = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.\w+')
+
+    java_version_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode("utf-8")
+    print(f"\n`java -version` returned\n{java_version_output}")
+
+    version_number = java_version_output.splitlines()[0].split('"')[1].strip('"')
     print(f"Assuming {version_number=} is the version to check.")
-    version_identifier_at_idx_0, version_identifier_at_idx_1, _ = version_number.split('.')
-    if eval(version_identifier_at_idx_0) == 1:
-        print(
-            f"Found {version_identifier_at_idx_0=}, this is not the major version. Using {version_identifier_at_idx_1=} to check version requirements.")
-        actual_major_version = eval(version_identifier_at_idx_1)
+
+    regex_match = version_regex.search(version_number)
+    if not regex_match:
+        pytest.fail(f"Couldn't parse Java version from {version_number=} using {version_regex=}.")
+    if regex_match["major"] == "1":
+        # we need to jump through this hoop due to Java version naming conventions - it's fun:
+        # https://softwareengineering.stackexchange.com/questions/175075/why-is-java-version-1-x-referred-to-as-java-x
+        actual_major_version = int(regex_match["minor"])
     else:
-        actual_major_version = eval(version_identifier_at_idx_0)
+        actual_major_version = int(regex_match["major"])
     expected_major_version = 11
-    assert actual_major_version >= expected_major_version, f"Major version {actual_major_version} is not recent enough, we need at least version {expected_major_version}."
+    assert actual_major_version >= expected_major_version, (f"Major version {actual_major_version} is not recent "
+                                                            f"enough, we need at least version {expected_major_version}.")
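A note on how the conftest extraction in PATCH 2/4 plays out in practice: pytest discovers fixtures defined in `conftest.py` automatically, so any test under `tests/integration/` can request the session-scoped `SPARK` fixture simply by naming it as a parameter, and a broken Spark setup now fails individual tests instead of breaking the whole package at import time (which is what the module-level `getOrCreate()` in `__init__.py` did). A minimal sketch of the pattern; the consumer test and its data are illustrative, not taken from the patches:

```python
# tests/integration/conftest.py -- as introduced in PATCH 2/4
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def SPARK():
    # Built lazily, once per test session, and shared by every test that requests it.
    return SparkSession.builder.appName("IntegrationTests").getOrCreate()


# tests/integration/test_example.py -- illustrative consumer
def test_can_round_trip_a_dataframe(SPARK):
    dataframe = SPARK.createDataFrame([("a", 1), ("b", 2)], ["letter", "number"])
    assert dataframe.count() == 2
```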