(RFC) Integration Tests on Apache Spark and Spark EMR #992

Open
normanj-bitquill opened this issue Dec 13, 2024 · 10 comments
Labels
Lang:PPL Pipe Processing Language support testing test related feature

Comments

@normanj-bitquill
Contributor

Problem Overview

Work is underway to create the files needed for starting Docker clusters that can be used for integration testing. There will initially be two clusters, one for testing with Apache Spark and one for testing with Spark EMR.

The integration tests should be able to switch between the two clusters (and any future clusters) without any changes.

The integration tests will run from either SBT or a standalone script. This allows setting up CI steps for running the integration tests, as well as running them locally.

Need an execution model for the integration tests that will work with both Apache Spark and Spark EMR.

Proposed Solution

Structure the tests as a set of queries. Each query will have an expected query plan, and expected results (if the query succeeds). These tests can be made available to the Spark container in a bound directory. There will be another bound directory for holding the test results and query plans.

The bound directories are:

  • /tests/queries - Each query to run is in a separate file
  • /tests/actual_query_plans - The output of explain for each query
  • /tests/actual_results - The results of running each query

A Spark application is created that runs the integration tests. The application will look in the directory /tests/queries. For each query file that it finds, it will:

  • Run the query and save the results to /tests/actual_results
  • Call explain on the query and save the results to /tests/actual_query_plans
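The runner's loop could look like the minimal sketch below. The Spark calls sit behind a hypothetical `QueryEngine` interface so the sketch runs without Spark; in the real application, `run` would wrap `spark.sql(query)` and `explain` would capture the plan output. The `.sql` extension and the `<query>.txt` output file names are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical abstraction over the two Spark calls the runner makes. */
interface QueryEngine {
    /** Runs the query and returns its result rows rendered as text. */
    String run(String query);

    /** Returns the query plan text, e.g. what explain prints. */
    String explain(String query);
}

final class IntegTestRunner {
    private static final String SUFFIX = ".sql"; // assumed query file extension

    private final Path testsRoot; // e.g. /tests inside the Spark container
    private final QueryEngine engine;

    IntegTestRunner(Path testsRoot, QueryEngine engine) {
        this.testsRoot = testsRoot;
        this.engine = engine;
    }

    /** Runs every query file and returns the names of the files processed. */
    List<String> runAll() {
        Path results = testsRoot.resolve("actual_results");
        Path plans = testsRoot.resolve("actual_query_plans");
        List<String> processed = new ArrayList<>();
        try {
            Files.createDirectories(results);
            Files.createDirectories(plans);
            List<Path> queryFiles;
            try (Stream<Path> files = Files.list(testsRoot.resolve("queries"))) {
                queryFiles = files
                        .filter(p -> p.getFileName().toString().endsWith(SUFFIX))
                        .sorted()
                        .collect(Collectors.toList());
            }
            for (Path file : queryFiles) {
                String fileName = file.getFileName().toString();
                String base = fileName.substring(0, fileName.length() - SUFFIX.length());
                String query = Files.readString(file);
                // One output file per query in each directory, named after the query file.
                Files.writeString(results.resolve(base + ".txt"), engine.run(query));
                Files.writeString(plans.resolve(base + ".txt"), engine.explain(query));
                processed.add(fileName);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return processed;
    }
}
```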

The SBT build is updated for the integration test phase to do the following:

  1. Build the Spark application
  2. Start a Docker cluster (chosen by a defined property, defaulting to Apache Spark). The Spark container will run the Spark application and then shut down.
  3. The SBT build will wait for the Spark container to shut down.
  4. Shut down the Docker cluster.
  5. Use a local Scala integration test framework to verify the results of the tests. The framework could generate HTML reports.
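For step 5, the comparison itself could look like this minimal sketch. It assumes the expected results and plans are stored in their own directories with the same file names as the actual outputs; that layout and the normalization rules are assumptions. Masking `#<digits>` is aimed at query plans, where Spark prints expression IDs such as `id#12` that can change between runs; applying it to results as well is a simplification.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ResultVerifier {
    enum Outcome { PASSED, FAILED, MISSING }

    /** Compares every expected file with the same-named file in the actual directory. */
    static Map<String, Outcome> verify(Path expectedDir, Path actualDir) {
        Map<String, Outcome> outcomes = new TreeMap<>(); // sorted by file name for stable reports
        try {
            List<Path> expectedFiles;
            try (Stream<Path> files = Files.list(expectedDir)) {
                expectedFiles = files.collect(Collectors.toList());
            }
            for (Path expected : expectedFiles) {
                String name = expected.getFileName().toString();
                Path actual = actualDir.resolve(name);
                if (!Files.exists(actual)) {
                    outcomes.put(name, Outcome.MISSING);
                } else if (normalize(Files.readString(expected)).equals(normalize(Files.readString(actual)))) {
                    outcomes.put(name, Outcome.PASSED);
                } else {
                    outcomes.put(name, Outcome.FAILED);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return outcomes;
    }

    /** Assumed normalization: unify line endings, mask expression IDs, trim outer whitespace. */
    static String normalize(String text) {
        return text.replace("\r\n", "\n").replaceAll("#\\d+", "#").strip();
    }
}
```

The returned map is enough to drive a pass/fail summary or an HTML report.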

This solution does not involve connecting remotely to the Spark container. Since Spark is only running a Spark application, the solution will work for both Apache Spark and Spark EMR.
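Steps 2 to 4 could be driven by running Docker commands from the build. The sketch below is in Java for consistency with the other examples; in the SBT build the same logic would live in a task in `build.sbt`. The `integ.cluster` property, the compose file paths and the Spark container name are hypothetical.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

final class ClusterCommands {
    // Hypothetical property name and compose file locations.
    static final String CLUSTER_PROPERTY = "integ.cluster";
    private static final Map<String, String> COMPOSE_FILES = Map.of(
            "spark", "docker/apache-spark/docker-compose.yml",
            "emr", "docker/spark-emr/docker-compose.yml");

    /** Cluster named by the property, defaulting to Apache Spark. */
    static String selectedCluster() {
        return System.getProperty(CLUSTER_PROPERTY, "spark");
    }

    /** Start the cluster, wait for the Spark container to exit, then shut the cluster down. */
    static List<List<String>> commands(String cluster, String sparkContainer) {
        String file = COMPOSE_FILES.get(cluster);
        if (file == null) {
            throw new IllegalArgumentException("Unknown cluster: " + cluster);
        }
        return List.of(
                List.of("docker", "compose", "-f", file, "up", "-d"),
                List.of("docker", "wait", sparkContainer),
                List.of("docker", "compose", "-f", file, "down"));
    }

    /** Runs the commands in order; the last (shutdown) command always runs. */
    static void runAll(List<List<String>> commands) throws IOException, InterruptedException {
        try {
            for (List<String> command : commands.subList(0, commands.size() - 1)) {
                run(command);
            }
        } finally {
            run(commands.get(commands.size() - 1));
        }
    }

    private static void run(List<String> command) throws IOException, InterruptedException {
        int exit = new ProcessBuilder(command).inheritIO().start().waitFor();
        if (exit != 0) {
            throw new IllegalStateException("Command failed (" + exit + "): " + command);
        }
    }
}
```

Running the shutdown in `finally` keeps a failed start from leaving the cluster running.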

Docker Clusters

Each Docker cluster will contain the following:

  • Spark container
  • OpenSearch server container
  • OpenSearch Dashboards container
  • Minio container

The Spark container is configured with both the Flint and PPL extensions, enabling it to both execute PPL queries and query indices on the OpenSearch server.

The OpenSearch Dashboards container is configured to connect to the OpenSearch server container.

The Spark container is started up as a driver and runs the Spark application.
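One way to start the Spark container as a driver with both extensions is a `spark-submit` command like the one built below. This is a sketch: the extension class names are the ones given in the opensearch-spark README (worth re-checking against the version built into the image), `--master local[*]` assumes the application runs entirely inside the Spark container, `opensearch` is a hypothetical compose service name, and the Flint and PPL jars are assumed to already be on the image's classpath.

```java
import java.util.ArrayList;
import java.util.List;

final class SparkSubmitArgs {
    // Extension classes as listed in the opensearch-spark README.
    static final String EXTENSIONS =
            "org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions";

    /** Builds a spark-submit command that runs the test application as a local driver. */
    static List<String> build(String openSearchHost, int openSearchPort, String mainClass, String appJar) {
        List<String> args = new ArrayList<>(List.of("spark-submit", "--master", "local[*]"));
        addConf(args, "spark.sql.extensions", EXTENSIONS);
        // Flint connection to the OpenSearch server container.
        addConf(args, "spark.datasource.flint.host", openSearchHost);
        addConf(args, "spark.datasource.flint.port", Integer.toString(openSearchPort));
        addConf(args, "spark.datasource.flint.scheme", "http");
        args.add("--class");
        args.add(mainClass);
        args.add(appJar); // the application jar must come after all options
        return args;
    }

    private static void addConf(List<String> args, String key, String value) {
        args.add("--conf");
        args.add(key + "=" + value);
    }
}
```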

@normanj-bitquill
Contributor Author

I did some more testing and found another approach that could work.

Create a simple Spark app.

  1. Look in a bound directory for one or more queries to run
  2. Run each query and save the result dataframes to files in the bound directory

For the integration test to use this, a special SparkSession is created. Start the docker cluster before any tests are run. When SparkSession.sql() is called on the special SparkSession:

  1. Write the query to a file in the bound directory
  2. Start the Spark EMR container. It will quickly run and then stop.
  3. Load the results from the dataframe written to the bound directory
  4. Return the dataframe
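The steps above could be wrapped in a client like this sketch. `ContainerStarter`, `query.sql` and `result.csv` are hypothetical; a real `ContainerStarter` might run `docker start --attach <container>`, which restarts the existing container and returns when it exits, and the real session would load the results into a DataFrame instead of returning text lines.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical hook that starts the existing Spark EMR container and blocks until it exits. */
@FunctionalInterface
interface ContainerStarter {
    void startAndWait();
}

final class FileBackedSqlClient {
    private final Path boundDir;
    private final ContainerStarter container;

    FileBackedSqlClient(Path boundDir, ContainerStarter container) {
        this.boundDir = boundDir;
        this.container = container;
    }

    /** Writes the query, runs the container, then reads back the result rows. */
    List<String> sql(String query) {
        Path queryFile = boundDir.resolve("query.sql");   // hypothetical file names
        Path resultFile = boundDir.resolve("result.csv");
        try {
            Files.deleteIfExists(resultFile); // never return a previous query's result
            Files.writeString(queryFile, query);
            container.startAndWait();
            if (!Files.exists(resultFile)) {
                throw new IllegalStateException("Container wrote no result for: " + query);
            }
            return Files.readAllLines(resultFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting the old result first means a container that fails to write output is reported as an error instead of silently returning the previous query's rows.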

The integration tests would then be able to quickly switch between Apache Spark and Spark EMR. For Apache Spark, Spark Connect is used. For Spark EMR, the new SparkSession is used.

One downside is that starting the Docker container for each query adds overhead and latency. The container is reused, though, and is not recreated for each query.

@YANG-DB YANG-DB added Lang:PPL Pipe Processing Language support testing test related feature and removed untriaged labels Dec 18, 2024
@YANG-DB
Member

YANG-DB commented Dec 18, 2024

@normanj-bitquill
I like this approach. I'm adding the following requirements for the test use cases:

  • use PPL via OpenSearch in the following sequence to test the full E2E flow:

OpenSearch->spark(EMR)->glue->S3->spark(EMR)->OpenSearch

This use case will contain:

Step 1:
With OpenSearch security plugin disabled

POST http://{{OpenSearch host:port}}/_plugins/_query/_datasources
content-type: application/json

{
    "name" : "my_spark",
    "connector": "spark",
    "properties" : {
        "spark.connector": "emr",
        "emr.cluster" : "{{EMR cluster ID}}",
        "emr.auth.type" : "awssigv4",
        "emr.auth.region" : "{{EMR region}}",
        "emr.auth.access_key" : "{{EMR user access key}}",
        "emr.auth.secret_key" : "{{EMR user secret key}}",
        "spark.datasource.flint.host" : "{{OpenSearch host (DNS)}}",
        "spark.datasource.flint.port" : "{{OpenSearch port}}",
        "spark.datasource.flint.scheme" : "http",
        "spark.datasource.flint.auth" : "false",
        "spark.datasource.flint.region" : "{{OpenSearch region}}"
    }
}

Example: 
curl -XPOST http://localhost:9200/_plugins/_query/_datasources -H 'Content-Type: application/json' -d '
{
    "name" : "my_spark",
    "connector": "spark",
    "properties" : {
        "spark.connector": "emr",
        "emr.cluster" : "name",
        "spark.datasource.flint.host" : "localhost",
        "spark.datasource.flint.port" : "9200",
        "spark.datasource.flint.scheme" : "http",
        "spark.datasource.flint.auth" : "false"
    }
}'
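The same registration can be sent from a Java test with the JDK's `java.net.http` client (Java 11+). This sketch mirrors the `curl` example above; it does not JSON-escape the values, so it assumes they contain no quotes or backslashes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class DataSourceRegistration {
    /** Same fields as the curl example; values are inserted as-is (assumed to be plain). */
    static String body(String name, String emrCluster, String flintHost, int flintPort) {
        return "{"
                + "\"name\":\"" + name + "\","
                + "\"connector\":\"spark\","
                + "\"properties\":{"
                + "\"spark.connector\":\"emr\","
                + "\"emr.cluster\":\"" + emrCluster + "\","
                + "\"spark.datasource.flint.host\":\"" + flintHost + "\","
                + "\"spark.datasource.flint.port\":\"" + flintPort + "\","
                + "\"spark.datasource.flint.scheme\":\"http\","
                + "\"spark.datasource.flint.auth\":\"false\""
                + "}}";
    }

    static HttpRequest request(String openSearchUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(openSearchUrl + "/_plugins/_query/_datasources"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Sends the request and returns the HTTP status code. */
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

For example, `send(request("http://localhost:9200", body("my_spark", "name", "localhost", 9200)))` would target the local cluster with the security plugin disabled.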

Step 2:

Query Endpoint and Syntax

With OpenSearch security plugin disabled

POST http://{{OpenSearch host:port}}/_plugins/_ppl
content-type: application/json

{
    "query": "source = my_spark.sql(\" {{Spark SQL query}} \")"
}

Example:
curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"query": "source = my_spark.sql(\" SELECT 1 \")"}' \
  localhost:9200/_plugins/_ppl

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"query": "source = my_spark.sql(\" SHOW TABLES \")"}' \
  localhost:9200/_plugins/_ppl
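The PPL request body nests one quoted string inside another, which is easy to get wrong by hand. A small helper can build it; this sketch assumes the Spark SQL itself contains no double quotes (use single-quoted SQL string literals), since those would need separate handling at the PPL level.

```java
final class PplQuery {
    /** Wraps a Spark SQL statement in the form used above: source = <ds>.sql(" <sql> "). */
    static String ppl(String dataSource, String sparkSql) {
        return "source = " + dataSource + ".sql(\" " + sparkSql + " \")";
    }

    /** Minimal JSON string encoding: quotes, backslashes and control characters. */
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Request body for POST /_plugins/_ppl. */
    static String body(String dataSource, String sparkSql) {
        return "{\"query\": " + jsonString(ppl(dataSource, sparkSql)) + "}";
    }
}
```

`PplQuery.body("my_spark", "SELECT 1")` produces the same body as the first `curl` example.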

@normanj-bitquill
Contributor Author

As discussed offline:

Glue is an AWS service. There are docker images of Spark that include the Glue libraries. These images are able to use catalogs from Glue by making remote calls to AWS Glue. I cannot find a docker image of an AWS Glue server.

It may be possible to mock out the AWS Glue server, but another option is to use Hive and S3 (minio) instead.

@normanj-bitquill
Contributor Author

I have tested configuring Spark to use the Minio server as the data store. This works, but I doubt that it provides any extra value when running queries in Spark: Spark uses Hadoop to access S3, so the only OpenSearch code that could be involved in this setup is the PPL extension. The Flint extension requires async query to be working on OpenSearch.
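For reference, pointing Spark at Minio through Hadoop's S3A connector typically comes down to a handful of properties. The endpoint, credentials and warehouse path in this sketch are placeholders, and it assumes the `hadoop-aws` jar (which provides `S3AFileSystem`) is on the Spark classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MinioSparkConf {
    /** Spark properties pointing Hadoop's S3A connector and the warehouse at a Minio server. */
    static Map<String, String> conf(String endpoint, String accessKey, String secretKey, String warehouse) {
        Map<String, String> conf = new LinkedHashMap<>(); // keeps insertion order for readable output
        conf.put("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.put("spark.hadoop.fs.s3a.endpoint", endpoint);
        conf.put("spark.hadoop.fs.s3a.access.key", accessKey);
        conf.put("spark.hadoop.fs.s3a.secret.key", secretKey);
        // Address buckets as http://host:port/bucket instead of bucket.host virtual hosts.
        conf.put("spark.hadoop.fs.s3a.path.style.access", "true");
        conf.put("spark.hadoop.fs.s3a.connection.ssl.enabled", "false");
        conf.put("spark.sql.warehouse.dir", warehouse);
        return conf;
    }

    /** Renders the properties as spark-submit "--conf key=value" pairs. */
    static List<String> asSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        conf.forEach((key, value) -> {
            args.add("--conf");
            args.add(key + "=" + value);
        });
        return args;
    }
}
```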

I have looked further into adding an S3 datasource in OpenSearch. In the docker environment, I was able to add an S3 Glue datasource. When trying to run an async query, it fails when OpenSearch tries to make a call using the AWSEMRServerless client.

A possibly simple solution is to create a Jar file with replacements for the classes AWSEMRServerless and AWSEMRServerlessClientBuilder. The Jar would be placed at the beginning of the classpath. When AWSEMRServerless.startJobRun() is called, it can either submit the job to a running Spark master or start up a docker instance of EMR Spark. We would also need to implement job status and possibly job cancellation. Hopefully this is the minimal amount of stubbing needed.
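The stub's job bookkeeping might look like the sketch below. The method names echo `startJobRun()`, job status and cancellation from the comment above, but the signatures are simplified and are not the AWS SDK's; a real replacement class would have to match the SDK's request and response types. `JobLauncher` is a hypothetical hook that either submits to a Spark master or starts an EMR Spark container, and stopping the launched work on cancellation is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

enum JobState { RUNNING, SUCCESS, FAILED, CANCELLED }

/** Hypothetical hook: submit to a running Spark master or start an EMR Spark container. */
@FunctionalInterface
interface JobLauncher {
    void launch(String jobId, String entryPoint, List<String> args);
}

/** Simplified, in-memory stand-in for the EMR Serverless job API (not the SDK signatures). */
final class LocalEmrServerlessStub {
    private final JobLauncher launcher;
    private final Map<String, JobState> jobs = new ConcurrentHashMap<>();
    private final AtomicLong counter = new AtomicLong();

    LocalEmrServerlessStub(JobLauncher launcher) {
        this.launcher = launcher;
    }

    /** Registers the job as running, hands it to the launcher and returns its ID. */
    String startJobRun(String entryPoint, List<String> args) {
        String jobId = "local-job-" + counter.incrementAndGet();
        jobs.put(jobId, JobState.RUNNING);
        try {
            launcher.launch(jobId, entryPoint, args);
        } catch (RuntimeException e) {
            jobs.put(jobId, JobState.FAILED);
            throw e;
        }
        return jobId;
    }

    JobState getJobRunState(String jobId) {
        JobState state = jobs.get(jobId);
        if (state == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return state;
    }

    /** Marks a running job as cancelled; returns false if it is unknown or already finished. */
    boolean cancelJobRun(String jobId) {
        return jobs.replace(jobId, JobState.RUNNING, JobState.CANCELLED);
    }

    /** Called when the launched process or container exits; a cancelled job stays cancelled. */
    void jobFinished(String jobId, boolean succeeded) {
        jobs.replace(jobId, JobState.RUNNING, succeeded ? JobState.SUCCESS : JobState.FAILED);
    }
}
```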

For our testing, we want Spark to have an OpenSearch catalog that uses async query on OpenSearch.

@normanj-bitquill
Contributor Author

A quick test of the above shows some promise. For my testing, I started with an AWSEMRServerless that returns a constant response for startJobRun(). With this, I was able to add an S3/Glue data source in OpenSearch Dashboards. I then tried an async query of a table in that datasource. When getting the async query status, it gives a status of "waiting".

Next steps:

  1. Create a WIP PR of the work so far.
  2. Create the minimal code to run jobs on Spark and return the results to OpenSearch.

@YANG-DB
Member

YANG-DB commented Dec 23, 2024

thanks @normanj-bitquill - sounds like you have made progress!!

I would even simplify this by adding an environment param for selecting the implementation of the AWSEMRServerless, if that makes sense, so that classpath order would not be an issue.

I think you can add such code in the AsyncExecutorServiceModule, in the createEMRServerlessClientFactory function:

  @Provides
  public EMRServerlessClientFactory createEMRServerlessClientFactory(
      SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier,
      MetricsService metricsService) {
    // Based on a config param, load the correct EMRServerlessClientFactory
    return new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
  }

Let me know what you think?
thanks
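A minimal sketch of environment-based selection, kept independent of the OpenSearch classes: `JobClient`, the two implementations and the `SPARK_CLIENT_IMPL` variable are hypothetical stand-ins for the `EMRServerlessClientFactory` implementations. Passing the environment in as a map, rather than calling `System.getenv()` inside, keeps the selection testable.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for whatever client the factory would produce. */
interface JobClient {
    String name();
}

final class EmrServerlessJobClient implements JobClient {
    @Override
    public String name() { return "emr-serverless"; }
}

final class LocalDockerJobClient implements JobClient {
    @Override
    public String name() { return "local-docker"; }
}

final class JobClientSelector {
    static final String ENV_VAR = "SPARK_CLIENT_IMPL"; // hypothetical variable name
    static final String DEFAULT_IMPL = "emr-serverless";

    private static final Map<String, Supplier<JobClient>> IMPLEMENTATIONS = Map.of(
            "emr-serverless", EmrServerlessJobClient::new,
            "local-docker", LocalDockerJobClient::new);

    /** Picks the implementation named in the environment, defaulting to EMR Serverless. */
    static JobClient select(Map<String, String> env) {
        String choice = env.getOrDefault(ENV_VAR, DEFAULT_IMPL);
        Supplier<JobClient> supplier = IMPLEMENTATIONS.get(choice);
        if (supplier == null) {
            throw new IllegalArgumentException("Unknown " + ENV_VAR + " value: " + choice);
        }
        return supplier.get();
    }
}
```

In the Guice provider, the module would pass `System.getenv()` and return whichever implementation is selected; rejecting unknown values avoids silently falling back to EMR in a test environment.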

@normanj-bitquill
Contributor Author

@YANG-DB The only issue here is the time to get the change you suggested into an OpenSearch docker image that the opensearch-spark repo can use. The process is probably something like:

  1. Create PR with change
  2. Get it merged
  3. Wait for a new OpenSearch release
  4. Use the docker image for the new OpenSearch release

This is the correct way forward, but to make progress in the near term, I'll use an altered aws-java-sdk-emrserverless-1.12.651.jar file.

@normanj-bitquill
Contributor Author

@YANG-DB I have created this PR

It is still in progress. It does handle configuring Minio and adding the S3/Glue datasource in OpenSearch Dashboards. I still need to work on the replacement EMRServerlessClient.

@normanj-bitquill
Contributor Author

This work has been merged in. There is some documentation in the repository:
https://github.com/opensearch-project/opensearch-spark/tree/main/docs/docker/integ-test#readme

The real goal was to be able to test the Async API using only local resources. Normally Spark EMR is used in processing Async API queries. The integ-test docker cluster bypasses AWS EMR and instead makes use of the local docker environment.
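A test that exercises the Async API against this cluster could build its requests as sketched below. The field names (`datasource`, `lang`, `query`, `sessionId`) follow the OpenSearch async query API, `POST /_plugins/_async_query`; the data source name is a placeholder, and extracting `sessionId` from the response with a regular expression is a simplification in place of a real JSON parser. Sending the returned `sessionId` with later queries keeps them in the same session.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AsyncQuerySession {
    private static final Pattern SESSION_ID = Pattern.compile("\"sessionId\"\\s*:\\s*\"([^\"]+)\"");

    private final String dataSource;
    private String sessionId; // unknown until OpenSearch returns one

    AsyncQuerySession(String dataSource) {
        this.dataSource = dataSource;
    }

    /** Body for POST /_plugins/_async_query; includes the session ID once one is known. */
    String requestBody(String sql) {
        StringBuilder body = new StringBuilder("{")
                .append("\"datasource\":").append(quote(dataSource)).append(',')
                .append("\"lang\":\"sql\",")
                .append("\"query\":").append(quote(sql));
        if (sessionId != null) {
            body.append(",\"sessionId\":").append(quote(sessionId));
        }
        return body.append('}').toString();
    }

    /** Remembers the session ID from a response containing "queryId" and "sessionId". */
    void onResponse(String responseJson) {
        Matcher m = SESSION_ID.matcher(responseJson);
        if (m.find()) {
            sessionId = m.group(1);
        }
    }

    String sessionId() {
        return sessionId;
    }

    /** Minimal JSON quoting: escapes backslashes and double quotes only. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```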

At present, using Apache Spark containers with the PPL and Flint integration extensions is sufficient for running an async query. The Spark application is the spark-sql-application in the opensearch-spark repository.

When OpenSearch would have called AWS EMR to run a query on a Spark EMR container, it instead makes a docker call to start an Apache Spark container. Async API queries have sessions. When a new session is started, OpenSearch starts a new Apache Spark container. The container continues processing queries for the session for 3 minutes and then shuts down. If more queries for the session are received after that, a new container is started. It is recommended to reuse sessions as much as possible.
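The session behaviour described above can be modelled in a few lines. This is an illustration, not the OpenSearch implementation: it assumes the 3-minute window is an inactivity limit counted from the session's most recent query, and it only counts container starts instead of issuing docker calls.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class SessionContainerModel {
    // Assumption: the 3-minute window is measured from the session's last query.
    static final Duration WINDOW = Duration.ofMinutes(3);

    private final Supplier<Instant> clock;
    private final Map<String, Instant> lastQuery = new HashMap<>();
    private int containersStarted;

    SessionContainerModel(Supplier<Instant> clock) {
        this.clock = clock;
    }

    /** Records a query; returns true when the session needs a new Apache Spark container. */
    boolean onQuery(String sessionId) {
        Instant now = clock.get();
        Instant previous = lastQuery.put(sessionId, now);
        boolean needsNewContainer =
                previous == null || Duration.between(previous, now).compareTo(WINDOW) >= 0;
        if (needsNewContainer) {
            containersStarted++; // stand-in for the docker call that starts a container
        }
        return needsNewContainer;
    }

    int containersStarted() {
        return containersStarted;
    }
}
```

Under this model, queries that arrive within the window reuse the running container, which is why reusing sessions avoids container start-up latency.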

The integ-test docker cluster contains Spark master and worker containers. The Apache Spark containers started for processing async API queries are not configured to use the Spark master or worker containers. This was done to simplify workload management. At least in a simple configuration, the Spark master would only allow one job to run at a time, which caused only one session to run at a time. A better configuration may be possible with future work. The most visible impact is that the Spark worker will not know anything about jobs executed on the async API containers.

@YANG-DB
Member

YANG-DB commented Jan 27, 2025

@normanj-bitquill thanks for the detailed review!
IMO, once we actually add a generic Spark client to OpenSearch, we could introduce its configuration approach for concurrent Spark session submission and execution.

Projects
Status: Design

2 participants