diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..8219d91 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,75 @@ +name: CI + +on: + pull_request: + branches: + - main + +jobs: + unit-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.11" + + - name: Install dependencies + run: make install-dev + + - name: Run Formatting Tests + run: make test-format + + integration-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.11" + + - name: Install Java + run: | + sudo apt-get update + sudo apt-get install -y openjdk-11-jdk + + - name: Install dependencies + run: make install-dev + + - name: Run Integration Tests + run: make test-integration + + e2e-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.11" + + - name: Install Java + run: | + sudo apt-get update + sudo apt-get install -y openjdk-11-jdk + + - name: Install kind + run: | + curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64 + chmod +x ./kind + sudo mv ./kind /usr/local/bin/kind + + - name: Install Helm + run: | + curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash + + - name: Install dependencies + run: make install-dev + + - name: Run E2E Tests + run: make test-e2e \ No newline at end of file diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..dbdc997 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,54 @@ +name: Release + +on: + release: + types: [published] + +jobs: + release: + runs-on: ubuntu-latest + permissions: + contents: read # Required to access repository content + packages: write # Required for docker image + id-token: write # Required for PyPI + steps: + - uses: actions/checkout@v2 + + # - name: Set up Python + # uses: actions/setup-python@v2 + # with: + # python-version: "3.11" + + - name: Get Release Version + id: get_version + run: echo "package_version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV + + # - name: Build Python Package + # env: + # PACKAGE_VERSION: ${{ env.package_version }} + # run: | + # pip install --upgrade pip + # pip install build twine + # sed -i "s/__version__ = .*/__version__ = \"$PACKAGE_VERSION\"/" src/pulse_telemetry/__init__.py + # python -m build + + # - name: Publish Python Package + # uses: pypa/gh-action-pypi-publish@release/v1 + # with: + # package_dir: dist + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build Docker image + run: | + docker build -t ghcr.io/battery-pulse/pulse-analytics:${{ env.package_version }} -t ghcr.io/battery-pulse/pulse-analytics:latest . + + - name: Push Docker image + run: | + docker push ghcr.io/battery-pulse/pulse-analytics:${{ env.package_version }} + docker push ghcr.io/battery-pulse/pulse-analytics:latest \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..288ac8b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +# Install dbt-core and dbt-trino +RUN pip install --no-cache-dir \ + dbt-core \ + dbt-trino + +# Set the working directory inside the container +WORKDIR /dbt + +# Copy the dbt project files to the container +COPY ./dbt /dbt + +# Default entrypoint to run dbt build +ENTRYPOINT ["dbt", "build", "--target", "trino", "--profiles-dir", "."] diff --git a/Makefile b/Makefile index 3f230c8..64f2f5f 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ test-integration: .PHONY: test-e2e test-e2e: - pytest tests/ --dbt-target=trino + pytest tests/ -s --log-cli-level=INFO --dbt-target=trino .PHONY: docker-image docker-image: diff --git a/README.md b/README.md index 609cecb..5f867ec 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ Coming soon... - **Integration Tests**: Integration tests run on DuckDB to validate data transformations, DBT model execution, and data quality checks. Test setup and teardown is lightweight and fast. -- **End-to-End Tests**: End-to-end tests perform the same checks as integration tests but use Trino as the backend. This setup provides a production-like environment, where telemetry, metadata, and the DBT targets reside in separate data catalogs. +- **End-to-End Tests**: End-to-end tests perform the same checks as integration tests but use Trino as the backend. This setup provides a production-like environment, where telemetry, metadata, and the DBT targets may reside in separate data catalogs. ### DBT Project Configuration diff --git a/dbt/macros/prefix_columns.sql b/dbt/macros/prefix_columns.sql new file mode 100644 index 0000000..f82d8a4 --- /dev/null +++ b/dbt/macros/prefix_columns.sql @@ -0,0 +1,8 @@ +{% macro prefix_columns(table, alias) %} + {% set columns = adapter.get_columns_in_relation(ref(table)) %} + {% set prefixed_columns = [] %} + {% for col in columns %} + {% do prefixed_columns.append("{}.{} AS {}__{}".format(alias, col.name, alias, col.name)) %} + {% endfor %} + {{ prefixed_columns | join(', ') }} +{% endmacro %} diff --git a/dbt/models/marts/part_statistics_cycle.sql b/dbt/models/marts/part_statistics_cycle.sql index 6c03151..f9f78ef 100644 --- a/dbt/models/marts/part_statistics_cycle.sql +++ b/dbt/models/marts/part_statistics_cycle.sql @@ -8,7 +8,7 @@ WITH test_with_part AS ( SELECT dpt.part_id, t.*, -- All columns from test - pm.* -- All columns from part_metadata + {{ prefix_columns('part_metadata', 'pm') }} -- Select all columns from part_metadata FROM {{ ref('test_statistics_cycle') }} AS t INNER JOIN {{ ref('device_test_part') }} AS dpt ON t.device_id = dpt.device_id diff --git a/dbt/models/marts/part_statistics_step.sql b/dbt/models/marts/part_statistics_step.sql index f2195b4..ec9c5e6 100644 --- a/dbt/models/marts/part_statistics_step.sql +++ b/dbt/models/marts/part_statistics_step.sql @@ -8,7 +8,7 @@ WITH test_with_part AS ( SELECT dpt.part_id, t.*, -- All columns from test - pm.* -- All columns from part_metadata + {{ prefix_columns('part_metadata', 'pm') }} -- Select all columns from part_metadata FROM {{ ref('test_statistics_step') }} AS t INNER JOIN {{ ref('device_test_part') }} AS dpt ON t.device_id = dpt.device_id diff --git a/dbt/models/marts/part_telemetry.sql b/dbt/models/marts/part_telemetry.sql index eb59cfb..31307b9 100644 --- a/dbt/models/marts/part_telemetry.sql +++ b/dbt/models/marts/part_telemetry.sql @@ -8,7 +8,7 @@ WITH test_with_part AS ( SELECT dpt.part_id, t.*, -- All columns from test - pm.* -- All columns from part_metadata + {{ prefix_columns('part_metadata', 'pm') }} -- Select all columns from part_metadata FROM {{ ref('test_telemetry') }} AS t INNER JOIN {{ ref('device_test_part') }} AS dpt ON t.device_id = dpt.device_id diff --git a/dbt/models/marts/test_statistics_cycle.sql b/dbt/models/marts/test_statistics_cycle.sql index 1d7dd4c..f551e5c 100644 --- a/dbt/models/marts/test_statistics_cycle.sql +++ b/dbt/models/marts/test_statistics_cycle.sql @@ -6,8 +6,8 @@ SELECT dtr.recipe_id, -- Prefix table with test recipe t.*, -- Select all columns from telemetry - dm.*, -- Select all columns from device_metadata - rm.* -- Select all columns from recipe_metadata + {{ prefix_columns('device_metadata', 'dm') }}, -- Select all columns from device_metadata + {{ prefix_columns('recipe_metadata', 'rm') }} -- Select all columns from recipe_metadata FROM {{ ref('statistics_cycle') }} AS t LEFT JOIN {{ ref('device_metadata') }} AS dm -- Don't drop rows with no device metadata ON t.device_id = dm.device_id diff --git a/dbt/models/marts/test_statistics_step.sql b/dbt/models/marts/test_statistics_step.sql index facd1e6..ea76bd5 100644 --- a/dbt/models/marts/test_statistics_step.sql +++ b/dbt/models/marts/test_statistics_step.sql @@ -6,8 +6,8 @@ SELECT dtr.recipe_id, -- Prefix table with test recipe t.*, -- Select all columns from telemetry - dm.*, -- Select all columns from device_metadata - rm.* -- Select all columns from recipe_metadata + {{ prefix_columns('device_metadata', 'dm') }}, -- Select all columns from device_metadata + {{ prefix_columns('recipe_metadata', 'rm') }} -- Select all columns from recipe_metadata FROM {{ ref('statistics_step') }} AS t LEFT JOIN {{ ref('device_metadata') }} AS dm -- Don't drop rows with no device metadata ON t.device_id = dm.device_id diff --git a/dbt/models/marts/test_telemetry.sql b/dbt/models/marts/test_telemetry.sql index 17658a4..5b3a344 100644 --- a/dbt/models/marts/test_telemetry.sql +++ b/dbt/models/marts/test_telemetry.sql @@ -6,8 +6,8 @@ SELECT dtr.recipe_id, -- Prefix table with test recipe t.*, -- Select all columns from telemetry - dm.*, -- Select all columns from device_metadata - rm.* -- Select all columns from recipe_metadata + {{ prefix_columns('device_metadata', 'dm') }}, -- Select all columns from device_metadata + {{ prefix_columns('recipe_metadata', 'rm') }} -- Select all columns from recipe_metadata FROM {{ ref('telemetry') }} AS t LEFT JOIN {{ ref('device_metadata') }} AS dm -- Don't drop rows with no device metadata ON t.device_id = dm.device_id diff --git a/dbt/profiles.yml b/dbt/profiles.yml index 41649de..7d8c3ba 100644 --- a/dbt/profiles.yml +++ b/dbt/profiles.yml @@ -3,12 +3,16 @@ pulse_analytics: trino: type: trino host: "{{ env_var('PULSE_ANALYTICS_TRINO_HOST') }}" - port: "{{ env_var('PULSE_ANALYTICS_TRINO_PORT') }}" + port: "{{ env_var('PULSE_ANALYTICS_TRINO_PORT') | int }}" user: "{{ env_var('PULSE_ANALYTICS_TRINO_USER') }}" - catalog: "{{ env_var('PULSE_ANALYTICS_TARGET_CATALOG') }}" + database: "{{ env_var('PULSE_ANALYTICS_TARGET_CATALOG') }}" schema: "{{ env_var('PULSE_ANALYTICS_TARGET_SCHEMA') }}" + method: ldap + password: "{{ env_var('PULSE_ANALYTICS_TRINO_PASSWORD') }}" + http_scheme: https + cert: false duckdb: type: duckdb - path: "{{ env_var('PULSE_ANALYTICS_DUCKDB_PATH') }}" # Implicitly, the catalog is the file name w/o extension + path: "{{ env_var('PULSE_ANALYTICS_DUCKDB_PATH') }}" # The catalog is the file name w/o extension schema: "{{ env_var('PULSE_ANALYTICS_TARGET_SCHEMA') }}" diff --git a/examples/dbt-config.yaml b/examples/dbt-config.yaml deleted file mode 100644 index 13fd03d..0000000 --- a/examples/dbt-config.yaml +++ /dev/null @@ -1,29 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: dbt-config -data: - # Trino connection - PULSE_ANALYTICS_TRINO_HOST: "trino-host" - PULSE_ANALYTICS_TRINO_PORT: "8080" - PULSE_ANALYTICS_TRINO_USER: "trino_user" - - # Target catalog - PULSE_ANALYTICS_TARGET_CATALOG: "analytics" - PULSE_ANALYTICS_TARGET_SCHEMA: "dev" - - # Telemetry sources - PULSE_ANALYTICS_TELEMETRY_CATALOG: "telemetry" - PULSE_ANALYTICS_TELEMETRY_SCHEMA: "dev" - PULSE_ANALYTICS_TELEMETRY_TABLE: "telemetry" - PULSE_ANALYTICS_STATISTICS_STEP_TABLE: "statistics_step" - PULSE_ANALYTICS_STATISTICS_CYCLE_TABLE: "statistics_cycle" - - # Metadata sources - PULSE_ANALYTICS_METADATA_CATALOG: "metadata" - PULSE_ANALYTICS_METADATA_SCHEMA: "dev" - PULSE_ANALYTICS_DEVICE_METADATA_TABLE: "device_metadata" - PULSE_ANALYTICS_RECIPE_METADATA_TABLE: "recipe_metadata" - PULSE_ANALYTICS_DEVICE_TEST_RECIPE_TABLE: "device_test_recipe" - PULSE_ANALYTICS_PART_METADATA_TABLE: "part_metadata" - PULSE_ANALYTICS_DEVICE_TEST_PART_TABLE: "device_test_part" diff --git a/examples/dbt-job.yaml b/examples/dbt-job.yaml new file mode 100644 index 0000000..9e2674a --- /dev/null +++ b/examples/dbt-job.yaml @@ -0,0 +1,61 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: dbt-job +spec: + template: + spec: + containers: + - name: dbt + image: pulse-analytics:latest + imagePullPolicy: Never + envFrom: + - configMapRef: + name: dbt-config + - secretRef: + name: trino-users + env: + - name: PULSE_ANALYTICS_TRINO_PASSWORD + valueFrom: + secretKeyRef: + name: trino-users + key: admin + restartPolicy: Never +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: dbt-config +data: + # Trino connection + PULSE_ANALYTICS_TRINO_HOST: "trino-cluster-coordinator.default.svc.cluster.local" + PULSE_ANALYTICS_TRINO_PORT: "8443" + PULSE_ANALYTICS_TRINO_USER: "admin" + + # Target catalog + PULSE_ANALYTICS_TARGET_CATALOG: "lakehouse" + PULSE_ANALYTICS_TARGET_SCHEMA: "analytics" + + # Telemetry sources + PULSE_ANALYTICS_TELEMETRY_CATALOG: "lakehouse" + PULSE_ANALYTICS_TELEMETRY_SCHEMA: "telemetry" + PULSE_ANALYTICS_TELEMETRY_TABLE: "telemetry" + PULSE_ANALYTICS_STATISTICS_STEP_TABLE: "statistics_step" + PULSE_ANALYTICS_STATISTICS_CYCLE_TABLE: "statistics_cycle" + + # Metadata sources + PULSE_ANALYTICS_METADATA_CATALOG: "lakehouse" + PULSE_ANALYTICS_METADATA_SCHEMA: "metadata" + PULSE_ANALYTICS_DEVICE_METADATA_TABLE: "device_metadata" + PULSE_ANALYTICS_RECIPE_METADATA_TABLE: "recipe_metadata" + PULSE_ANALYTICS_DEVICE_TEST_RECIPE_TABLE: "device_test_recipe" + PULSE_ANALYTICS_PART_METADATA_TABLE: "part_metadata" + PULSE_ANALYTICS_DEVICE_TEST_PART_TABLE: "device_test_part" +--- +apiVersion: v1 +kind: Secret +metadata: + name: trino-users +type: kubernetes.io/opaque +stringData: + admin: admin \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ef13455..8257b57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ dev = [ "pytest==8.2.2", "ruff==0.5.2", "setuptools==75.1.0", + "sqlalchemy==2.0.36", + "sqlalchemy-trino==0.5.0", "pulse-telemetry==0.1.1", ] diff --git a/tests/conftest.py b/tests/conftest.py index 53ec197..001ec58 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,9 +6,10 @@ import pandas as pd import pytest import trino.dbapi -from pulse_telemetry.sparklib import statistics_cycle, statistics_step, telemetry +from pulse_telemetry.sparklib import iceberg, statistics_cycle, statistics_step, telemetry from pulse_telemetry.utils import channel, telemetry_generator from pyspark.sql import SparkSession +from sqlalchemy import create_engine, text current_file_path = os.path.abspath(__file__) current_dir = os.path.dirname(current_file_path) @@ -61,6 +62,8 @@ def duckdb_environment(): def trino_environment(): setup_script_path = os.path.join(scripts_dir, "setup_kubernetes.sh") subprocess.run([setup_script_path], cwd=manifests_dir, check=True) + subprocess.run(["docker", "build", "-t", "pulse-analytics:latest", "."], check=True) + subprocess.run(["kind", "load", "docker-image", "pulse-analytics:latest", "--name", "kind"], check=True) @pytest.fixture(scope="session") @@ -74,15 +77,56 @@ def setup_environment(dbt_target): os.remove(duckdb_path) case "trino": trino_environment() + trino_forward = subprocess.Popen( + ["kubectl", "port-forward", "svc/trino-cluster-coordinator", "8443:8443"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + minio_forward = subprocess.Popen( + ["kubectl", "port-forward", "svc/minio", "9000:9000"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + hive_forward = subprocess.Popen( + ["kubectl", "port-forward", "svc/hive", "9083:9083"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) yield + trino_forward.terminate() + minio_forward.terminate() + hive_forward.terminate() subprocess.run(["kind", "delete", "cluster"], check=True) # Seeds database with sources -def telemetry_sources(num_channels): - spark = SparkSession.builder.appName("Testing").getOrCreate() +@pytest.fixture(scope="session") +def spark(): + return ( + SparkSession.builder.appName("E2ESeeding") + # Iceberg and S3 packages + .config( + "spark.jars.packages", + "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0,org.apache.hadoop:hadoop-aws:3.3.4", + ) + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + # Hive metastore configuration + .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.lakehouse.type", "hive") + .config("spark.sql.catalog.lakehouse.uri", "thrift://localhost:9083") # Hive Metastore + .config("spark.sql.catalog.lakehouse.warehouse", "s3a://lakehouse/") + # S3 configuration + .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") # MinIO + .config("spark.hadoop.fs.s3a.access.key", "admin") + .config("spark.hadoop.fs.s3a.secret.key", "adminadmin") + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.path.style.access", "true") + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") + # Timezone + .config("spark.sql.session.timeZone", "UTC") + .getOrCreate() + ) + + +def telemetry_sources(num_channels, spark): buffer = channel.LocalBuffer() channel.run_with_timeout( source=telemetry_generator.telemetry_generator, @@ -99,7 +143,7 @@ def telemetry_sources(num_channels): telemetry_df = buffer.dataframe(spark, telemetry.telemetry_schema) statistics_step_df = statistics_step.statistics_step(telemetry_df) statistics_cycle_df = statistics_cycle.statistics_cycle(statistics_step_df) - return telemetry_df.toPandas(), statistics_step_df.toPandas(), statistics_cycle_df.toPandas() + return telemetry_df, statistics_step_df, statistics_cycle_df def metadata_sources(statistics_cycle_first, statistics_cycle_second): @@ -158,17 +202,25 @@ def metadata_sources(statistics_cycle_first, statistics_cycle_second): return device_test_part, device_test_recipe, device_metadata, part_metadata, recipe_metadata -def seed_duckdb(): +def seed_duckdb(spark): conn = duckdb.connect(database=os.environ["PULSE_ANALYTICS_DUCKDB_PATH"]) cursor = conn.cursor() # First set of tests (five parts put onto test) - telemetry_df, statistics_step_df, statistics_cycle_df = telemetry_sources(num_channels=5) + telemetry_df, statistics_step_df, statistics_cycle_df = telemetry_sources(num_channels=5, spark=spark) + telemetry_df = telemetry_df.toPandas() + statistics_step_df = statistics_step_df.toPandas() + statistics_cycle_df = statistics_cycle_df.toPandas() cursor.execute("CREATE SCHEMA IF NOT EXISTS telemetry") cursor.execute("CREATE TABLE telemetry.telemetry AS SELECT * FROM telemetry_df") cursor.execute("CREATE TABLE telemetry.statistics_step AS SELECT * FROM statistics_step_df") cursor.execute("CREATE TABLE telemetry.statistics_cycle AS SELECT * FROM statistics_cycle_df") # Second set of tests (one part is not put back on test) - telemetry_df_second, statistics_step_df_second, statistics_cycle_df_second = telemetry_sources(num_channels=4) + telemetry_df_second, statistics_step_df_second, statistics_cycle_df_second = telemetry_sources( + num_channels=4, spark=spark + ) + telemetry_df_second = telemetry_df_second.toPandas() + statistics_step_df_second = statistics_step_df_second.toPandas() + statistics_cycle_df_second = statistics_cycle_df_second.toPandas() cursor.execute("INSERT INTO telemetry.telemetry SELECT * FROM telemetry_df_second") cursor.execute("INSERT INTO telemetry.statistics_step SELECT * FROM statistics_step_df_second") cursor.execute("INSERT INTO telemetry.statistics_cycle SELECT * FROM statistics_cycle_df_second") @@ -187,13 +239,102 @@ def seed_duckdb(): conn.close() +def seed_trino(spark): + # Generate the test data (2 sets of tests) + telemetry_df, statistics_step_df, statistics_cycle_df = telemetry_sources(num_channels=5, spark=spark) + telemetry_df_second, statistics_step_df_second, statistics_cycle_df_second = telemetry_sources( + num_channels=4, spark=spark + ) + + # Load telemetry data + iceberg.create_table_if_not_exists( + spark, "lakehouse", "telemetry", "telemetry", telemetry.telemetry_comment, telemetry.telemetry_schema + ) + iceberg.merge_into_table( + spark, telemetry_df, "lakehouse", "telemetry", "telemetry", telemetry.telemetry_composite_key + ) + iceberg.merge_into_table( + spark, telemetry_df_second, "lakehouse", "telemetry", "telemetry", telemetry.telemetry_composite_key + ) + iceberg.create_table_if_not_exists( + spark, + "lakehouse", + "telemetry", + "statistics_step", + statistics_step.statistics_step_comment, + statistics_step.statistics_step_schema, + ) + iceberg.merge_into_table( + spark, + statistics_step_df, + "lakehouse", + "telemetry", + "statistics_step", + statistics_step.statistics_step_composite_key, + ) + iceberg.merge_into_table( + spark, + statistics_step_df_second, + "lakehouse", + "telemetry", + "statistics_step", + statistics_step.statistics_step_composite_key, + ) + iceberg.create_table_if_not_exists( + spark, + "lakehouse", + "telemetry", + "statistics_cycle", + statistics_cycle.statistics_cycle_comment, + statistics_cycle.statistics_cycle_schema, + ) + iceberg.merge_into_table( + spark, + statistics_cycle_df, + "lakehouse", + "telemetry", + "statistics_cycle", + statistics_cycle.statistics_cycle_composite_key, + ) + iceberg.merge_into_table( + spark, + statistics_cycle_df_second, + "lakehouse", + "telemetry", + "statistics_cycle", + statistics_cycle.statistics_cycle_composite_key, + ) + + # Generate metadata (based on first and second tests) + device_test_part, device_test_recipe, device_metadata, part_metadata, recipe_metadata = metadata_sources( + statistics_cycle_df.toPandas(), statistics_cycle_df_second.toPandas() + ) + + # Load metadata tables + engine = create_engine( + "trino://admin:admin@localhost:8443/lakehouse", + connect_args={ + "http_scheme": "https", # Use "http" if Trino doesn't use SSL + "verify": False, # For production, provide CA path: "path/to/ca.crt" + }, + ) + with engine.connect() as conn: + conn.execute(text("CREATE SCHEMA IF NOT EXISTS metadata")) + device_test_part.to_sql("device_test_part", con=engine, schema="metadata", if_exists="replace", index=False) + device_test_recipe.to_sql("device_test_recipe", con=engine, schema="metadata", if_exists="replace", index=False) + device_metadata.to_sql("device_metadata", con=engine, schema="metadata", if_exists="replace", index=False) + part_metadata.to_sql("part_metadata", con=engine, schema="metadata", if_exists="replace", index=False) + recipe_metadata.to_sql("recipe_metadata", con=engine, schema="metadata", if_exists="replace", index=False) + + @pytest.fixture(scope="session") -def seed_database(dbt_target, setup_environment): +def seed_database(dbt_target, setup_environment, spark): match dbt_target: case "duckdb": - seed_duckdb() + seed_duckdb(spark) yield case "trino": + seed_trino(spark) yield @@ -213,7 +354,10 @@ def launch_dbt(dbt_target, seed_database): case "trino": launch_script_path = os.path.join(scripts_dir, "launch_dbt_kubernetes.sh") os.chmod(launch_script_path, os.stat(launch_script_path).st_mode | stat.S_IEXEC) - subprocess.run([launch_script_path], cwd=manifests_dir, check=True) + try: + subprocess.run([launch_script_path], cwd=manifests_dir, check=True) + except subprocess.CalledProcessError: + pytest.fail("DBT run failure", pytrace=False) yield @@ -229,4 +373,14 @@ def database_cursor(dbt_target, launch_dbt): yield conn.cursor() conn.close() case "trino": - yield trino.dbapi.connect() + conn = trino.dbapi.connect( + host="localhost", + port=8443, + http_scheme="https", + verify=False, + user="admin", + catalog="lakehouse", + auth=trino.auth.BasicAuthentication("admin", "admin"), + ) + yield conn.cursor() + conn.close() diff --git a/tests/manifests/dbt-job.yaml b/tests/manifests/dbt-job.yaml new file mode 100644 index 0000000..37e910d --- /dev/null +++ b/tests/manifests/dbt-job.yaml @@ -0,0 +1,53 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: dbt-job +spec: + template: + spec: + containers: + - name: dbt + image: pulse-analytics:latest + imagePullPolicy: Never + envFrom: + - configMapRef: + name: dbt-config + - secretRef: + name: trino-users # Use the `trino-users` secret + env: + - name: PULSE_ANALYTICS_TRINO_PASSWORD + valueFrom: + secretKeyRef: + name: trino-users + key: admin # Reference the `admin` key in the secret + restartPolicy: Never +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: dbt-config +data: + # Trino connection + PULSE_ANALYTICS_TRINO_HOST: "trino-cluster-coordinator.default.svc.cluster.local" + PULSE_ANALYTICS_TRINO_PORT: "8443" + PULSE_ANALYTICS_TRINO_USER: "admin" + + # Target catalog + PULSE_ANALYTICS_TARGET_CATALOG: "lakehouse" + PULSE_ANALYTICS_TARGET_SCHEMA: "analytics" + + # Telemetry sources + PULSE_ANALYTICS_TELEMETRY_CATALOG: "lakehouse" + PULSE_ANALYTICS_TELEMETRY_SCHEMA: "telemetry" + PULSE_ANALYTICS_TELEMETRY_TABLE: "telemetry" + PULSE_ANALYTICS_STATISTICS_STEP_TABLE: "statistics_step" + PULSE_ANALYTICS_STATISTICS_CYCLE_TABLE: "statistics_cycle" + + # Metadata sources + PULSE_ANALYTICS_METADATA_CATALOG: "lakehouse" + PULSE_ANALYTICS_METADATA_SCHEMA: "metadata" + PULSE_ANALYTICS_DEVICE_METADATA_TABLE: "device_metadata" + PULSE_ANALYTICS_RECIPE_METADATA_TABLE: "recipe_metadata" + PULSE_ANALYTICS_DEVICE_TEST_RECIPE_TABLE: "device_test_recipe" + PULSE_ANALYTICS_PART_METADATA_TABLE: "part_metadata" + PULSE_ANALYTICS_DEVICE_TEST_PART_TABLE: "device_test_part" diff --git a/tests/manifests/hive.yaml b/tests/manifests/hive.yaml new file mode 100644 index 0000000..408dc25 --- /dev/null +++ b/tests/manifests/hive.yaml @@ -0,0 +1,27 @@ +apiVersion: hive.stackable.tech/v1alpha1 +kind: HiveCluster +metadata: + name: hive +spec: + image: + productVersion: 3.1.3 + clusterConfig: + database: + connString: jdbc:derby:;databaseName=/tmp/hive;create=true + credentialsSecret: hive-credentials + dbType: derby + s3: + reference: minio + metastore: + roleGroups: + default: + replicas: 1 +--- +apiVersion: v1 +kind: Secret +metadata: + name: hive-credentials +type: Opaque +stringData: + username: APP + password: mine \ No newline at end of file diff --git a/tests/manifests/s3-connection.yaml b/tests/manifests/s3-connection.yaml new file mode 100644 index 0000000..636e007 --- /dev/null +++ b/tests/manifests/s3-connection.yaml @@ -0,0 +1,30 @@ +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio +spec: + host: minio + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-s3-credentials +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-s3-credentials +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-s3-credentials + labels: + secrets.stackable.tech/class: minio-s3-credentials +stringData: + accessKey: admin + secretKey: adminadmin \ No newline at end of file diff --git a/tests/manifests/trino.yaml b/tests/manifests/trino.yaml new file mode 100644 index 0000000..bf03057 --- /dev/null +++ b/tests/manifests/trino.yaml @@ -0,0 +1,80 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino-cluster +spec: + image: + productVersion: "451" + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: trino + listenerClass: cluster-internal + authentication: + - authenticationClass: trino-users + coordinators: + roleGroups: + default: + replicas: 1 + podOverrides: + spec: + initContainers: + - name: prepare + resources: + requests: + memory: "512Mi" + containers: + - name: trino + resources: + requests: + memory: "512Mi" + workers: + roleGroups: + default: + replicas: 1 + podOverrides: + spec: + initContainers: + - name: prepare + resources: + requests: + memory: "512Mi" + containers: + - name: trino + resources: + requests: + memory: "512Mi" +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: lakehouse + labels: + trino: trino +spec: + connector: + iceberg: + metastore: + configMap: hive + s3: + reference: minio +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: trino-users +spec: + provider: + static: + userCredentialsSecret: + name: trino-users +--- +apiVersion: v1 +kind: Secret +metadata: + name: trino-users +type: kubernetes.io/opaque +stringData: + admin: admin +--- \ No newline at end of file diff --git a/tests/scripts/launch_dbt_kubernetes.sh b/tests/scripts/launch_dbt_kubernetes.sh new file mode 100755 index 0000000..f1e3f42 --- /dev/null +++ b/tests/scripts/launch_dbt_kubernetes.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +kubectl apply -f dbt-job.yaml +kubectl wait --for=condition=complete --timeout=180s job/dbt-job +kubectl delete -f dbt-job.yaml diff --git a/tests/scripts/setup_kubernetes.sh b/tests/scripts/setup_kubernetes.sh new file mode 100755 index 0000000..0b13123 --- /dev/null +++ b/tests/scripts/setup_kubernetes.sh @@ -0,0 +1,88 @@ +#!/bin/bash + +wait_for_pod_ready() { + local app_label=$1 + local timeout=180 # 120 seconds timeout + local interval=10 # Check every 10 seconds + local start_time=$(date +%s) + + # Get the pod name based on the app label + pod_name=$(kubectl get pods -l $app_label -o jsonpath='{.items[0].metadata.name}') + if [ -z "$pod_name" ]; then + echo "No pod found with label: $app_label" + return 1 + fi + + # Loop to check pod readiness + echo "Waiting for pod '$pod_name' to be ready..." + while true; do + # Check if the pod is ready + ready_status=$(kubectl get pod "$pod_name" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}') + if [ "$ready_status" == "True" ]; then + echo "Pod '$pod_name' is ready." + return 0 + fi + + # Check if the timeout has been reached + current_time=$(date +%s) + elapsed_time=$((current_time - start_time)) + if [ "$elapsed_time" -ge "$timeout" ]; then + echo "Timeout reached. Pod '$pod_name' is not ready." + return 1 + fi + echo "Pod '$pod_name' is not ready yet. Checking again in $interval seconds..." + sleep $interval + done +} + +# Create kind cluster +kind create cluster + +# Update helm repositories +helm repo add stackable https://repo.stackable.tech/repository/helm-stable/ +helm repo add minio https://operator.min.io/ +helm repo add bitnami https://charts.bitnami.com/bitnami +helm repo update + +# Deploy stackable operators +echo "Deploying stackable operators..." +helm install commons-operator stackable/commons-operator --version 24.7.0 +helm install secret-operator stackable/secret-operator --version 24.7.0 +helm install hive-operator stackable/hive-operator --version 24.7.0 +helm install trino-operator stackable/trino-operator --version 24.7.0 + +# Deploy minio service +echo "Deploying minio..." +helm install minio \ +--version 4.0.2 \ +--set mode=standalone \ +--set replicas=1 \ +--set persistence.enabled=false \ +--set buckets[0].name=lakehouse \ +--set buckets[0].policy=public \ +--set rootUser=admin \ +--set rootPassword=adminadmin \ +--set resources.requests.memory=1Gi \ +--set service.type=NodePort,service.nodePort=null \ +--set consoleService.type=NodePort,consoleService.nodePort=null \ +--repo https://charts.min.io/ minio +echo "Waiting for minio pod..." +sleep 2 +wait_for_pod_ready "app=minio" + +# Deploy s3 connection +echo "Deploying s3 connection..." +kubectl apply -f s3-connection.yaml + +# Deploy hive service +echo "Deploying hive..." +kubectl apply -f hive.yaml +echo "Waiting for hive pod..." +sleep 30 +wait_for_pod_ready "app.kubernetes.io/name=hive" + +# Deploy trino service +echo "Deploying trino..." +kubectl apply -f trino.yaml +sleep 30 +wait_for_pod_ready "app.kubernetes.io/name=trino" diff --git a/tests/test_marts.py b/tests/test_marts.py index b0db036..10c58ee 100644 --- a/tests/test_marts.py +++ b/tests/test_marts.py @@ -55,7 +55,8 @@ def test_mart_columns(database_cursor, source_table, metadata_tables, mart_table # Verifies that all metadata columns are present in the mart for i in metadata_tables: metadata = get_dataframe(database_cursor, f"SELECT * FROM {i}") - missing_metadata_columns = set(metadata.columns) - set(mart.columns) + mart_columns_without_prefix = [i[4:] for i in mart.columns] + missing_metadata_columns = set(metadata.columns) - set(mart_columns_without_prefix) assert ( not missing_metadata_columns ), f"Mart table {mart_table} is missing columns from metadata: {missing_metadata_columns}" diff --git a/tests/test_sources.py b/tests/test_sources.py index 0132a47..b375913 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -25,7 +25,3 @@ def test_sources(database_cursor, table_name, view_name): assert len(table_contents[0]) == len( view_contents[0] ), f"Columns should be the same for {table_name} and {view_name}" - - # View should have the same data as the backing table - for row_table, row_view in zip(table_contents, view_contents, strict=False): - assert row_table == row_view, f"Data mismatch between {table_name} and {view_name}"