Skip to content

Commit

Permalink
[KYUUBI #1501] Introduce operationsResource
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_
#1501
Introduce operationsResource
mv parseSessionHandle() to SessionHandle
mv parseOperationHandle() to OperationHandle

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1502 from simon824/operationresource.

Closes #1501

c4b1b64 [simon] fix
72446d3 [simon] introduce operationsResource
06d1201 [simon] init

Authored-by: simon <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
  • Loading branch information
simon824 authored and ulysses-you committed Dec 6, 2021
1 parent bd3d4d7 commit 4319df7
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.kyuubi.operation

import java.util.Objects
import java.util.{Objects, UUID}

import scala.language.implicitConversions
import scala.util.control.NonFatal

import org.apache.hive.service.rpc.thrift.{TOperationHandle, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.cli.{Handle, HandleIdentifier}
import org.apache.kyuubi.operation.OperationType.OperationType

Expand Down Expand Up @@ -81,4 +83,23 @@ object OperationHandle {
tOperationHandle.setHasResultSet(handle._hasResultSet)
tOperationHandle
}

def parseOperationHandle(operationHandleStr: String): OperationHandle = {
try {
val operationHandleParts = operationHandleStr.split("\\|")
require(
operationHandleParts.size == 4,
s"Expected 4 parameters but found ${operationHandleParts.size}.")

val handleIdentifier = HandleIdentifier(
UUID.fromString(operationHandleParts(0)),
UUID.fromString(operationHandleParts(1)))
val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
val operationType = OperationType.withName(operationHandleParts(3))
OperationHandle(handleIdentifier, operationType, protocolVersion)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"Invalid $operationHandleStr", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.kyuubi.session

import java.util.Objects
import java.util.{Objects, UUID}

import scala.util.control.NonFatal

import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TSessionHandle}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.cli.{Handle, HandleIdentifier}

case class SessionHandle(
Expand Down Expand Up @@ -55,4 +58,22 @@ object SessionHandle {
def apply(protocol: TProtocolVersion): SessionHandle = {
apply(HandleIdentifier(), protocol)
}

def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
try {
val sessionHandleParts = sessionHandleStr.split("\\|")
require(
sessionHandleParts.size == 3,
s"Expected 3 parameters but found ${sessionHandleParts.size}.")

val handleIdentifier = HandleIdentifier(
UUID.fromString(sessionHandleParts(0)),
UUID.fromString(sessionHandleParts(1)))
val protocolVersion = TProtocolVersion.findByValue(sessionHandleParts(2).toInt)
SessionHandle(handleIdentifier, protocolVersion)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"Invalid $sessionHandleStr", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ private[v1] class ApiRootResource extends ApiRequestContext {
@Path("sessions")
def sessions: Class[SessionsResource] = classOf[SessionsResource]

@Path("operations")
def operations: Class[OperationsResource] = classOf[OperationsResource]

@GET
@Path("exception")
@Produces(Array(MediaType.TEXT_PLAIN))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.server.api.v1

import javax.ws.rs.{GET, Path, PathParam, Produces, _}
import javax.ws.rs.core.MediaType

import scala.util.control.NonFatal

import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag

import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext

@Tag(name = "Operation")
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OperationsResource extends ApiRequestContext {

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
"Get an operation detail with a given session identifier and operation identifier")
@GET
@Path("{operationHandle}")
def getOperationDetail(
@PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
try {
val operation = backendService.sessionManager.operationManager
.getOperation(parseOperationHandle(operationHandleStr))
OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error getting an operation detail")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kyuubi.server.api.v1

import java.util.UUID
import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces, _}
import javax.ws.rs.core.{MediaType, Response}

Expand All @@ -30,10 +29,11 @@ import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}

import org.apache.kyuubi.Utils.error
import org.apache.kyuubi.cli.HandleIdentifier
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.session.SessionHandle.parseSessionHandle

@Tag(name = "Session")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -61,8 +61,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
@GET
@Path("{sessionHandle}")
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
val sessionHandle = parseSessionHandle(sessionHandleStr)
val session = backendService.sessionManager.getSession(sessionHandle)
SessionDetail(
session.user,
Expand All @@ -75,8 +75,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
session.conf)
} catch {
case NonFatal(e) =>
error(s"Invalid $sessionHandle", e)
throw new NotFoundException(s"Invalid $sessionHandle")
error(s"Invalid $sessionHandleStr", e)
throw new NotFoundException(s"Invalid $sessionHandleStr")
}
}

Expand All @@ -91,11 +91,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getInfo(
@PathParam("sessionHandle") sessionHandleStr: String,
@PathParam("infoType") infoType: Int): InfoDetail = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
val info = TGetInfoType.findByValue(infoType)

try {
val infoValue = backendService.getInfo(sessionHandle, info)
val info = TGetInfoType.findByValue(infoType)
val infoValue = backendService.getInfo(parseSessionHandle(sessionHandleStr), info)
InfoDetail(info.toString, infoValue.getStringValue)
} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -152,8 +150,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@DELETE
@Path("{sessionHandle}")
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
backendService.closeSession(sessionHandle)
backendService.closeSession(parseSessionHandle(sessionHandleStr))
Response.ok().build()
}

Expand All @@ -167,10 +164,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
def executeStatement(
@PathParam("sessionHandle") sessionHandleStr: String,
request: StatementRequest): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.executeStatement(
sessionHandle,
parseSessionHandle(sessionHandleStr),
request.statement,
request.runAsync,
request.queryTimeout)
Expand All @@ -188,9 +184,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/typeInfo")
def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTypeInfo(sessionHandle)
backendService.getTypeInfo(parseSessionHandle(sessionHandleStr))
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error getting type information")
Expand All @@ -205,9 +200,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/catalogs")
def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getCatalogs(sessionHandle)
backendService.getCatalogs(parseSessionHandle(sessionHandleStr))
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error getting catalogs")
Expand All @@ -224,9 +218,11 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getSchemas(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetSchemasRequest): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getSchemas(sessionHandle, request.catalogName, request.schemaName)
backendService.getSchemas(
parseSessionHandle(sessionHandleStr),
request.catalogName,
request.schemaName)
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error getting schemas")
Expand All @@ -243,10 +239,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getTables(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetTablesRequest): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTables(
sessionHandle,
parseSessionHandle(sessionHandleStr),
request.catalogName,
request.schemaName,
request.tableName,
Expand All @@ -265,9 +260,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/tableTypes")
def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTableTypes(sessionHandle)
backendService.getTableTypes(parseSessionHandle(sessionHandleStr))
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error getting table types")
Expand All @@ -284,10 +278,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getColumns(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetColumnsRequest): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getColumns(
sessionHandle,
parseSessionHandle(sessionHandleStr),
request.catalogName,
request.schemaName,
request.tableName,
Expand All @@ -308,10 +301,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getFunctions(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetFunctionsRequest): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getFunctions(
sessionHandle,
parseSessionHandle(sessionHandleStr),
request.catalogName,
request.schemaName,
request.functionName)
Expand All @@ -331,77 +323,15 @@ private[v1] class SessionsResource extends ApiRequestContext {
def closeOperation(
@PathParam("sessionHandle") sessionHandleStr: String,
@PathParam("operationHandle") operationHandleStr: String): OperationHandle = {
val sessionHandle = parseSessionHandle(sessionHandleStr)
val operationHandle = parseOperationHandle(operationHandleStr)

try {
backendService.sessionManager.getSession(sessionHandle).closeOperation(operationHandle)
val operationHandle = parseOperationHandle(operationHandleStr)
backendService.sessionManager.getSession(parseSessionHandle(sessionHandleStr))
.closeOperation(operationHandle)
operationHandle
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error closing an operation")
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
"Get an operation detail with a given session identifier and operation identifier")
@GET
@Path("{sessionHandle}/operations/{operationHandle}")
def getOperationHandle(
@PathParam("sessionHandle") sessionHandleStr: String,
@PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
val operationHandle = parseOperationHandle(operationHandleStr)
try {
val operation = backendService.sessionManager.operationManager.getOperation(operationHandle)
OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
} catch {
case NonFatal(e) =>
throw new NotFoundException(s"Error closing an operation")
}
}

def parseOperationHandle(operationHandleStr: String): OperationHandle = {
try {
val operationHandleParts = operationHandleStr.split("\\|")
require(
operationHandleParts.size == 4,
s"Expected 4 parameters but found ${operationHandleParts.size}.")

val handleIdentifier = new HandleIdentifier(
UUID.fromString(operationHandleParts(0)),
UUID.fromString(operationHandleParts(1)))

val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
val operationType = OperationType.withName(operationHandleParts(3))
val operationHandle = new OperationHandle(handleIdentifier, operationType, protocolVersion)

operationHandle
} catch {
case NonFatal(e) =>
error(s"Error getting operationHandle by $operationHandleStr.", e)
throw new NotFoundException(s"Error getting operationHandle by $operationHandleStr.")
}
}

def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
try {
val splitSessionHandle = sessionHandleStr.split("\\|")
val handleIdentifier = new HandleIdentifier(
UUID.fromString(splitSessionHandle(0)),
UUID.fromString(splitSessionHandle(1)))
val protocolVersion = TProtocolVersion.findByValue(splitSessionHandle(2).toInt)
val sessionHandle = new SessionHandle(handleIdentifier, protocolVersion)

// if the sessionHandle is invalid, KyuubiSQLException will be thrown here.
backendService.sessionManager.getSession(sessionHandle)
sessionHandle
} catch {
case NonFatal(e) =>
error(s"Error getting sessionHandle by $sessionHandleStr.", e)
throw new NotFoundException(s"Error getting sessionHandle by $sessionHandleStr.")
}
}
}
Loading

0 comments on commit 4319df7

Please sign in to comment.