Skip to content

Commit

Permalink
comment
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 19, 2023
1 parent 5d9d4dc commit 7ce89f8
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class BatchJobSubmission(
OperationLog.removeCurrentOperationLog()
}

override protected def runKyuubiOperationInternal(): Unit = {
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
metadata match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABL
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class ExecuteStatement(
session: Session,
Expand Down Expand Up @@ -164,20 +164,21 @@ class ExecuteStatement(
}
}

override protected def runKyuubiOperationInternal(): Unit = {
if (isTimeoutMonitorEnabled) {
addTimeoutMonitor(queryTimeout)
}
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()
try {
val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting query in background, query rejected")
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
if (isTimeoutMonitorEnabled) {
addTimeoutMonitor(queryTimeout)
}
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation: Runnable = () => waitStatementComplete()
try {
val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting query in background, query rejected")

if (!shouldRunAsync) getBackgroundHandle.get()
}
if (!shouldRunAsync) getBackgroundHandle.get()
}

override protected def eventEnabled: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ExecutedCommandExec(
OperationLog.removeCurrentOperationLog()
}

override protected def runKyuubiOperationInternal(): Unit = {
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetCatalogs(session: Session) extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getCatalogs
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getCatalogs
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetColumns(
session: Session,
Expand All @@ -27,9 +27,10 @@ class GetColumns(
columnName: String)
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getColumns(catalogName, schemaName, tableName, columnName)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getColumns(catalogName, schemaName, tableName, columnName)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetCrossReference(
session: Session,
Expand All @@ -29,15 +29,16 @@ class GetCrossReference(
foreignTable: String)
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getCrossReference(
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getCrossReference(
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetFunctions(
session: Session,
Expand All @@ -26,9 +26,10 @@ class GetFunctions(
functionName: String)
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getFunctions(catalogName, schemaName, functionName)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getFunctions(catalogName, schemaName, functionName)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetPrimaryKeys(
session: Session,
Expand All @@ -26,9 +26,10 @@ class GetPrimaryKeys(
tableName: String)
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getPrimaryKeys(catalogName, schemaName, tableName)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getPrimaryKeys(catalogName, schemaName, tableName)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetSchemas(
session: Session,
catalogName: String,
schemaName: String)
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getSchemas(catalogName, schemaName)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getSchemas(catalogName, schemaName)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetTableTypes(session: Session) extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTableTypes
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getTableTypes
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetTables(
session: Session,
Expand All @@ -27,9 +27,10 @@ class GetTables(
tableTypes: java.util.List[String])
extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTables(catalogName, schemaName, tableName, tableTypes)
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getTables(catalogName, schemaName, tableName, tableTypes)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.kyuubi.operation

import org.apache.kyuubi.session.Session
import org.apache.kyuubi.session.{KyuubiSession, Session}

class GetTypeInfo(session: Session) extends KyuubiOperation(session) {

override protected def runKyuubiOperationInternal(): Unit = {
try {
_remoteOpHandle = client.getTypeInfo
} catch onError()
}
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
try {
_remoteOpHandle = client.getTypeInfo
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, OPERATION_OPE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionImpl, KyuubiSessionManager, Session}
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
import org.apache.kyuubi.util.ThriftUtils

abstract class KyuubiOperation(session: Session) extends AbstractOperation(session) {
Expand Down Expand Up @@ -100,13 +100,6 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}

final override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
runKyuubiOperationInternal()
}

protected def runKyuubiOperationInternal(): Unit

override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation

import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.KyuubiSessionImpl
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionImpl}

class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Boolean)
extends KyuubiApplicationOperation(session) {
Expand Down Expand Up @@ -53,21 +53,22 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
OperationLog.removeCurrentOperationLog()
}

override protected def runKyuubiOperationInternal(): Unit = {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
override protected def runInternal(): Unit =
session.asInstanceOf[KyuubiSession].handleSessionException {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
try {
session.openEngineSession(getOperationLog)
setState(OperationState.FINISHED)
} catch onError()
}
try {
session.openEngineSession(getOperationLog)
setState(OperationState.FINISHED)
} catch onError()
}
try {
val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting open engine operation in background, request rejected")
val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting open engine operation in background, request rejected")

if (!shouldRunAsync) getBackgroundHandle.get()
}
if (!shouldRunAsync) getBackgroundHandle.get()
}

override protected def applicationInfoMap: Option[Map[String, String]] = {
super.applicationInfoMap.map { _ + ("refId" -> session.engine.getEngineRefId()) }
Expand Down

0 comments on commit 7ce89f8

Please sign in to comment.