Skip to content

Commit

Permalink
[KYUUBI #5384] kyuubi-spark-connector-hive supports Spark 3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjunbo committed Mar 6, 2024
1 parent a9b90c7 commit aa3fa02
Show file tree
Hide file tree
Showing 35 changed files with 4,994 additions and 0 deletions.
181 changes: 181 additions & 0 deletions extensions/spark/kyuubi-spark-3-5-connector-hive/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

  NOTE(review): this header was absent in the pasted copy; ASF projects
  require it on every pom — confirm it exists in the real file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.kyuubi</groupId>
        <artifactId>kyuubi-parent</artifactId>
        <version>1.9.0-SNAPSHOT</version>
        <relativePath>../../../pom.xml</relativePath>
    </parent>

    <artifactId>kyuubi-spark-3-5-connector-hive_${scala.binary.version}</artifactId>
    <packaging>jar</packaging>
    <name>Kyuubi Spark 3.5 Hive Connector</name>
    <description>A Kyuubi hive connector based on Spark 3.5 V2 DataSource</description>
    <url>https://kyuubi.apache.org/</url>

    <dependencies>
        <!-- Compile-scope dependencies: shared connector utilities plus Guava
             (Guava is shaded and relocated below, so it is not leaked to users). -->
        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>

        <!-- Provided at runtime by the Spark distribution the connector runs inside. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- Test-only dependencies: test jars of Spark modules and Kyuubi common
             give access to their shared test utilities. -->
        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.scalatestplus</groupId>
            <artifactId>scalacheck-1-17_${scala.binary.version}</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-runtime</artifactId>
            <scope>test</scope>
        </dependency>

        <!--
        Spark requires `commons-collections` and `commons-io` but got them from transitive
        dependencies of `hadoop-client`. As we are using Hadoop Shaded Client, we need add
        them explicitly. See more details at SPARK-33212.
        -->
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>jakarta.xml.bind</groupId>
            <artifactId>jakarta.xml.bind-api</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- dependency analysis is skipped for this module -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

            <!-- Shade Guava and Kyuubi classes into the connector jar and relocate
                 com.google.common to avoid clashing with Spark's own Guava. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <shadedArtifactAttached>false</shadedArtifactAttached>
                    <artifactSet>
                        <includes>
                            <include>com.google.guava:guava</include>
                            <include>org.apache.kyuubi:*</include>
                        </includes>
                    </artifactSet>
                    <relocations>
                        <relocation>
                            <pattern>com.google.common</pattern>
                            <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
                            <includes>
                                <include>com.google.common.**</include>
                            </includes>
                        </relocation>
                    </relocations>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <phase>package</phase>
                    </execution>
                </executions>
            </plugin>

            <!-- Publish this module's test classes as a test-jar for downstream modules. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <executions>
                    <execution>
                        <id>prepare-test-jar</id>
                        <goals>
                            <goal>test-jar</goal>
                        </goals>
                        <phase>test-compile</phase>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <!-- SBT-style output layout keyed by Scala binary version -->
        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    </build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# ServiceLoader registration of the connector's delegation token provider.
# NOTE(review): presumably this file lives under META-INF/services named after
# Spark's HadoopDelegationTokenProvider interface — the filename is not visible
# here; confirm against the jar layout.
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.hive

import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog

import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.EXTERNAL_CATALOG_SHARE_POLICY

object ExternalCatalogManager {

  // Singleton holder. @volatile is required for double-checked locking on the
  // JVM: without it, a racing thread may observe a non-null but not-yet-safely
  // published ExternalCatalogManager reference.
  @volatile private var manager: ExternalCatalogManager = _

  /**
   * Returns the process-wide [[ExternalCatalogManager]], lazily creating it on
   * first call according to the session's configured share policy
   * ([[EXTERNAL_CATALOG_SHARE_POLICY]]).
   */
  def getOrCreate(sparkSession: SparkSession): ExternalCatalogManager = {
    if (manager == null) {
      synchronized {
        // Re-check under the lock: another thread may have won the race.
        if (manager == null) {
          val conf = sparkSession.sessionState.conf
          manager = ExternalCatalogSharePolicy.fromString(
            conf.getConf(EXTERNAL_CATALOG_SHARE_POLICY)) match {
            case OneForAllPolicy => new OneForAllPolicyManager()
            case OneForOnePolicy => OneForOnePolicyManager
          }
        }
      }
    }
    manager
  }

  /**
   * Drops the singleton (test hook). Synchronized so it cannot interleave with
   * a concurrent getOrCreate and invalidate a manager mid-construction.
   */
  private[kyuubi] def reset(): Unit = synchronized {
    if (manager != null) {
      manager.invalidateAll()
      manager = null
    }
  }
}

/**
 * Strategy interface deciding how [[HiveExternalCatalog]] instances are handed
 * out to Hive catalogs (shared vs. per-session; see the concrete policies).
 */
abstract class ExternalCatalogManager {

  /** Returns a [[HiveExternalCatalog]] for the given ticket; whether it is a
   *  fresh or cached instance depends on the concrete policy. */
  def take(ticket: Ticket): HiveExternalCatalog

  /** Releases any state held by the manager. Default is a no-op. */
  def invalidateAll(): Unit = {}
}

/**
* A [[OneForAllPolicy]] policy for the externalCatalog manager, which caches the externalCatalog
* according to the catalogName, aiming for only one of each catalog globally.
*/
/**
 * A [[OneForAllPolicy]] policy for the externalCatalog manager, which caches the externalCatalog
 * according to the catalogName, aiming for only one of each catalog globally.
 */
class OneForAllPolicyManager() extends ExternalCatalogManager {

  // One HiveExternalCatalog per catalog name, shared by all sessions.
  private val catalogCache = new ConcurrentHashMap[String, HiveExternalCatalog]()

  override def take(ticket: Ticket): HiveExternalCatalog = {
    // computeIfAbsent guarantees at most one catalog is built per name even
    // under concurrent calls.
    catalogCache.computeIfAbsent(
      ticket.catalogName,
      _ => {
        new HiveExternalCatalog(ticket.sparkConf, ticket.hadoopConf)
      })
  }

  /**
   * Fix: previously this class inherited the no-op invalidateAll(), so
   * ExternalCatalogManager.reset() — whose whole point is cleanup — left every
   * cached HiveExternalCatalog referenced until GC. Clear the cache explicitly.
   */
  override def invalidateAll(): Unit = catalogCache.clear()
}

/**
* A [[OneForOnePolicy]] policy for the externalCatalog manager, It doesn't actually cache any
* externalCatalog, each session will have its own externalCatalog.
*/
/**
 * A [[OneForOnePolicy]] policy for the externalCatalog manager. Nothing is
 * cached: every call builds a brand-new externalCatalog, so each session owns
 * its own instance.
 */
object OneForOnePolicyManager extends ExternalCatalogManager {

  override def take(ticket: Ticket): HiveExternalCatalog = {
    // Destructure the ticket; the catalog name is irrelevant for this policy.
    val Ticket(_, sparkConf, hadoopConf) = ticket
    new HiveExternalCatalog(sparkConf, hadoopConf)
  }
}

case class Ticket(catalogName: String, sparkConf: SparkConf, hadoopConf: Configuration)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.hive

import java.util.Locale

/**
 * How [[HiveExternalCatalog]] instances are shared across catalogs. Sealed so
 * that matches over policies are checked for exhaustiveness at compile time.
 */
sealed trait ExternalCatalogSharePolicy {

  /**
   * String name of the share policy
   */
  def name: String
}

/**
 * Indicates that an external catalog is shared globally by all HiveCatalogs
 * with the same catalogName.
 */
case object OneForAllPolicy extends ExternalCatalogSharePolicy { val name = "ONE_FOR_ALL" }

/**
 * Indicates that an external catalog is used by only one HiveCatalog.
 */
case object OneForOnePolicy extends ExternalCatalogSharePolicy { val name = "ONE_FOR_ONE" }

object ExternalCatalogSharePolicy {

  /**
   * Parses a share policy from its (case-insensitive) string name.
   * Throws IllegalArgumentException for anything other than
   * ONE_FOR_ALL / ONE_FOR_ONE.
   */
  def fromString(policy: String): ExternalCatalogSharePolicy = {
    // Normalize once with the root locale so parsing is locale-independent.
    val normalized = policy.toUpperCase(Locale.ROOT)
    if (normalized == OneForAllPolicy.name) {
      OneForAllPolicy
    } else if (normalized == OneForOnePolicy.name) {
      OneForOnePolicy
    } else {
      throw new IllegalArgumentException(s"Unknown share policy: $policy. Accepted " +
        "policies are 'ONE_FOR_ONE', 'ONE_FOR_ALL'.")
    }
  }
}
Loading

0 comments on commit aa3fa02

Please sign in to comment.