From 81964803c9ea0c9037d7f18422f875f8bb088ac9 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Mon, 23 Oct 2023 22:21:17 +0800 Subject: [PATCH] [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers ### _Why are the changes needed?_ To close #5382. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5490 from zhuyaogai/issue-5382. Closes #5382 4757445e7 [Fantasy-Jay] Remove unrelated comment. f68c7aa6c [Fantasy-Jay] Refactor JDBC engine to reduce code duplication. 4ad6b3c53 [Fantasy-Jay] Refactor JDBC engine to reduce code duplication. Authored-by: Fantasy-Jay <13631435453@163.com> Signed-off-by: liangbowen --- .../engine/jdbc/dialect/DorisDialect.scala | 35 +-- .../engine/jdbc/dialect/JdbcDialect.scala | 40 ++- .../engine/jdbc/dialect/PhoenixDialect.scala | 39 --- .../engine/jdbc/doris/DorisRowSetHelper.scala | 122 +-------- .../engine/jdbc/doris/DorisSchemaHelper.scala | 35 +-- .../jdbc/phoenix/PhoenixRowSetHelper.scala | 142 +---------- .../jdbc/phoenix/PhoenixSchemaHelper.scala | 46 +--- .../engine/jdbc/schema/RowSetHelper.scala | 231 +++++++++++++++++- .../engine/jdbc/schema/SchemaHelper.scala | 94 ++++++- 9 files changed, 366 insertions(+), 418 deletions(-) diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala index c2ae29953c3..f7c1ace6473 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala +++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala @@ -15,7 +15,7 @@ * limitations under the License. */ package org.apache.kyuubi.engine.jdbc.dialect -import java.sql.{Connection, ResultSet, Statement} +import java.sql.{Connection, Statement} import java.util import scala.collection.JavaConverters._ @@ -23,34 +23,19 @@ import scala.collection.mutable.ArrayBuffer import org.apache.commons.lang3.StringUtils -import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper} import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper} -import org.apache.kyuubi.operation.Operation import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session class DorisDialect extends JdbcDialect { override def createStatement(connection: Connection, fetchSize: Int): Statement = { - val statement = - connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + val statement = super.createStatement(connection, fetchSize) statement.setFetchSize(Integer.MIN_VALUE) statement } - override def getTypeInfoOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getCatalogsOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getSchemasOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getTablesQuery( catalog: String, schema: String, @@ -96,10 +81,6 @@ class DorisDialect extends JdbcDialect { query.toString() } - override def getTableTypesOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getColumnsQuery( session: Session, catalogName: String, @@ -139,18 +120,6 @@ class DorisDialect extends JdbcDialect { query.toString() } - override def getFunctionsOperation(session: 
Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getPrimaryKeysOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getCrossReferenceOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getRowSetHelper(): RowSetHelper = { new DorisRowSetHelper } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala index e08b2275875..62e20a1d258 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala @@ -16,10 +16,10 @@ */ package org.apache.kyuubi.engine.jdbc.dialect -import java.sql.{Connection, Statement} +import java.sql.{Connection, ResultSet, Statement} import java.util -import org.apache.kyuubi.{KyuubiException, Logging} +import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME} import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper} @@ -30,13 +30,24 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._ abstract class JdbcDialect extends SupportServiceLoader with Logging { - def createStatement(connection: Connection, fetchSize: Int = 1000): Statement + def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = { + val statement = + connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + statement.setFetchSize(fetchSize) + statement + } - def getTypeInfoOperation(session: Session): Operation + def getTypeInfoOperation(session: Session): Operation = { + throw 
KyuubiSQLException.featureNotSupported() + } - def getCatalogsOperation(session: Session): Operation + def getCatalogsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } - def getSchemasOperation(session: Session): Operation + def getSchemasOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } def getTablesQuery( catalog: String, @@ -44,7 +55,9 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging { tableName: String, tableTypes: util.List[String]): String - def getTableTypesOperation(session: Session): Operation + def getTableTypesOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } def getColumnsQuery( session: Session, @@ -53,16 +66,21 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging { tableName: String, columnName: String): String - def getFunctionsOperation(session: Session): Operation + def getFunctionsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } - def getPrimaryKeysOperation(session: Session): Operation + def getPrimaryKeysOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } - def getCrossReferenceOperation(session: Session): Operation + def getCrossReferenceOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } def getRowSetHelper(): RowSetHelper def getSchemaHelper(): SchemaHelper - } object JdbcDialects extends Logging { diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala index 0cce14b42fc..4c8e8f26549 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala +++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala @@ -15,8 +15,6 @@ * limitations under the License. */ package org.apache.kyuubi.engine.jdbc.dialect - -import java.sql.{Connection, ResultSet, Statement} import java.util import scala.collection.JavaConverters._ @@ -24,34 +22,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.commons.lang3.StringUtils -import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper} import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper} -import org.apache.kyuubi.operation.Operation import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session class PhoenixDialect extends JdbcDialect { - override def createStatement(connection: Connection, fetchSize: Int): Statement = { - val statement = - connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - statement.setFetchSize(fetchSize) - statement - } - - override def getTypeInfoOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getCatalogsOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getSchemasOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getTablesQuery( catalog: String, schema: String, @@ -91,10 +68,6 @@ class PhoenixDialect extends JdbcDialect { query.toString() } - override def getTableTypesOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getColumnsQuery( session: Session, catalogName: String, @@ -127,18 +100,6 @@ class PhoenixDialect extends JdbcDialect { query.toString() } - override def getFunctionsOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override 
def getPrimaryKeysOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - - override def getCrossReferenceOperation(session: Session): Operation = { - throw KyuubiSQLException.featureNotSupported() - } - override def getRowSetHelper(): RowSetHelper = { new PhoenixRowSetHelper } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala index 1ce43c7a4b8..a92942cecdf 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala @@ -16,125 +16,21 @@ */ package org.apache.kyuubi.engine.jdbc.doris -import java.sql.{Date, Types} -import java.time.LocalDateTime - -import scala.collection.JavaConverters._ - import org.apache.hive.service.rpc.thrift._ -import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper} -import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime} +import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper class DorisRowSetHelper extends RowSetHelper { - protected def toTColumn( - rows: Seq[Seq[Any]], - ordinal: Int, - sqlType: Int): TColumn = { - val nulls = new java.util.BitSet() - sqlType match { - case Types.BIT => - val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) - TColumn.boolVal(new TBoolColumn(values, nulls)) - - case Types.TINYINT | Types.SMALLINT | Types.INTEGER => - val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) - TColumn.i32Val(new TI32Column(values, nulls)) - - case Types.BIGINT => - val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) - TColumn.i64Val(new TI64Column(values, nulls)) - - case Types.REAL => - val values = getOrSetAsNull[java.lang.Float](rows, ordinal, 
nulls, 0.toFloat) - .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava - TColumn.doubleVal(new TDoubleColumn(values, nulls)) - - case Types.DOUBLE => - val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) - TColumn.doubleVal(new TDoubleColumn(values, nulls)) - - case Types.CHAR | Types.VARCHAR => - val values = getOrSetAsNull[String](rows, ordinal, nulls, "") - TColumn.stringVal(new TStringColumn(values, nulls)) - - case _ => - val rowSize = rows.length - val values = new java.util.ArrayList[String](rowSize) - var i = 0 - while (i < rowSize) { - val row = rows(i) - nulls.set(i, row(ordinal) == null) - val value = - if (row(ordinal) == null) { - "" - } else { - toHiveString(row(ordinal), sqlType) - } - values.add(value) - i += 1 - } - TColumn.stringVal(new TStringColumn(values, nulls)) - } - } - - protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = { - types(ordinal).sqlType match { - case Types.BIT => - val boolValue = new TBoolValue - if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean]) - TColumnValue.boolVal(boolValue) - - case Types.TINYINT | Types.SMALLINT | Types.INTEGER => - val tI32Value = new TI32Value - if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int]) - TColumnValue.i32Val(tI32Value) - - case Types.BIGINT => - val tI64Value = new TI64Value - if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long]) - TColumnValue.i64Val(tI64Value) - - case Types.REAL => - val tDoubleValue = new TDoubleValue - if (row(ordinal) != null) { - val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString) - tDoubleValue.setValue(doubleValue) - } - TColumnValue.doubleVal(tDoubleValue) - - case Types.DOUBLE => - val tDoubleValue = new TDoubleValue - if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double]) - TColumnValue.doubleVal(tDoubleValue) + override def 
toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = + toIntegerTColumn(rows, ordinal) - case Types.CHAR | Types.VARCHAR => - val tStringValue = new TStringValue - if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String]) - TColumnValue.stringVal(tStringValue) + override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = + toIntegerTColumn(rows, ordinal) - case _ => - val tStrValue = new TStringValue - if (row(ordinal) != null) { - tStrValue.setValue( - toHiveString(row(ordinal), types(ordinal).sqlType)) - } - TColumnValue.stringVal(tStrValue) - } - } + override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = + toIntegerTColumnValue(row, ordinal) - protected def toHiveString(data: Any, sqlType: Int): String = { - (data, sqlType) match { - case (date: Date, Types.DATE) => - formatDate(date) - case (dateTime: LocalDateTime, Types.TIMESTAMP) => - formatLocalDateTime(dateTime) - case (decimal: java.math.BigDecimal, Types.DECIMAL) => - decimal.toPlainString - // TODO support bitmap and hll - case (other, _) => - other.toString - } - } + override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = + toIntegerTColumnValue(row, ordinal) } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala index ca8bb6ec314..b323d373142 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala @@ -16,44 +16,13 @@ */ package org.apache.kyuubi.engine.jdbc.doris -import java.sql.Types - import org.apache.hive.service.rpc.thrift._ import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper class DorisSchemaHelper extends SchemaHelper { - override def 
toTTypeId(sqlType: Int): TTypeId = sqlType match { - case Types.BIT => - TTypeId.BOOLEAN_TYPE - - case Types.TINYINT | Types.SMALLINT | Types.INTEGER => - TTypeId.INT_TYPE - - case Types.BIGINT => - TTypeId.BIGINT_TYPE - - case Types.REAL => - TTypeId.FLOAT_TYPE - - case Types.DOUBLE => - TTypeId.DOUBLE_TYPE - - case Types.CHAR | Types.VARCHAR => - TTypeId.STRING_TYPE - - case Types.DATE => - TTypeId.DATE_TYPE - - case Types.TIMESTAMP => - TTypeId.TIMESTAMP_TYPE - - case Types.DECIMAL => - TTypeId.DECIMAL_TYPE + override def tinyIntToTTypeId: TTypeId = TTypeId.INT_TYPE - // TODO add more type support - case _ => - TTypeId.STRING_TYPE - } + override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala index a1f6d4ac25c..67d9d09e529 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala @@ -16,144 +16,6 @@ */ package org.apache.kyuubi.engine.jdbc.phoenix -import java.sql.{Date, Types} -import java.time.LocalDateTime +import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper -import scala.collection.JavaConverters._ - -import org.apache.hive.service.rpc.thrift._ - -import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper} -import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime} - -class PhoenixRowSetHelper extends RowSetHelper { - - protected def toTColumn( - rows: Seq[Seq[Any]], - ordinal: Int, - sqlType: Int): TColumn = { - val nulls = new java.util.BitSet() - sqlType match { - - case Types.BIT => - val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) - TColumn.boolVal(new TBoolColumn(values, 
nulls)) - - case Types.TINYINT => - val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) - TColumn.byteVal(new TByteColumn(values, nulls)) - - case Types.SMALLINT => - val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) - TColumn.i16Val(new TI16Column(values, nulls)) - - case Types.INTEGER => - val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) - TColumn.i32Val(new TI32Column(values, nulls)) - - case Types.BIGINT => - val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) - TColumn.i64Val(new TI64Column(values, nulls)) - - case Types.REAL => - val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat) - .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava - TColumn.doubleVal(new TDoubleColumn(values, nulls)) - - case Types.DOUBLE => - val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) - TColumn.doubleVal(new TDoubleColumn(values, nulls)) - - case Types.CHAR | Types.VARCHAR => - val values = getOrSetAsNull[String](rows, ordinal, nulls, "") - TColumn.stringVal(new TStringColumn(values, nulls)) - - case _ => - val rowSize = rows.length - val values = new java.util.ArrayList[String](rowSize) - var i = 0 - while (i < rowSize) { - val row = rows(i) - nulls.set(i, row(ordinal) == null) - val value = - if (row(ordinal) == null) { - "" - } else { - toHiveString(row(ordinal), sqlType) - } - values.add(value) - i += 1 - } - TColumn.stringVal(new TStringColumn(values, nulls)) - } - } - - protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = { - types(ordinal).sqlType match { - case Types.BIT => - val boolValue = new TBoolValue - if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean]) - TColumnValue.boolVal(boolValue) - - case Types.TINYINT => - val byteValue = new TByteValue() - if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte]) - 
TColumnValue.byteVal(byteValue) - - case Types.SMALLINT => - val tI16Value = new TI16Value() - if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short]) - TColumnValue.i16Val(tI16Value) - - case Types.INTEGER => - val tI32Value = new TI32Value - if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int]) - TColumnValue.i32Val(tI32Value) - - case Types.BIGINT => - val tI64Value = new TI64Value - if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long]) - TColumnValue.i64Val(tI64Value) - - case Types.REAL => - val tDoubleValue = new TDoubleValue - if (row(ordinal) != null) { - val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString) - tDoubleValue.setValue(doubleValue) - } - TColumnValue.doubleVal(tDoubleValue) - - case Types.DOUBLE => - val tDoubleValue = new TDoubleValue - if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double]) - TColumnValue.doubleVal(tDoubleValue) - - case Types.CHAR | Types.VARCHAR => - val tStringValue = new TStringValue - if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String]) - TColumnValue.stringVal(tStringValue) - - case _ => - val tStrValue = new TStringValue - if (row(ordinal) != null) { - tStrValue.setValue( - toHiveString(row(ordinal), types(ordinal).sqlType)) - } - TColumnValue.stringVal(tStrValue) - } - } - - protected def toHiveString(data: Any, sqlType: Int): String = { - (data, sqlType) match { - case (date: Date, Types.DATE) => - formatDate(date) - case (dateTime: LocalDateTime, Types.TIMESTAMP) => - formatLocalDateTime(dateTime) - case (decimal: java.math.BigDecimal, Types.DECIMAL) => - decimal.toPlainString - // TODO support bitmap and hll - case (other, _) => - other.toString - } - } -} +class PhoenixRowSetHelper extends RowSetHelper {} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala index f5e04f7ca72..938956cdc40 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala @@ -16,50 +16,6 @@ */ package org.apache.kyuubi.engine.jdbc.phoenix -import java.sql.Types - -import org.apache.hive.service.rpc.thrift._ - import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper -class PhoenixSchemaHelper extends SchemaHelper { - - override def toTTypeId(sqlType: Int): TTypeId = sqlType match { - case Types.BIT => - TTypeId.BOOLEAN_TYPE - - case Types.TINYINT => - TTypeId.TINYINT_TYPE - - case Types.SMALLINT => - TTypeId.SMALLINT_TYPE - - case Types.INTEGER => - TTypeId.INT_TYPE - - case Types.BIGINT => - TTypeId.BIGINT_TYPE - - case Types.REAL => - TTypeId.FLOAT_TYPE - - case Types.DOUBLE => - TTypeId.DOUBLE_TYPE - - case Types.CHAR | Types.VARCHAR => - TTypeId.STRING_TYPE - - case Types.DATE => - TTypeId.DATE_TYPE - - case Types.TIMESTAMP => - TTypeId.TIMESTAMP_TYPE - - case Types.DECIMAL => - TTypeId.DECIMAL_TYPE - - // TODO add more type support - case _ => - TTypeId.STRING_TYPE - } -} +class PhoenixSchemaHelper extends SchemaHelper {} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala index d489ed8a2b7..74b4cec108d 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala @@ -16,10 +16,16 @@ */ package org.apache.kyuubi.engine.jdbc.schema -import java.util +import java.{lang, util} +import java.sql.{Date, Types} +import java.time.LocalDateTime + +import 
scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift._ +import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime} + abstract class RowSetHelper { def toTRowSet( @@ -70,9 +76,73 @@ abstract class RowSetHelper { protected def toTColumn( rows: Seq[Seq[Any]], ordinal: Int, - sqlType: Int): TColumn + sqlType: Int): TColumn = { + sqlType match { + case Types.BIT => + toBitTColumn(rows, ordinal) + + case Types.TINYINT => + toTinyIntTColumn(rows, ordinal) + + case Types.SMALLINT => + toSmallIntTColumn(rows, ordinal) + + case Types.INTEGER => + toIntegerTColumn(rows, ordinal) + + case Types.BIGINT => + toBigIntTColumn(rows, ordinal) + + case Types.REAL => + toRealTColumn(rows, ordinal) + + case Types.DOUBLE => + toDoubleTColumn(rows, ordinal) + + case Types.CHAR => + toCharTColumn(rows, ordinal) + + case Types.VARCHAR => + toVarcharTColumn(rows, ordinal) + + case _ => + toDefaultTColumn(rows, ordinal, sqlType) + } + } + + protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = { + types(ordinal).sqlType match { + case Types.BIT => + toBitTColumnValue(row, ordinal) + + case Types.TINYINT => + toTinyIntTColumnValue(row, ordinal) + + case Types.SMALLINT => + toSmallIntTColumnValue(row, ordinal) + + case Types.INTEGER => + toIntegerTColumnValue(row, ordinal) + + case Types.BIGINT => + toBigIntTColumnValue(row, ordinal) + + case Types.REAL => + toRealTColumnValue(row, ordinal) + + case Types.DOUBLE => + toDoubleTColumnValue(row, ordinal) - protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue + case Types.CHAR => + toCharTColumnValue(row, ordinal) + + case Types.VARCHAR => + toVarcharTColumnValue(row, ordinal) + + case _ => + toDefaultTColumnValue(row, ordinal, types) + } + } protected def getOrSetAsNull[T]( rows: Seq[Seq[Any]], @@ -95,4 +165,159 @@ abstract class RowSetHelper { } ret } + + protected def toDefaultTColumn(rows: Seq[Seq[Any]], 
ordinal: Int, sqlType: Int): TColumn = { + val nulls = new java.util.BitSet() + val rowSize = rows.length + val values = new util.ArrayList[String](rowSize) + var i = 0 + while (i < rowSize) { + val row = rows(i) + nulls.set(i, row(ordinal) == null) + val value = + if (row(ordinal) == null) { + "" + } else { + toHiveString(row(ordinal), sqlType) + } + values.add(value) + i += 1 + } + TColumn.stringVal(new TStringColumn(values, nulls)) + } + + protected def toBitTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) + TColumn.boolVal(new TBoolColumn(values, nulls)) + } + + protected def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) + TColumn.byteVal(new TByteColumn(values, nulls)) + } + + protected def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) + TColumn.i16Val(new TI16Column(values, nulls)) + } + + protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) + TColumn.i32Val(new TI32Column(values, nulls)) + } + + protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L) + TColumn.i64Val(new TI64Column(values, nulls)) + } + + protected def toRealTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.toFloat) + .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + } + + 
protected def toDoubleTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.toDouble) + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + } + + protected def toCharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + toVarcharTColumn(rows, ordinal) + } + + protected def toVarcharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[String](rows, ordinal, nulls, "") + TColumn.stringVal(new TStringColumn(values, nulls)) + } + + // ========================================================== + + protected def toBitTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val boolValue = new TBoolValue + if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean]) + TColumnValue.boolVal(boolValue) + } + + protected def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val byteValue = new TByteValue + if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte]) + TColumnValue.byteVal(byteValue) + } + + protected def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tI16Value = new TI16Value + if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short]) + TColumnValue.i16Val(tI16Value) + } + + protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tI32Value = new TI32Value + if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int]) + TColumnValue.i32Val(tI32Value) + } + + protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tI64Value = new TI64Value + if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long]) + TColumnValue.i64Val(tI64Value) + } + + protected def toRealTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tDoubleValue = new TDoubleValue + if 
(row(ordinal) != null) { + val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString) + tDoubleValue.setValue(doubleValue) + } + TColumnValue.doubleVal(tDoubleValue) + } + + protected def toDoubleTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tDoubleValue = new TDoubleValue + if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double]) + TColumnValue.doubleVal(tDoubleValue) + } + + protected def toCharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + toVarcharTColumnValue(row, ordinal) + } + + protected def toVarcharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = { + val tStringValue = new TStringValue + if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String]) + TColumnValue.stringVal(tStringValue) + } + + protected def toDefaultTColumnValue( + row: List[Any], + ordinal: Int, + types: List[Column]): TColumnValue = { + val tStrValue = new TStringValue + if (row(ordinal) != null) { + tStrValue.setValue( + toHiveString(row(ordinal), types(ordinal).sqlType)) + } + TColumnValue.stringVal(tStrValue) + } + + protected def toHiveString(data: Any, sqlType: Int): String = { + (data, sqlType) match { + case (date: Date, Types.DATE) => + formatDate(date) + case (dateTime: LocalDateTime, Types.TIMESTAMP) => + formatLocalDateTime(dateTime) + case (decimal: java.math.BigDecimal, Types.DECIMAL) => + decimal.toPlainString + case (other, _) => + other.toString + } + } } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala index 3be3c7d4269..455eb2a9224 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala @@ -16,6 +16,7 @@ */ package 
org.apache.kyuubi.engine.jdbc.schema +import java.sql.Types import java.util.Collections import scala.collection.JavaConverters._ @@ -62,6 +63,97 @@ abstract class SchemaHelper { ret } - protected def toTTypeId(sqlType: Int): TTypeId + protected def toTTypeId(sqlType: Int): TTypeId = sqlType match { + case Types.BIT => + bitToTTypeId + case Types.TINYINT => + tinyIntToTTypeId + + case Types.SMALLINT => + smallIntToTTypeId + + case Types.INTEGER => + integerToTTypeId + + case Types.BIGINT => + bigintToTTypeId + + case Types.REAL => + realToTTypeId + + case Types.DOUBLE => + doubleToTTypeId + + case Types.CHAR => + charToTTypeId + + case Types.VARCHAR => + varcharToTTypeId + + case Types.DATE => + dateToTTypeId + + case Types.TIMESTAMP => + timestampToTTypeId + + case Types.DECIMAL => + decimalToTTypeId + + // TODO add more type support + case _ => + defaultToTTypeId + } + + protected def bitToTTypeId: TTypeId = { + TTypeId.BOOLEAN_TYPE + } + + protected def tinyIntToTTypeId: TTypeId = { + TTypeId.TINYINT_TYPE + } + + protected def smallIntToTTypeId: TTypeId = { + TTypeId.SMALLINT_TYPE + } + + protected def integerToTTypeId: TTypeId = { + TTypeId.INT_TYPE + } + + protected def bigintToTTypeId: TTypeId = { + TTypeId.BIGINT_TYPE + } + + protected def realToTTypeId: TTypeId = { + TTypeId.FLOAT_TYPE + } + + protected def doubleToTTypeId: TTypeId = { + TTypeId.DOUBLE_TYPE + } + + protected def charToTTypeId: TTypeId = { + TTypeId.STRING_TYPE + } + + protected def varcharToTTypeId: TTypeId = { + TTypeId.STRING_TYPE + } + + protected def dateToTTypeId: TTypeId = { + TTypeId.DATE_TYPE + } + + protected def timestampToTTypeId: TTypeId = { + TTypeId.TIMESTAMP_TYPE + } + + protected def decimalToTTypeId: TTypeId = { + TTypeId.DECIMAL_TYPE + } + + protected def defaultToTTypeId: TTypeId = { + TTypeId.STRING_TYPE + } }