Skip to content

Commit

Permalink
Add tags to the emr jobs based on the query types
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Sep 27, 2023
1 parent be82714 commit cb0ad4e
Show file tree
Hide file tree
Showing 25 changed files with 3,713 additions and 133 deletions.
18 changes: 11 additions & 7 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_async_query
HTTP VERB: POST
HTTP URI: ``_plugins/_async_query``

HTTP VERB: ``POST``

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Expand All @@ -60,8 +61,9 @@ Async Query Result API
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``GET``

Sample Request BODY::

Expand All @@ -75,6 +77,7 @@ Sample Response if the Query is in Progress ::
Sample Response If the Query is successful ::

{
"status": "SUCCESS",
"schema": [
{
"name": "indexed_col_name",
Expand Down Expand Up @@ -105,8 +108,9 @@ Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: DELETE
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``DELETE``

Sample Request Body ::

Expand Down
17 changes: 10 additions & 7 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.client.SparkJobClient;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
Expand Down Expand Up @@ -297,20 +297,23 @@ private DataSourceServiceImpl createDataSourceService() {
private AsyncQueryExecutorService createAsyncQueryExecutorService() {
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
SparkJobClient sparkJobClient = createEMRServerlessClient();
EMRServerlessClient EMRServerlessClient = createEMRServerlessClient();
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
sparkJobClient, this.dataSourceService, jobExecutionResponseReader);
EMRServerlessClient,
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

private SparkJobClient createEMRServerlessClient() {
private EMRServerlessClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
return AccessController.doPrivileged(
(PrivilegedAction<SparkJobClient>)
(PrivilegedAction<EMRServerlessClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
Expand All @@ -320,7 +323,7 @@ private SparkJobClient createEMRServerlessClient() {
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
return new EmrServerlessClientImplEMR(awsemrServerless);
});
}
}
36 changes: 33 additions & 3 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,42 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'antlr'
}

repositories {
mavenCentral()
}

tasks.register('downloadG4Files', Exec) {
description = 'Download remote .g4 files from GitHub'

executable 'curl'

// Need to add these back once the grammar issues with indexName and tableName is addressed in flint integration jar.
// args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
// args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/spark/antlr/parser")
}
configurations {
compile {
extendsFrom = extendsFrom.findAll { it != configurations.antlr }
}
}

// Make sure the downloadG4File task runs before the generateGrammarSource task
generateGrammarSource.dependsOn downloadG4Files

dependencies {
antlr "org.antlr:antlr4:4.7.1"

api project(':core')
implementation project(':protocol')
implementation project(':datasources')
Expand Down Expand Up @@ -46,7 +75,7 @@ jacocoTestReport {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand All @@ -61,7 +90,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.asyncquery.exceptions.*'
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*'
]
limit {
counter = 'LINE'
Expand All @@ -75,7 +105,7 @@ jacocoTestCoverageVerification {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand Down
91 changes: 91 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

grammar FlintSparkSqlExtensions;

import SparkSqlBase;


// Flint SQL Syntax Extension

singleStatement
: statement SEMICOLON* EOF
;

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName ON tableName
;

showCoveringIndexStatement
: SHOW (INDEX | INDEXES) ON tableName
;

describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;

indexName
: identifier
;

tableName
: multipartIdentifier
;
Loading

0 comments on commit cb0ad4e

Please sign in to comment.