Skip to content

Commit

Permalink
Add Apache Impala JDBC engine dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Feb 27, 2024
1 parent d516a5d commit 57d56d0
Show file tree
Hide file tree
Showing 29 changed files with 946 additions and 21 deletions.
4 changes: 2 additions & 2 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ arrow-format/12.0.0//arrow-format-12.0.0.jar
arrow-memory-core/12.0.0//arrow-memory-core-12.0.0.jar
arrow-memory-netty/12.0.0//arrow-memory-netty-12.0.0.jar
arrow-vector/12.0.0//arrow-vector-12.0.0.jar
checker-qual/3.31.0//checker-qual-3.31.0.jar
checker-qual/3.42.0//checker-qual-3.42.0.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-collections/3.2.2//commons-collections-3.2.2.jar
Expand Down Expand Up @@ -170,7 +170,7 @@ okio/1.15.0//okio-1.15.0.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
perfmark-api/0.26.0//perfmark-api-0.26.0.jar
postgresql/42.6.0//postgresql-42.6.0.jar
postgresql/42.7.2//postgresql-42.7.2.jar
proto-google-common-protos/2.22.0//proto-google-common-protos-2.22.0.jar
protobuf-java-util/3.21.7//protobuf-java-util-3.21.7.jar
protobuf-java/3.21.7//protobuf-java-3.21.7.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ org.apache.kyuubi.engine.jdbc.mysql.MySQLConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
org.apache.kyuubi.engine.jdbc.postgresql.PostgreSQLConnectionProvider
org.apache.kyuubi.engine.jdbc.starrocks.StarRocksConnectionProvider
org.apache.kyuubi.engine.jdbc.impala.ImpalaConnectionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ org.apache.kyuubi.engine.jdbc.dialect.MySQLDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
org.apache.kyuubi.engine.jdbc.dialect.PostgreSQLDialect
org.apache.kyuubi.engine.jdbc.dialect.StarRocksDialect
org.apache.kyuubi.engine.jdbc.dialect.ImpalaDialect
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ abstract class JdbcConnectionProvider extends SupportServiceLoader with Logging

val driverClass: String

def canHandle(providerClass: String): Boolean
def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

def getConnection(kyuubiConf: KyuubiConf): Connection = {
val properties = new Properties()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.util

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.impala.{ImpalaSchemaHelper, ImpalaTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.session.Session

class ImpalaDialect extends JdbcDialect {

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
if (isPattern(schema)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
}

val query = new StringBuilder("show tables ")

if (StringUtils.isNotEmpty(schema) && !isWildcardSetByKyuubi(schema)) {
query.append(s"in $schema ")
}

if (StringUtils.isNotEmpty(tableName)) {
query.append(s"like '${toImpalaRegex(tableName)}'")
}

query.toString()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
if (isPattern(schemaName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
}

if (isPattern(tableName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like table names not supported")
}

val query = new StringBuilder("show column stats ")

if (StringUtils.isNotEmpty(schemaName) && !isWildcardSetByKyuubi(schemaName)) {
query.append(s"$schemaName.")
}

query.append(tableName)
query.toString()
}

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new ImpalaTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = new ImpalaSchemaHelper

override def name(): String = "impala"

private def isPattern(value: String): Boolean = {
value != null && !isWildcardSetByKyuubi(value) && value.contains("*")
}

private def isWildcardSetByKyuubi(pattern: String): Boolean = pattern == "%"

private def toImpalaRegex(pattern: String): String = {
pattern.replace("%", "*")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider

class ImpalaConnectionProvider extends JdbcConnectionProvider {

override val name: String = classOf[ImpalaConnectionProvider].getName

override val driverClass: String = ImpalaConnectionProvider.driverClass
}

object ImpalaConnectionProvider {
// we should use kyuubi hive driver instead of original hive one in order
// to get fixed getMoreResults()
val driverClass: String = "org.apache.kyuubi.jdbc.KyuubiHiveDriver"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId

class ImpalaSchemaHelper extends SchemaHelper {
override protected def floatToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}

override protected def realToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

class ImpalaTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
override def toFloatTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

override def toFloatTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)

override def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

override def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ class MySQL8ConnectionProvider extends JdbcConnectionProvider {
override val name: String = classOf[MySQL8ConnectionProvider].getName

override val driverClass: String = MySQL8ConnectionProvider.driverClass

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}

object MySQL8ConnectionProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ class ExecuteStatement(
})
} else {
warn(s"Execute in full collect mode")
jdbcStatement.closeOnCompletion()
new ArrayFetchIterator(resultSetWrapper.toArray())
val arrayIter = new ArrayFetchIterator(resultSetWrapper.toArray())
jdbcStatement.close()
arrayIter
}
} else {
schema = Schema(List[Column](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ class PhoenixConnectionProvider extends JdbcConnectionProvider {
override val name: String = classOf[PhoenixConnectionProvider].getName

override val driverClass: String = "org.apache.phoenix.queryserver.client.Driver"

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,4 @@ class PostgreSQLConnectionProvider extends JdbcConnectionProvider {

override val driverClass: String = "org.postgresql.Driver"

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
case INTEGER => toIntegerTColumn(rows, ordinal)
case BIGINT => toBigIntTColumn(rows, ordinal)
case REAL => toRealTColumn(rows, ordinal)
case FLOAT => toFloatTColumn(rows, ordinal)
case DOUBLE => toDoubleTColumn(rows, ordinal)
case CHAR => toCharTColumn(rows, ordinal)
case VARCHAR => toVarcharTColumn(rows, ordinal)
case BOOLEAN => toBooleanTColumn(rows, ordinal)
case _ => toDefaultTColumn(rows, ordinal, sqlType)
}

Expand All @@ -47,9 +49,11 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
case INTEGER => toIntegerTColumnValue(row, ordinal)
case BIGINT => toBigIntTColumnValue(row, ordinal)
case REAL => toRealTColumnValue(row, ordinal)
case FLOAT => toFloatTColumnValue(row, ordinal)
case DOUBLE => toDoubleTColumnValue(row, ordinal)
case CHAR => toCharTColumnValue(row, ordinal)
case VARCHAR => toVarcharTColumnValue(row, ordinal)
case BOOLEAN => toBooleanTColumnValue(row, ordinal)
case otherType => toDefaultTColumnValue(row, ordinal, otherType)
}
}
Expand All @@ -63,6 +67,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asBooleanTColumn(rows, ordinal)

def toBooleanTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asBooleanTColumn(rows, ordinal)

def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asShortTColumn(rows, ordinal)

Expand All @@ -78,6 +85,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asFloatTColumn(rows, ordinal)

def toFloatTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asFloatTColumn(rows, ordinal)

def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
asDoubleTColumn(rows, ordinal)

Expand All @@ -92,6 +102,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asBooleanTColumnValue(row, ordinal)

def toBooleanTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asBooleanTColumnValue(row, ordinal)

def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asShortTColumnValue(row, ordinal)

Expand All @@ -107,6 +120,9 @@ class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asFloatTColumnValue(row, ordinal)

def toFloatTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asFloatTColumnValue(row, ordinal)

def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
asDoubleTColumnValue(row, ordinal)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ abstract class SchemaHelper {
case Types.DECIMAL =>
decimalToTTypeId

case Types.FLOAT =>
floatToTTypeId

case Types.BOOLEAN =>
booleanToTTypeId

// TODO add more type support
case _ =>
defaultToTTypeId
Expand All @@ -109,6 +115,10 @@ abstract class SchemaHelper {
TTypeId.BOOLEAN_TYPE
}

protected def booleanToTTypeId: TTypeId = {
TTypeId.BOOLEAN_TYPE
}

protected def tinyIntToTTypeId: TTypeId = {
TTypeId.TINYINT_TYPE
}
Expand All @@ -129,6 +139,10 @@ abstract class SchemaHelper {
TTypeId.FLOAT_TYPE
}

protected def floatToTTypeId: TTypeId = {
TTypeId.FLOAT_TYPE
}

protected def doubleToTTypeId: TTypeId = {
TTypeId.DOUBLE_TYPE
}
Expand Down
Loading

0 comments on commit 57d56d0

Please sign in to comment.