Skip to content

Commit

Permalink
[KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect a…
Browse files Browse the repository at this point in the history
…nd schema helpers

### _Why are the changes needed?_

To close #5382.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5490 from zhuyaogai/issue-5382.

Closes #5382

4757445 [Fantasy-Jay] Remove unrelated comment.
f68c7aa [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.
4ad6b3c [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.

Authored-by: Fantasy-Jay <[email protected]>
Signed-off-by: liangbowen <[email protected]>
  • Loading branch information
zhuyaogai authored and bowenliang123 committed Oct 23, 2023
1 parent 6ad9307 commit 8196480
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,27 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, ResultSet, Statement}
import java.sql.{Connection, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class DorisDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val statement = super.createStatement(connection, fetchSize)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}

override def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getTablesQuery(
catalog: String,
schema: String,
Expand Down Expand Up @@ -96,10 +81,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
Expand Down Expand Up @@ -139,18 +120,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new DorisRowSetHelper
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, Statement}
import java.sql.{Connection, ResultSet, Statement}
import java.util

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
Expand All @@ -30,21 +30,34 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._

abstract class JdbcDialect extends SupportServiceLoader with Logging {

def createStatement(connection: Connection, fetchSize: Int = 1000): Statement
def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(fetchSize)
statement
}

def getTypeInfoOperation(session: Session): Operation
def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getCatalogsOperation(session: Session): Operation
def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getSchemasOperation(session: Session): Operation
def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String

def getTableTypesOperation(session: Session): Operation
def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getColumnsQuery(
session: Session,
Expand All @@ -53,16 +66,21 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
tableName: String,
columnName: String): String

def getFunctionsOperation(session: Session): Operation
def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getPrimaryKeysOperation(session: Session): Operation
def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getCrossReferenceOperation(session: Session): Operation
def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getRowSetHelper(): RowSetHelper

def getSchemaHelper(): SchemaHelper

}

object JdbcDialects extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,20 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, ResultSet, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class PhoenixDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(fetchSize)
statement
}

override def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getTablesQuery(
catalog: String,
schema: String,
Expand Down Expand Up @@ -91,10 +68,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
Expand Down Expand Up @@ -127,18 +100,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new PhoenixRowSetHelper
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,125 +16,21 @@
*/
package org.apache.kyuubi.engine.jdbc.doris

import java.sql.{Date, Types}
import java.time.LocalDateTime

import scala.collection.JavaConverters._

import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper

class DorisRowSetHelper extends RowSetHelper {

protected def toTColumn(
rows: Seq[Seq[Any]],
ordinal: Int,
sqlType: Int): TColumn = {
val nulls = new java.util.BitSet()
sqlType match {
case Types.BIT =>
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
TColumn.boolVal(new TBoolColumn(values, nulls))

case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
TColumn.i32Val(new TI32Column(values, nulls))

case Types.BIGINT =>
val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
TColumn.i64Val(new TI64Column(values, nulls))

case Types.REAL =>
val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
.asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.DOUBLE =>
val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.CHAR | Types.VARCHAR =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))

case _ =>
val rowSize = rows.length
val values = new java.util.ArrayList[String](rowSize)
var i = 0
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row(ordinal) == null)
val value =
if (row(ordinal) == null) {
""
} else {
toHiveString(row(ordinal), sqlType)
}
values.add(value)
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
}
}

protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
types(ordinal).sqlType match {
case Types.BIT =>
val boolValue = new TBoolValue
if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
TColumnValue.boolVal(boolValue)

case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
val tI32Value = new TI32Value
if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
TColumnValue.i32Val(tI32Value)

case Types.BIGINT =>
val tI64Value = new TI64Value
if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
TColumnValue.i64Val(tI64Value)

case Types.REAL =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) {
val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
tDoubleValue.setValue(doubleValue)
}
TColumnValue.doubleVal(tDoubleValue)

case Types.DOUBLE =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
TColumnValue.doubleVal(tDoubleValue)
override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

case Types.CHAR | Types.VARCHAR =>
val tStringValue = new TStringValue
if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
TColumnValue.stringVal(tStringValue)
override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

case _ =>
val tStrValue = new TStringValue
if (row(ordinal) != null) {
tStrValue.setValue(
toHiveString(row(ordinal), types(ordinal).sqlType))
}
TColumnValue.stringVal(tStrValue)
}
}
override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

protected def toHiveString(data: Any, sqlType: Int): String = {
(data, sqlType) match {
case (date: Date, Types.DATE) =>
formatDate(date)
case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
formatLocalDateTime(dateTime)
case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
decimal.toPlainString
// TODO support bitmap and hll
case (other, _) =>
other.toString
}
}
override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
}
Loading

0 comments on commit 8196480

Please sign in to comment.