diff --git a/README.md b/README.md
index e6884bd..5cb73d6 100644
--- a/README.md
+++ b/README.md
@@ -4,9 +4,11 @@ These jobs are using _PySpark_ to process larger volumes of data and are suppose
 
 ## Pre-requisites
 Please make sure you have the following installed and can run them
-* Python (3.11 or later), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
+* Python (3.11.x), you can use, for example, [pyenv](https://github.com/pyenv/pyenv#installation) to manage your Python versions locally
 * [Poetry](https://python-poetry.org/docs/#installation)
 * Java (11)
+  * To run PySpark, it is important that the environment variable `JAVA_HOME` is set correctly; check via `echo $JAVA_HOME`
+  * [test_validate_spark_environment.py](/tests/integration/test_validate_spark_environment.py) will help you figure out whether your environment will work
 
 ## Install all dependencies
 ```bash
@@ -49,7 +51,7 @@ OR
 scripts\install_choco.ps1
 scripts\install.bat
 
-# For local laptop setup ensure that Java 11 with Spark 3.5.1 is available. More details in README-LOCAL.md
+# For a local laptop setup, ensure that Java 11 with Spark 3.5.1 is available.
 ```
 
 More: https://python-poetry.org/docs/cli/#build
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
index 09e672b..e69de29 100644
--- a/tests/integration/__init__.py
+++ b/tests/integration/__init__.py
@@ -1,3 +0,0 @@
-from pyspark.sql import SparkSession
-
-SPARK = SparkSession.builder.appName("IntegrationTests").getOrCreate()
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..c2018ae
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,7 @@
+import pytest
+from pyspark.sql import SparkSession
+
+
+@pytest.fixture(scope="session")
+def SPARK():
+    return SparkSession.builder.appName("IntegrationTests").getOrCreate()
diff --git a/tests/integration/test_distance_transformer.py b/tests/integration/test_distance_transformer.py
index 2105fa5..e812618 100644
--- a/tests/integration/test_distance_transformer.py
+++ b/tests/integration/test_distance_transformer.py
@@ -3,10 +3,10 @@
 from typing import Tuple
 
 import pytest
+from pyspark.sql import SparkSession
 from pyspark.sql.types import StructField, DoubleType
 
 from data_transformations.citibike import distance_transformer
-from tests.integration import SPARK
 
 BASE_COLUMNS = [
     "tripduration",
@@ -81,8 +81,8 @@
 ]
 
 
-def test_should_maintain_all_data_it_reads() -> None:
-    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
+def test_should_maintain_all_data_it_reads(SPARK) -> None:
+    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders(SPARK)
     given_dataframe = SPARK.read.parquet(given_ingest_folder)
 
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -97,7 +97,7 @@
 
 
 @pytest.mark.skip
-def test_should_add_distance_column_with_calculated_distance() -> None:
-    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
+def test_should_add_distance_column_with_calculated_distance(SPARK) -> None:
+    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders(SPARK)
 
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
@@ -117,10 +117,10 @@
     assert expected_dataframe.collect() == actual_dataframe.collect()
 
 
-def __create_ingest_and_transform_folders() -> Tuple[str, str]:
+def __create_ingest_and_transform_folders(spark: SparkSession) -> Tuple[str, str]:
     base_path = tempfile.mkdtemp()
     ingest_folder = "%s%singest" % (base_path, os.path.sep)
     transform_folder = "%s%stransform" % (base_path, os.path.sep)
-    ingest_dataframe = SPARK.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
+    ingest_dataframe = spark.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
     ingest_dataframe.write.parquet(ingest_folder, mode='overwrite')
     return ingest_folder, transform_folder
diff --git a/tests/integration/test_ingest.py b/tests/integration/test_ingest.py
index 03df25d..366be44 100644
--- a/tests/integration/test_ingest.py
+++ b/tests/integration/test_ingest.py
@@ -4,10 +4,9 @@
 from typing import Tuple, List
 
 from data_transformations.citibike import ingest
-from tests.integration import SPARK
 
 
-def test_should_sanitize_column_names() -> None:
+def test_should_sanitize_column_names(SPARK) -> None:
     given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
     input_csv_path = given_ingest_folder + 'input.csv'
     csv_content = [
diff --git a/tests/integration/test_validate_spark_environment.py b/tests/integration/test_validate_spark_environment.py
new file mode 100644
index 0000000..edb1754
--- /dev/null
+++ b/tests/integration/test_validate_spark_environment.py
@@ -0,0 +1,33 @@
+import os
+import re
+import subprocess
+
+import pytest
+
+
+def test_java_home_is_set():
+    java_home = os.environ.get("JAVA_HOME")
+    assert java_home is not None, "Environment variable 'JAVA_HOME' is not set but is required by PySpark to work."
+
+
+def test_java_version_is_greater_or_equal_11():
+    version_regex = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.\w+')
+
+    java_version_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode("utf-8")
+    print(f"\n`java -version` returned\n{java_version_output}")
+
+    version_number = java_version_output.splitlines()[0].split('"')[1].strip('"')
+    print(f"Assuming {version_number=} is the version to check.")
+
+    regex_match = version_regex.search(version_number)
+    if not regex_match:
+        pytest.fail(f"Couldn't parse Java version from {version_number=} using {version_regex=}.")
+    if regex_match["major"] == "1":
+        # we need to jump through this hoop due to Java version naming conventions - it's fun:
+        # https://softwareengineering.stackexchange.com/questions/175075/why-is-java-version-1-x-referred-to-as-java-x
+        actual_major_version = int(regex_match["minor"])
+    else:
+        actual_major_version = int(regex_match["major"])
+    expected_major_version = 11
+    assert actual_major_version >= expected_major_version, (f"Major version {actual_major_version} is not recent "
+                                                            f"enough, we need at least version {expected_major_version}.")
diff --git a/tests/integration/test_word_count.py b/tests/integration/test_word_count.py
index 49c04c1..8ff15a0 100644
--- a/tests/integration/test_word_count.py
+++ b/tests/integration/test_word_count.py
@@ -5,7 +5,6 @@
 import pytest
 
 from data_transformations.wordcount import word_count_transformer
-from tests.integration import SPARK
 
 
 def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
@@ -20,7 +19,7 @@
 
 
 @pytest.mark.skip
-def test_should_tokenize_words_and_count_them() -> None:
+def test_should_tokenize_words_and_count_them(SPARK) -> None:
     lines = [
         "In my younger and more vulnerable years my father gave me some advice that I've been "
         "turning over in my mind ever since. \"Whenever you feel like criticising any one,\""
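For context on the pattern this patch adopts: deleting the module-level `SPARK` singleton from `tests/integration/__init__.py` and defining it as a session-scoped fixture in `conftest.py` means the JVM is no longer started as an import-time side effect; instead, pytest creates the session once and injects it into any test that declares a parameter named `SPARK`. A minimal sketch of the mechanism follows; the `test_spark_fixture_injection` test is hypothetical, for illustration only, and is not part of the patch:

```python
# Sketch of tests/integration/conftest.py as introduced by this patch:
# a session-scoped fixture, created once and shared across the test run.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def SPARK():
    return SparkSession.builder.appName("IntegrationTests").getOrCreate()


# Hypothetical test, illustration only: pytest matches the parameter name
# SPARK against the fixture above and passes in the SparkSession it returns.
def test_spark_fixture_injection(SPARK):
    df = SPARK.createDataFrame([("a", 1)], ["letter", "number"])
    assert df.count() == 1
```

Because the fixture is session-scoped, all integration tests still share a single SparkSession, preserving the behaviour of the old singleton while keeping `import tests.integration` side-effect free.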