From 3041ef26ad04c0bddd2257a28694aa4e2b4cc837 Mon Sep 17 00:00:00 2001
From: atovk
Date: Tue, 1 Dec 2020 18:05:44 +0800
Subject: [PATCH] SUBMARINE-679. Spark Security Plugin Support Spark-3.0.1

### What is this PR for?
Add Spark 3.x support to the Submarine Spark Security Plugin.

### What type of PR is it?
[Improvement | Feature]

### What is the Jira issue?
[SUBMARINE-679](https://issues.apache.org/jira/browse/SUBMARINE-679)

### Questions:
* Does this need documentation?
YES
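With the new profiles, a Spark 3 build works the same way as the existing Spark 2 builds, e.g. `mvn clean install -Dmaven.javadoc.skip=true -Pspark-3.0 -Pranger-2.0 -pl :submarine-spark-security` (command assembled from the Travis `BUILD_FLAG`/`PROFILE` settings added below; adjust the goals as needed).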
MODULES="-pl :submarine-spark-security" + + - name: Test submarine spark security with spark 3.0 and ranger 1.0 + language: scala + jdk: openjdk8 + env: + - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" + - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true" + - TEST_FLAG=$BUILD_FLAG + - PROFILE="-Pspark-3.0 -Pranger-1.0" + - MODULES="-pl :submarine-spark-security" + + - name: Test submarine spark security with spark 3.0 and ranger 1.1 + language: scala + jdk: openjdk8 + env: + - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" + - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true" + - TEST_FLAG=$BUILD_FLAG + - PROFILE="-Pspark-3.0 -Pranger-1.1" + - MODULES="-pl :submarine-spark-security" + + - name: Test submarine spark security with spark 3.0 and ranger 1.2 + language: scala + jdk: openjdk8 + env: + - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" + - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true" + - TEST_FLAG=$BUILD_FLAG + - PROFILE="-Pspark-3.0 -Pranger-1.2" + - MODULES="-pl :submarine-spark-security" + + - name: Test submarine spark security with spark 3.0 and ranger 2.0 + language: scala + jdk: openjdk8 + env: + - MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" + - BUILD_FLAG="--batch-mode clean install -Dmaven.javadoc.skip=true" + - TEST_FLAG=$BUILD_FLAG + - PROFILE="-Pspark-3.0 -Pranger-2.0" + - MODULES="-pl :submarine-spark-security" install: - mvn --version - echo ">>> mvn $BUILD_FLAG $MODULES $PROFILE -B" diff --git a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md index c58bf4f7c2..ff3011230d 100644 --- a/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md +++ b/docs/submarine-security/spark-security/build-submarine-spark-security-plugin.md @@ -25,6 +25,6 @@ By default, Submarine Spark Security Plugin is built against Apache Spark 2.3.x Currently, available profiles are: -Spark: -Pspark-2.3, -Pspark-2.4 +Spark: -Pspark-2.3, -Pspark-2.4, -Pspark-3.0 Ranger: -Pranger-1.0, -Pranger-1.1, -Pranger-1.2 -Pranger-2.0 diff --git a/submarine-security/spark-security/pom.xml b/submarine-security/spark-security/pom.xml index ee88738af7..36a1f43e29 100644 --- a/submarine-security/spark-security/pom.xml +++ b/submarine-security/spark-security/pom.xml @@ -46,6 +46,7 @@ submarine_spark_ranger_project 1.1.0 1 + 2 2.11.8 2.11 2.2.6 @@ -60,6 +61,11 @@ 1.9.13 + + org.apache.commons + commons-lang3 + test + org.scala-lang scala-library @@ -321,6 +327,19 @@ + + + add-spark-source + generate-sources + + add-source + + + + spark-${spark.compatible.version}/src/main/scala + + + @@ -551,6 +570,20 @@ + + spark-3.0 + + 3.0.1 + 2.12.10 + 2.12 + + 3 + 3.9 + 2.10.5 + 2.10.5 + + + ranger-1.0 diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala new file mode 100644 index 0000000000..4b2e6dd8d5 --- /dev/null +++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
diff --git a/submarine-security/spark-security/pom.xml b/submarine-security/spark-security/pom.xml
index ee88738af7..36a1f43e29 100644
--- a/submarine-security/spark-security/pom.xml
+++ b/submarine-security/spark-security/pom.xml
@@ -46,6 +46,7 @@
     <plugin.name>submarine_spark_ranger_project</plugin.name>
     <ranger.version>1.1.0</ranger.version>
    <ranger.major.version>1</ranger.major.version>
+    <spark.compatible.version>2</spark.compatible.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <scalatest.version>2.2.6</scalatest.version>
@@ -60,6 +61,11 @@
       <version>1.9.13</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -321,6 +327,19 @@
               </sources>
             </configuration>
           </execution>
+
+          <execution>
+            <id>add-spark-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>spark-${spark.compatible.version}/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
      <plugin>
@@ -551,6 +570,20 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>spark-3.0</id>
+      <properties>
+        <spark.version>3.0.1</spark.version>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+
+        <spark.compatible.version>3</spark.compatible.version>
+        <commons-lang3.version>3.9</commons-lang3.version>
+        <jackson.version>2.10.5</jackson.version>
+        <jackson.databind.version>2.10.5</jackson.databind.version>
+      </properties>
+    </profile>
+
     <profile>
       <id>ranger-1.0</id>
       <properties>
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000000..4b2e6dd8d5
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, SetDatabaseCommand, ShowDatabasesCommand}
+
+object CompatibleFunc {
+
+  def getPattern(child: ShowDatabasesCommand) = child.databasePattern
+
+  def getCatLogName(s: SetDatabaseCommand) = s.databaseName
+
+  def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames
+
+  def tableIdentifier(u: UnresolvedRelation) = u.tableIdentifier
+}
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000000..7c0847f37f
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
+case class SubqueryCompatible(child: LogicalPlan, correlated: Boolean = false) {
+  Subquery(child)
+}
+
+object SubqueryCompatible {
+  // def apply(child: LogicalPlan, correlated: Boolean = false) = Subquery(child)
+  def unapply(subquery: Subquery): Option[LogicalPlan] = Subquery.unapply(subquery)
+}
+
+
+
diff --git a/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000000..84325c3d0b
--- /dev/null
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.execution.command.{PersistedView, SetDatabaseCommand, ShowDatabasesCommand}
+
+package object CompatibleCommand {
+
+  type ShowDatabasesCommandCompatible = ShowDatabasesCommand
+  type SetDatabaseCommandCompatible = SetDatabaseCommand
+
+}
+
+object PersistedViewCompatible {
+  val obj: PersistedView.type = PersistedView
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
similarity index 93%
rename from submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
rename to submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
index f1fd8d0687..dc8c43e881 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
+++ b/submarine-security/spark-security/spark-2/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -43,10 +43,10 @@ class SubmarineSqlParser(val delegate: ParserInterface) extends ParserInterface
 
   // scalastyle:off line.size.limit
   /**
-   * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
-   *
-   * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
-   */
+  * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+  *
+  * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+  */
   // scalastyle:on
   private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
     val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
new file mode 100644
index 0000000000..5c50ced8ab
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/CompatibleFunc.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{SetCatalogAndNamespace, ShowNamespaces}
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+
+object CompatibleFunc {
+
+  def getPattern(child: ShowNamespaces) = child.pattern
+
+  def getCatLogName(s: SetCatalogAndNamespace) = s.catalogName
+
+  def analyzeColumnName(column: AnalyzeColumnCommand) = column.columnNames.get
+
+  def tableIdentifier(u: UnresolvedRelation) = TableIdentifier.apply(u.tableName)
+}
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
new file mode 100644
index 0000000000..63b9695307
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/SubqueryCompatible.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+
+object SubqueryCompatible {
+  def apply(child: LogicalPlan, correlated: Boolean) = Subquery(child, correlated)
+  def unapply(subquery: Subquery): Option[(LogicalPlan, Boolean)] = Subquery.unapply(subquery)
+}
+
+
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
new file mode 100644
index 0000000000..8bd33117c8
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/compatible/command/CompatibleCommand.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.compatible
+
+import org.apache.spark.sql.catalyst.analysis.PersistedView
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SetCatalogAndNamespace, ShowNamespaces, Subquery}
+
+
+package object CompatibleCommand {
+
+  type ShowDatabasesCommandCompatible = ShowNamespaces
+  type SetDatabaseCommandCompatible = SetCatalogAndNamespace
+}
+
+object PersistedViewCompatible {
+  val obj: PersistedView.type = PersistedView
+}
+
+
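To make the version split concrete: the optimizer extensions later in this patch pattern-match through `SubqueryCompatible` instead of Spark's `Subquery`, so the same source compiles against either tree. A minimal caller sketch (hypothetical helper, not part of this patch; it mirrors the `SubmarineDataMaskingExtension` change below):

```scala
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.submarine.spark.compatible.SubqueryCompatible

// Rewrite every subquery plan with `rewrite`, going through the shim:
// on Spark 2 SubqueryCompatible ignores the `correlated` flag (Subquery has
// a single field there); on Spark 3 it forwards both arguments.
def rewriteSubqueries(plan: LogicalPlan, rewrite: LogicalPlan => LogicalPlan): LogicalPlan =
  plan transformAllExpressions {
    case s: SubqueryExpression =>
      val SubqueryCompatible(newPlan, _) =
        SubqueryCompatible(rewrite(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
      s.withNewPlan(newPlan)
  }
```

The extractor's second field is deliberately discarded with `_`, which is what lets the Spark 2 shim get away with a dummy `correlated` parameter.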
diff --git a/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
new file mode 100644
index 0000000000..c6e11d093b
--- /dev/null
+++ b/submarine-security/spark-security/spark-3/src/main/scala/org/apache/submarine/spark/security/parser/SubmarineSqlParser.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.spark.security.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface, PostProcessor}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class SubmarineSqlParser(val delegate: ParserInterface) extends ParserInterface {
+
+  private val astBuilder = new SubmarineSqlAstBuilder
+
+  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
+    astBuilder.visit(parser.singleStatement()) match {
+      case plan: LogicalPlan => plan
+      case _ => delegate.parsePlan(sqlText)
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
+   *
+   * @see https://github.com/apache/spark/blob/v2.4.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L81
+   */
+  // scalastyle:on
+  private def parse[T](command: String)(toResult: SubmarineSqlBaseParser => T): T = {
+    val lexer = new SubmarineSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new SubmarineSqlBaseParser(tokenStream)
+    parser.addParseListener(PostProcessor)
+    parser.removeErrorListeners()
+    parser.addErrorListener(ParseErrorListener)
+
+    try {
+      try {
+        // first, try parsing with potentially faster SLL mode
+        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+        toResult(parser)
+      } catch {
+        case e: ParseCancellationException =>
+          // if we fail, parse with LL mode
+          tokenStream.seek(0) // rewind input stream
+          parser.reset()
+
+          // Try Again.
+          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+          toResult(parser)
+      }
+    } catch {
+      case e: ParseException if e.command.isDefined =>
+        throw e
+      case e: ParseException =>
+        throw e.withCommand(command)
+      case e: AnalysisException =>
+        val position = Origin(e.line, e.startPosition)
+        throw new ParseException(Option(command), e.message, position, position)
+    }
+  }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseRawDataType(sqlText: String): DataType = {
+    delegate.parseRawDataType(sqlText)
+  }
+
+}
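The parser only recognizes Submarine's own statements (via `SubmarineSqlAstBuilder`) and delegates everything else, so it can sit in front of Spark's parser on both 2.x and 3.x. A minimal wiring sketch under assumed names (the class below is illustrative and not part of this patch; `injectParser` is the standard `SparkSessionExtensions` hook):

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.submarine.spark.security.parser.SubmarineSqlParser

// Hypothetical extension entry point: Spark hands us its current parser,
// which becomes the `delegate` that unrecognized statements fall back to.
class SubmarineParserInjection extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser((_, delegate) => new SubmarineSqlParser(delegate))
  }
}
```

Registered through `spark.sql.extensions`, this keeps all Spark SQL parsing behavior intact except for the Submarine-specific grammar.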
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 013238517b..84c8733b3f 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -35,9 +35,11 @@ import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectComm
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
+import org.apache.submarine.spark.compatible.SubqueryCompatible
 import org.apache.submarine.spark.security._
 import org.apache.submarine.spark.security.SparkObjectType.COLUMN
 
+
 /**
  * An Apache Spark's [[Optimizer]] extension for column data masking.
  * TODO(kent yao) implement this as analyzer rule
@@ -233,7 +235,8 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
 
     marked transformAllExpressions {
       case s: SubqueryExpression =>
-        val Subquery(newPlan) = Subquery(SubmarineDataMasking(s.plan))
+        val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+          SubmarineDataMasking(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
         s.withNewPlan(newPlan)
     }
   }
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
index be3c031cfd..fc439b535c 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectComm
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
+import org.apache.submarine.spark.compatible.SubqueryCompatible
 import org.apache.submarine.spark.security._
 
 /**
@@ -62,9 +63,9 @@ case class SubmarineRowFilterExtension(spark: SparkSession) extends Rule[Logical
       val analyzed = spark.sessionState.analyzer.execute(Filter(condition, plan))
       val optimized = analyzed transformAllExpressions {
         case s: SubqueryExpression =>
-          val Subquery(newPlan) =
-            rangerSparkOptimizer.execute(Subquery(SubmarineRowFilter(s.plan)))
-          s.withNewPlan(newPlan)
+          val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
+            SubmarineRowFilter(s.plan), SubqueryExpression.hasCorrelatedSubquery(s))
+          s.withNewPlan(rangerSparkOptimizer.execute(newPlan))
       }
       SubmarineRowFilter(optimized)
     } else {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
index 0236d7c88d..68dc720270 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtension.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, InsertIn
 import org.apache.spark.sql.hive.PrivilegesBuilder
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 
+import org.apache.submarine.spark.compatible.CompatibleCommand._
 import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkAccessControlException}
 
 /**
@@ -52,7 +53,7 @@ case class SubmarineSparkRangerAuthorizationExtension(spark: SparkSession)
    */
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case s: ShowDatabasesCommand => SubmarineShowDatabasesCommand(s)
+      case s: ShowDatabasesCommandCompatible => SubmarineShowDatabasesCommand(s)
       case s: SubmarineShowDatabasesCommand => s
       case s: ShowTablesCommand => SubmarineShowTablesCommand(s)
       case s: SubmarineShowTablesCommand => s
@@ -144,10 +145,10 @@ case class SubmarineSparkRangerAuthorizationExtension(spark: SparkSession)
       case p if p.nodeName == "SaveIntoDataSourceCommand" => QUERY
 
       case s: SetCommand if s.kv.isEmpty || s.kv.get._2.isEmpty => SHOWCONF
-      case _: SetDatabaseCommand => SWITCHDATABASE
+      case _: SetDatabaseCommandCompatible => SWITCHDATABASE
       case _: ShowCreateTableCommand => SHOW_CREATETABLE
       case _: ShowColumnsCommand => SHOWCOLUMNS
-      case _: ShowDatabasesCommand => SHOWDATABASES
+      case _: ShowDatabasesCommandCompatible => SHOWDATABASES
       case _: ShowFunctionsCommand => SHOWFUNCTIONS
       case _: ShowPartitionsCommand => SHOWPARTITIONS
       case _: ShowTablesCommand => SHOWTABLES
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
index d74c180ce9..dd782f0d5a 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineShowDatabasesCommand.scala
@@ -20,16 +20,22 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.execution.command.{RunnableCommand, ShowDatabasesCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
 
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
+import org.apache.submarine.spark.compatible.CompatibleFunc
 import org.apache.submarine.spark.security.{RangerSparkAuthorizer, SparkPrivilegeObject, SparkPrivilegeObjectType}
 
-case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommand) extends RunnableCommand {
+case class SubmarineShowDatabasesCommand(child: ShowDatabasesCommandCompatible)
+  extends RunnableCommand {
   override val output = child.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val rows = child.run(sparkSession)
-    rows.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
+    val catalog = sparkSession.sessionState.catalog
+    val databases = CompatibleFunc.getPattern(child)
+      .map(catalog.listDatabases).getOrElse(catalog.listDatabases()).map { d => Row(d) }
+
+    databases.filter(r => RangerSparkAuthorizer.isAllowed(toSparkPrivilegeObject(r)))
   }
 
   private def toSparkPrivilegeObject(row: Row): SparkPrivilegeObject = {
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
index bbc1a47eb0..958223fd56 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/hive/PrivilegesBuilder.scala
@@ -28,14 +28,17 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.{AlterDatabasePropertiesCommand, AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableRecoverPartitionsCommand, AlterTableRenameCommand, AlterTableRenamePartitionCommand, AlterTableSerDePropertiesCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AlterViewAsCommand, AnalyzeColumnCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, CreateDataSourceTableAsSelectCommand, CreateDataSourceTableCommand, CreateFunctionCommand, CreateTableCommand, CreateTableLikeCommand, CreateViewCommand, DescribeDatabaseCommand, DescribeFunctionCommand, DescribeTableCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand, ExplainCommand, LoadDataCommand, PersistedView, RunnableCommand, SetDatabaseCommand, ShowColumnsCommand, ShowCreateTableCommand, ShowFunctionsCommand, ShowPartitionsCommand, ShowTablePropertiesCommand, ShowTablesCommand, TruncateTableCommand}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 import org.apache.spark.sql.types.StructField
 
+import org.apache.submarine.spark.compatible.{CompatibleFunc, PersistedViewCompatible}
+import org.apache.submarine.spark.compatible.CompatibleCommand.SetDatabaseCommandCompatible
 import org.apache.submarine.spark.security.{SparkPrivilegeObject, SparkPrivilegeObjectType, SparkPrivObjectActionType}
 import org.apache.submarine.spark.security.SparkPrivObjectActionType.SparkPrivObjectActionType
 
+
 /**
  * [[LogicalPlan]] -> list of [[SparkPrivilegeObject]]s
  */
@@ -119,7 +122,7 @@
       // Unfortunately, the real world is always a place where miracles happen.
       // We check the privileges directly without resolving the plan and leave everything
       // to spark to do.
-      addTableOrViewLevelObjs(u.tableIdentifier, privilegeObjects)
+      addTableOrViewLevelObjs(CompatibleFunc.tableIdentifier(u), privilegeObjects)
 
     case p =>
       for (child <- p.children) {
@@ -203,9 +206,9 @@
 
     case a: AnalyzeColumnCommand =>
       addTableOrViewLevelObjs(
-        a.tableIdent, inputObjs, columns = a.columnNames)
+        a.tableIdent, inputObjs, columns = CompatibleFunc.analyzeColumnName(a))
       addTableOrViewLevelObjs(
-        a.tableIdent, outputObjs, columns = a.columnNames)
+        a.tableIdent, outputObjs, columns = CompatibleFunc.analyzeColumnName(a))
 
     case a if a.nodeName == "AnalyzePartitionCommand" =>
       addTableOrViewLevelObjs(
@@ -252,7 +255,7 @@
 
     case c: CreateViewCommand =>
       c.viewType match {
-        case PersistedView =>
+        case PersistedViewCompatible.obj =>
           // PersistedView will be tied to a database
           addDbLevelObjs(c.name, outputObjs)
           addTableOrViewLevelObjs(c.name, outputObjs)
@@ -328,7 +331,8 @@
     case s if s.nodeName == "SaveIntoDataSourceCommand" =>
       buildQuery(getFieldVal(s, "query").asInstanceOf[LogicalPlan], outputObjs)
 
-    case s: SetDatabaseCommand => addDbLevelObjs(s.databaseName, inputObjs)
+    case s: SetDatabaseCommandCompatible =>
+      addDbLevelObjs(CompatibleFunc.getCatLogName(s), inputObjs)
 
     case s: ShowColumnsCommand => addTableOrViewLevelObjs(s.tableName, inputObjs)
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index b356fd54ac..3a7b473e43 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
"default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -433,7 +434,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -652,7 +654,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -806,7 +809,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -920,7 +924,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -977,7 +982,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1034,7 +1040,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1091,7 +1098,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1156,7 +1164,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1221,7 +1230,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1285,7 +1295,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1349,7 +1360,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1413,7 +1425,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1477,7 +1490,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1541,7 +1555,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1598,7 +1613,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1655,7 +1671,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1712,7 +1729,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1769,7 +1787,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1826,7 +1845,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1883,7 +1903,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -1947,7 +1968,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -2011,7 +2033,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -2075,7 +2098,8 @@ "resources": { "database": { "values": [ - "default" + "default", + "spark_catalog" ], "isExcludes": false, "isRecursive": false @@ -2139,7 
@@ -2139,7 +2163,8 @@
       "resources": {
         "database": {
           "values": [
-            "default"
+            "default",
+            "spark_catalog"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -2652,4 +2677,4 @@
     "version": 1
   },
   "auditMode": "audit-default"
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
index 92b9ca4a9f..ad23cf2945 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkRangerAuthorizationExtensionTest.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.execution.{SubmarineShowDatabasesCommand, SubmarineShowTablesCommand}
-import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowDatabasesCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{CreateDatabaseCommand, ShowTablesCommand}
 import org.apache.spark.sql.hive.test.TestHive
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
+import org.apache.submarine.spark.compatible.CompatibleCommand.ShowDatabasesCommandCompatible
 import org.apache.submarine.spark.security.SparkAccessControlException
 
+
 class SubmarineSparkRangerAuthorizationExtensionTest extends FunSuite with BeforeAndAfterAll {
 
   private val spark = TestHive.sparkSession.newSession()
@@ -35,7 +37,7 @@ class SubmarineSparkRangerAuthorizationExtensionTest extends FunSuite with Befor
   test("replace submarine show databases") {
     val df = spark.sql("show databases")
     val originalPlan = df.queryExecution.optimizedPlan
-    assert(originalPlan.isInstanceOf[ShowDatabasesCommand])
+    assert(originalPlan.isInstanceOf[ShowDatabasesCommandCompatible])
     val newPlan = authz(originalPlan)
     assert(newPlan.isInstanceOf[SubmarineShowDatabasesCommand])
   }
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
index 5cbf43941c..e8e9b8f1ad 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
@@ -118,8 +118,12 @@ class AuthorizationTest extends FunSuite with BeforeAndAfterAll {
   test("use database") {
     withUser("alice") {
       val e = intercept[SparkAccessControlException](sql("use default"))
-      assert(e.getMessage === "Permission denied: user [alice] does not have [USE] privilege" +
-        " on [default]")
+      assert(
+        e.getMessage === "Permission denied: user [alice] " +
+          "does not have [USE] privilege on [default]"
+          ||
+          e.getMessage === "Permission denied: user [alice] " +
+            "does not have [USE] privilege on [spark_catalog]")
     }
     withUser("bob") {
       sql("use default")
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
index 13a7d64410..ddd281ba49 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/TPCDSTest.scala
@@ -35,7 +35,6 @@ class TPCDSTest extends FunSuite with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    SubmarineSparkUtils.enableAll(spark)
     spark.conf.set(SQLConf.CROSS_JOINS_ENABLED.key, "true")
 
     sql(
@@ -322,6 +321,8 @@ class TPCDSTest extends FunSuite with BeforeAndAfterAll {
         |`wp_image_count` INT, `wp_max_ad_count` INT)
         |USING parquet
       """.stripMargin)
+
+    SubmarineSparkUtils.enableAll(spark)
   }
 
   private val tpcdsQueries = Seq(