Merge pull request #56 from engelhardtnick-at-TW/specify-java-setup-constraints

Specify java setup constraints
lauris-tw authored Jun 18, 2024
2 parents c07bff5 + 2e56847 commit 6fef747
Showing 7 changed files with 52 additions and 15 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -4,9 +4,11 @@ These jobs are using _PySpark_ to process larger volumes of data and are suppose
 
 ## Pre-requisites
 Please make sure you have the following installed and can run them
-* Python (3.11 or later), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
+* Python (3.11.x), you can use for example [pyenv](https://github.com/pyenv/pyenv#installation) to manage your python versions locally
 * [Poetry](https://python-poetry.org/docs/#installation)
 * Java (11)
+* To run pySpark, it's important that the environment variable `JAVA_HOME` is set correctly, check via `echo $JAVA_HOME`
+* [test_validate_spark_environment.py](/tests/integration/test_validate_spark_environment.py) will help you figure out if your environment will work
 
 ## Install all dependencies
 ```bash
@@ -49,7 +51,7 @@ OR
 scripts\install_choco.ps1
 scripts\install.bat
 
-# For local laptop setup ensure that Java 11 with Spark 3.5.1 is available. More details in README-LOCAL.md
+# For local laptop setup ensure that Java 11 with Spark 3.5.1 is available.
 ```
 More: https://python-poetry.org/docs/cli/#build

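The new bullets are backed by the validation test this commit adds (`tests/integration/test_validate_spark_environment.py`, shown below). For a quick check outside pytest, the same `JAVA_HOME` condition can be probed with a few lines of standard-library Python (a minimal sketch, not part of the commit):

```python
import os

# PySpark needs JAVA_HOME to locate a JVM, so mirror the check the new test performs.
java_home = os.environ.get("JAVA_HOME")
if java_home is None:
    raise SystemExit("JAVA_HOME is not set; PySpark will not be able to start a JVM.")
print(f"JAVA_HOME={java_home}")
```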
3 changes: 0 additions & 3 deletions tests/integration/__init__.py
@@ -1,3 +0,0 @@
-from pyspark.sql import SparkSession
-
-SPARK = SparkSession.builder.appName("IntegrationTests").getOrCreate()
7 changes: 7 additions & 0 deletions tests/integration/conftest.py
@@ -0,0 +1,7 @@
+import pytest
+from pyspark.sql import SparkSession
+
+
+@pytest.fixture(scope="session")
+def SPARK():
+    return SparkSession.builder.appName("IntegrationTests").getOrCreate()
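This fixture replaces the module-level singleton removed above: tests no longer trigger a Spark start-up at import time, and pytest injects the shared session wherever a test declares a `SPARK` parameter. A minimal sketch of a consuming test (illustrative only, not part of this commit):

```python
# tests/integration/test_example.py -- illustrative sketch, not in this commit
from pyspark.sql import SparkSession


def test_creates_dataframe(SPARK: SparkSession) -> None:
    # pytest matches the parameter name "SPARK" to the fixture in conftest.py
    # and injects the shared, session-scoped SparkSession.
    dataframe = SPARK.createDataFrame([("a", 1), ("b", 2)], ["letter", "number"])
    assert dataframe.count() == 2
```

Because the fixture is `scope="session"`, the SparkSession is still created only once and shared across all integration tests, matching the behaviour of the old module-level constant.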
12 changes: 6 additions & 6 deletions tests/integration/test_distance_transformer.py
@@ -3,10 +3,10 @@
 from typing import Tuple
 
 import pytest
+from pyspark.sql import SparkSession
 from pyspark.sql.types import StructField, DoubleType
 
 from data_transformations.citibike import distance_transformer
-from tests.integration import SPARK
 
 BASE_COLUMNS = [
     "tripduration",
@@ -81,8 +81,8 @@
 ]
 
 
-def test_should_maintain_all_data_it_reads() -> None:
-    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
+def test_should_maintain_all_data_it_reads(SPARK) -> None:
+    given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders(SPARK)
     given_dataframe = SPARK.read.parquet(given_ingest_folder)
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
 
@@ -97,7 +97,7 @@ def test_should_maintain_all_data_it_reads() -> None:
 
 
 @pytest.mark.skip
-def test_should_add_distance_column_with_calculated_distance() -> None:
+def test_should_add_distance_column_with_calculated_distance(SPARK) -> None:
     given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
     distance_transformer.run(SPARK, given_ingest_folder, given_transform_folder)
 
@@ -117,10 +117,10 @@ def test_should_add_distance_column_with_calculated_distance() -> None:
     assert expected_dataframe.collect() == actual_dataframe.collect()
 
 
-def __create_ingest_and_transform_folders() -> Tuple[str, str]:
+def __create_ingest_and_transform_folders(spark: SparkSession) -> Tuple[str, str]:
     base_path = tempfile.mkdtemp()
     ingest_folder = "%s%singest" % (base_path, os.path.sep)
     transform_folder = "%s%stransform" % (base_path, os.path.sep)
-    ingest_dataframe = SPARK.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
+    ingest_dataframe = spark.createDataFrame(SAMPLE_DATA, BASE_COLUMNS)
     ingest_dataframe.write.parquet(ingest_folder, mode='overwrite')
     return ingest_folder, transform_folder
3 changes: 1 addition & 2 deletions tests/integration/test_ingest.py
@@ -4,10 +4,9 @@
 from typing import Tuple, List
 
 from data_transformations.citibike import ingest
-from tests.integration import SPARK
 
 
-def test_should_sanitize_column_names() -> None:
+def test_should_sanitize_column_names(SPARK) -> None:
     given_ingest_folder, given_transform_folder = __create_ingest_and_transform_folders()
     input_csv_path = given_ingest_folder + 'input.csv'
     csv_content = [
33 changes: 33 additions & 0 deletions tests/integration/test_validate_spark_environment.py
@@ -0,0 +1,33 @@
+import os
+import re
+import subprocess
+
+import pytest
+
+
+def test_java_home_is_set():
+    java_home = os.environ.get("JAVA_HOME")
+    assert java_home is not None, "Environment variable 'JAVA_HOME' is not set but is required by pySpark to work."
+
+
+def test_java_version_is_greater_or_equal_11():
+    version_regex = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.\w+')
+
+    java_version_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode("utf-8")
+    print(f"\n`java -version` returned\n{java_version_output}")
+
+    version_number = java_version_output.splitlines()[0].split('"')[1].strip('"')
+    print(f"Assuming {version_number=} is the version to check.")
+
+    regex_match = version_regex.search(version_number)
+    if not regex_match:
+        pytest.fail(f"Couldn't parse Java version from {version_number=} using {version_regex=}.")
+    if regex_match["major"] == "1":
+        # we need to jump this hoop due to Java version naming conventions - it's fun:
+        # https://softwareengineering.stackexchange.com/questions/175075/why-is-java-version-1-x-referred-to-as-java-x
+        actual_major_version = int(regex_match["minor"])
+    else:
+        actual_major_version = int(regex_match["major"])
+    expected_major_version = 11
+    assert actual_major_version >= expected_major_version, (f"Major version {actual_major_version} is not recent "
+                                                            f"enough, we need at least version {expected_major_version}.")
3 changes: 1 addition & 2 deletions tests/integration/test_word_count.py
@@ -5,7 +5,6 @@
 import pytest
 
 from data_transformations.wordcount import word_count_transformer
-from tests.integration import SPARK
 
 
 def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
@@ -20,7 +19,7 @@ def _get_file_paths(input_file_lines: List[str]) -> Tuple[str, str]:
 
 
 @pytest.mark.skip
-def test_should_tokenize_words_and_count_them() -> None:
+def test_should_tokenize_words_and_count_them(SPARK) -> None:
     lines = [
         "In my younger and more vulnerable years my father gave me some advice that I've been "
         "turning over in my mind ever since. \"Whenever you feel like criticising any one,\""
