Skip to content

Commit

Permalink
Merge branch 'master' into KYUUBI-5594
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Nov 29, 2023
2 parents 0ebdd5d + c1685c6 commit e0f28a6
Show file tree
Hide file tree
Showing 26 changed files with 134 additions and 56 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,9 @@ jobs:
uses: actions/checkout@v4
# https://github.com/docker/build-push-action
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
- name: Build Kyuubi Docker Image
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
with:
# passthrough CI into build container
build-args: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/publish-snapshot-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USER }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and Push Kyuubi Docker Image
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
# build cache on Github Actions, See: https://docs.docker.com/build/cache/backends/gha/#using-dockerbuild-push-action
cache-from: type=gha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@
} ],
"opType" : "SWITCHDATABASE",
"uriDescs" : [ ]
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@
"comment" : ""
} ],
"opType" : "RELOADFUNCTION"
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@
"comment" : ""
} ],
"uriDescs" : [ ]
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -2528,4 +2528,4 @@
"isInput" : false,
"comment" : "Delta"
} ]
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
package org.apache.kyuubi.plugin.spark.authz.rule

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY

import org.apache.kyuubi.plugin.spark.authz.rule.Authorization._
import org.apache.kyuubi.plugin.spark.authz.rule.permanentview.PermanentViewMarker
import org.apache.kyuubi.plugin.spark.authz.util.ReservedKeys._

abstract class Authorization(spark: SparkSession) extends Rule[LogicalPlan] {
Expand Down Expand Up @@ -54,18 +53,15 @@ object Authorization {
def markAuthChecked(plan: LogicalPlan): LogicalPlan = {
plan.setTagValue(KYUUBI_AUTHZ_TAG, ())
plan transformDown {
case pvm: PermanentViewMarker =>
markAllNodesAuthChecked(pvm)
case subquery: Subquery =>
markAllNodesAuthChecked(subquery)
// TODO: Add this line Support for spark3.1, we can remove this
// after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269
case view: View =>
markAllNodesAuthChecked(view.child)
}
}

protected def isAuthChecked(plan: LogicalPlan): Boolean = {
plan match {
case subquery: Subquery => isAuthChecked(subquery.child)
case p => p.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty
}
plan.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty
}

def setExplainCommandExecutionId(sparkSession: SparkSession): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild

case class PermanentViewMarker(child: LogicalPlan, catalogTable: CatalogTable) extends LeafNode
with WithInternalChild {
case class PermanentViewMarker(child: LogicalPlan, catalogTable: CatalogTable) extends LeafNode {

override def output: Seq[Attribute] = child.output

override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class FlinkSessionImpl(
case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
TGetInfoValue.stringValue("Apache Flink")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(EnvironmentInformation.getVersion)
case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,15 @@ object HiveSQLEngine extends Logging {
} else {
val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser)
effectiveUser.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = startEngine()
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
}
startEngine()
}
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.kyuubi.engine.hive

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
import org.apache.kyuubi.service.TFrontendService.OK_STATUS
import org.apache.kyuubi.util.KyuubiHadoopUtils

class HiveTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("HiveTBinaryFrontend") {
import HiveTBinaryFrontendService._

override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Expand All @@ -30,4 +38,39 @@ class HiveTBinaryFrontendService(override val serverable: Serverable)
None
}
}

override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
debug(req.toString)

// We hacked `TCLIService.Iface.RenewDelegationToken` to transfer Credentials from Kyuubi
// Server to Hive SQL engine
val resp = new TRenewDelegationTokenResp()
try {
renewDelegationToken(req.getDelegationToken)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
warn("Error renew delegation tokens: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
resp
}
}

object HiveTBinaryFrontendService {

def renewDelegationToken(tokenStr: String): Unit = {
val currentUser = UserGroupInformation.getCurrentUser
// `currentUser` is either `UserGroupInformation.getLoginUser` or a proxy user.
// If `currentUser` is a proxy user, it needs a HIVE_DELEGATION_TOKEN to pass
// HiveMetastoreClient authentication.
if (currentUser.getAuthenticationMethod == UserGroupInformation.AuthenticationMethod.PROXY) {
val newCreds = KyuubiHadoopUtils.decodeCredentials(tokenStr)
KyuubiHadoopUtils.getTokenMap(newCreds).values
.find(_.getKind == new Text("HIVE_DELEGATION_TOKEN"))
.foreach { token =>
UserGroupInformation.getCurrentUser.addToken(token)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import java.util.{List => JList}
import java.util.concurrent.Future

import scala.collection.JavaConverters._
import scala.language.reflectiveCalls

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.{SessionHandle => ImportedSessionHandle}
import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHiveSessionImpl, HiveSessionProxy, SessionManager => ImportedHiveSessionManager}
import org.apache.hive.service.cli.session.{HiveSessionImpl => ImportedHiveSessionImpl, HiveSessionImplwithUGI => ImportedHiveSessionImplwithUGI, HiveSessionProxy, SessionManager => ImportedHiveSessionManager}
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
Expand All @@ -44,11 +46,14 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess

private val internalSessionManager = new ImportedHiveSessionManager(null) {

var doAsEnabled: Boolean = _

/**
* Avoid unnecessary hive initialization
*/
override def init(hiveConf: HiveConf): Unit = {
// this.hiveConf = hiveConf
this.doAsEnabled = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)
}

/**
Expand Down Expand Up @@ -79,11 +84,10 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
getSessionOption).getOrElse {
val sessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
val hive = {

val hive = if (internalSessionManager.doAsEnabled) {
val sessionWithUGI = DynConstructors.builder()
.impl( // for Hive 3.1
classOf[ImportedHiveSessionImpl],
classOf[ImportedHiveSessionImplwithUGI],
classOf[ImportedSessionHandle],
classOf[TProtocolVersion],
classOf[String],
Expand All @@ -93,15 +97,15 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
classOf[String],
classOf[JList[String]])
.impl( // for Hive 2.3
classOf[ImportedHiveSessionImpl],
classOf[ImportedHiveSessionImplwithUGI],
classOf[ImportedSessionHandle],
classOf[TProtocolVersion],
classOf[String],
classOf[String],
classOf[HiveConf],
classOf[String],
classOf[String])
.build[ImportedHiveSessionImpl]()
.build[ImportedHiveSessionImplwithUGI]()
.newInstance(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
protocol,
Expand All @@ -114,6 +118,34 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
sessionWithUGI.setProxySession(proxy)
proxy
} else {
DynConstructors.builder()
.impl( // for Hive 3.1
classOf[ImportedHiveSessionImpl],
classOf[ImportedSessionHandle],
classOf[TProtocolVersion],
classOf[String],
classOf[String],
classOf[HiveConf],
classOf[String],
classOf[JList[String]])
.impl( // for Hive 2.3
classOf[ImportedHiveSessionImpl],
classOf[ImportedSessionHandle],
classOf[TProtocolVersion],
classOf[String],
classOf[String],
classOf[HiveConf],
classOf[String])
.build[ImportedHiveSessionImpl]()
.newInstance(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
protocol,
user,
password,
HiveSQLEngine.hiveConf,
ipAddress,
Seq(ipAddress).asJava)
}
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ object SchemaHelper {
case dt
if Array(TIMESTAMP_NTZ, DAY_TIME_INTERVAL, YEAR_MONTH_INTERVAL)
.contains(dt.getClass.getSimpleName) => Some(dt.defaultSize)
case dt: DecimalType =>
Some(dt.precision)
case dt @ (BooleanType | _: NumericType | DateType | TimestampType |
CalendarIntervalType | NullType) =>
Some(dt.defaultSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
val colSize = rowSet.getInt(COLUMN_SIZE)
schema(pos).dataType match {
case StringType | BinaryType | _: ArrayType | _: MapType => assert(colSize === 0)
case d: DecimalType => assert(colSize === d.precision)
case StructType(fields) if fields.length == 1 => assert(colSize === 0)
case o => assert(colSize === o.defaultSize)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Flink")
req.setInfoType(TGetInfoType.CLI_ODBC_KEYWORDS)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Unimplemented")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ object Utils extends Logging {

private val PATTERN_FOR_KEY_VALUE_ARG = "(.+?)=(.+)".r

def redactCommandLineArgs(conf: KyuubiConf, commands: Array[String]): Array[String] = {
def redactCommandLineArgs(conf: KyuubiConf, commands: Iterable[String]): Iterable[String] = {
val redactionPattern = conf.get(SERVER_SECRET_REDACTION_PATTERN)
var nextKV = false
commands.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class UtilsSuite extends KyuubiFunSuite {
buffer += "--conf"
buffer += "kyuubi.regular.property2=regular_value"

val commands = buffer.toArray
val commands = buffer

// Redact sensitive information
val redactedCmdArgs = Utils.redactCommandLineArgs(conf, commands)
Expand All @@ -183,7 +183,7 @@ class UtilsSuite extends KyuubiFunSuite {
expectBuffer += "--conf"
expectBuffer += "kyuubi.regular.property2=regular_value"

assert(expectBuffer.toArray === redactedCmdArgs)
assert(expectBuffer === redactedCmdArgs)
}

test("redact sensitive information") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ trait ProcBuilder {

protected def proxyUser: String

protected val commands: Array[String]
protected val commands: Iterable[String]

def conf: KyuubiConf

Expand Down Expand Up @@ -142,7 +142,7 @@ trait ProcBuilder {
}

final lazy val processBuilder: ProcessBuilder = {
val pb = new ProcessBuilder(commands: _*)
val pb = new ProcessBuilder(commands.toStream.asJava)

val envs = pb.environment()
envs.putAll(env.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ChatProcessBuilder(
*/
override protected def mainClass: String = "org.apache.kyuubi.engine.chat.ChatEngine"

override protected val commands: Array[String] = {
override protected val commands: Iterable[String] = {
val buffer = new ArrayBuffer[String]()
buffer += executable

Expand Down Expand Up @@ -98,7 +98,7 @@ class ChatProcessBuilder(
buffer += "--conf"
buffer += s"$k=$v"
}
buffer.toArray
buffer
}

override def toString: String = {
Expand Down
Loading

0 comments on commit e0f28a6

Please sign in to comment.