Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6034] Kyuubi Server HA&ZK get server from serverHosts support more strategy #6213

Closed
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7c0c605
fix_4186
davidyuan1223 Oct 10, 2023
1892f14
add common method to get session level config
davidyuan1223 Oct 26, 2023
6a04453
Kyuubi Server HA&ZK get server from serverHosts support more strategy
davidyuan1223 Mar 27, 2024
ee5a9ad
Kyuubi Server HA&ZK get server from serverHosts support more strategy
davidyuan1223 Mar 27, 2024
97b9597
Kyuubi Server HA&ZK get server from serverHosts support more strategy
davidyuan1223 Mar 27, 2024
73952f8
Kyuubi Server HA&ZK get server from serverHosts support more strategy
davidyuan1223 Mar 27, 2024
8eddd76
Add Strategy Unit Test Case and fix the polling strategy counter begi…
davidyuan1223 Mar 28, 2024
9ed2cac
remove test comment
davidyuan1223 Mar 28, 2024
c95382a
fix var not valid and counter getAndIncrement
davidyuan1223 Apr 14, 2024
7b0c1b8
fix var not valid and counter getAndIncrement
davidyuan1223 Apr 14, 2024
93f4a26
remove reset
davidyuan1223 Apr 14, 2024
09a84f1
remove the distirbuted lock
davidyuan1223 Apr 15, 2024
4655480
update
davidyuan1223 Apr 15, 2024
b4aeb3d
repeat testing on pollingChooseStrategy
bowenliang123 Oct 17, 2024
125c823
rename ChooseServerStrategy to ServerSelectStrategy
bowenliang123 Oct 23, 2024
228bf10
rename parameter zooKeeperStrategy to serverSelectStrategy
bowenliang123 Oct 23, 2024
8f8ca28
nit
bowenliang123 Oct 23, 2024
1ab79b4
strategyName
bowenliang123 Oct 23, 2024
b39c567
style
bowenliang123 Oct 23, 2024
265965e
polling
bowenliang123 Oct 23, 2024
e194ea6
remove ZooKeeperHiveClientException from method signature of chooseSe…
bowenliang123 Oct 23, 2024
7668f99
test name
bowenliang123 Oct 23, 2024
40f427a
rename StrategyFactory to StrategyFactoryServerStrategyFactory
bowenliang123 Oct 23, 2024
e94f9e9
nit
bowenliang123 Oct 23, 2024
6193394
nit
bowenliang123 Oct 23, 2024
8822ad4
repeat
bowenliang123 Oct 23, 2024
353f940
repeat
bowenliang123 Oct 23, 2024
961d3e9
rename ServerStrategyFactory to ServerSelectStrategyFactory
bowenliang123 Oct 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kyuubi-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-hive-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client._
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider._
import org.apache.kyuubi.jdbc.hive.strategy.{ServerSelectStrategy, ServerStrategyFactory}
import org.apache.kyuubi.service._
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
import org.apache.kyuubi.shaded.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
Expand Down Expand Up @@ -227,4 +228,41 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
discovery.stop()
}
}

test("server select strategy with zookeeper") {
val zkClient = CuratorFrameworkFactory.builder()
.connectString(getConnectString)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build
zkClient.start()

val namespace = "kyuubi-strategy-test"
val testServerHosts = Seq(
"testNode1",
"testNode2",
"testNode3").asJava
// test polling strategy
val pollingStrategy = ServerStrategyFactory.createStrategy("polling")
1 to testServerHosts.size() * 2 foreach { _ =>
assertResult(f"testNode1")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
assertResult(f"testNode2")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
assertResult(f"testNode3")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
}

// test only get first serverHost strategy
val customStrategy = new ServerSelectStrategy {
override def chooseServer(
serverHosts: util.List[String],
zkClient: CuratorFramework,
namespace: String): String = serverHosts.get(0)
}
1 to testServerHosts.size() * 2 foreach { _ =>
assertResult("testNode1") {
customStrategy.chooseServer(testServerHosts, zkClient, namespace)
}
}

zkClient.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class JdbcConnectionParams {
// Use ZooKeeper for indirection while using dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
static final String SERVER_SELECT_STRATEGY = "serverSelectStrategy";
// Default namespace value on ZooKeeper.
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.jdbc.hive.strategy.ServerStrategyFactory;
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory;
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry;
Expand Down Expand Up @@ -111,7 +113,7 @@ static void configureConnParams(JdbcConnectionParams connParams)
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
// Now pick a server node randomly
String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
String serverNode = chooseServer(connParams, serverHosts, zooKeeperClient);
updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode);
} catch (Exception e) {
throw new ZooKeeperHiveClientException(
Expand All @@ -120,6 +122,22 @@ static void configureConnParams(JdbcConnectionParams connParams)
// Close the client connection with ZooKeeper
}

private static String chooseServer(
JdbcConnectionParams connParams, List<String> serverHosts, CuratorFramework zkClient) {
String zooKeeperNamespace = getZooKeeperNamespace(connParams);
String strategyName =
connParams
.getSessionVars()
.getOrDefault(
JdbcConnectionParams.SERVER_SELECT_STRATEGY, RandomSelectStrategy.strategyName);
try {
ServerSelectStrategy strategy = ServerStrategyFactory.createStrategy(strategyName);
return strategy.chooseServer(serverHosts, zkClient, zooKeeperNamespace);
} catch (Exception e) {
throw new RuntimeException("Failed to choose server with strategy " + strategyName, e);
}
}

static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy;

import java.util.List;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;

public interface ServerSelectStrategy {
String chooseServer(List<String> serverHosts, CuratorFramework zkClient, String namespace);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy;

import java.lang.reflect.Constructor;
import org.apache.kyuubi.jdbc.hive.strategy.zk.PollingSelectStrategy;
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;

public class ServerStrategyFactory {
public static ServerSelectStrategy createStrategy(String strategyName) {
try {
switch (strategyName) {
case PollingSelectStrategy.strategyName:
return new PollingSelectStrategy();
case RandomSelectStrategy.strategyName:
return new RandomSelectStrategy();
default:
Class<?> clazz = Class.forName(strategyName);
if (ServerSelectStrategy.class.isAssignableFrom(clazz)) {
Constructor<? extends ServerSelectStrategy> constructor =
clazz.asSubclass(ServerSelectStrategy.class).getConstructor();
return constructor.newInstance();
} else {
throw new ClassNotFoundException(
"The loaded class does not implement ServerSelectStrategy");
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to init server select strategy", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy.zk;

import java.util.List;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.AtomicValue;
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.kyuubi.shaded.curator.retry.RetryForever;

public class PollingSelectStrategy implements ServerSelectStrategy {
public static final String strategyName = "polling";

private static final String COUNTER_PATH_PREFIX = "/";
private static final String COUNTER_PATH_SUFFIX = "-counter";

@Override
public String chooseServer(
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
String counterPath = COUNTER_PATH_PREFIX + namespace + COUNTER_PATH_SUFFIX;
try {
return serverHosts.get(getAndIncrement(zkClient, counterPath) % serverHosts.size());
} catch (Exception e) {
throw new RuntimeException("Failed to choose server by polling select strategy", e);
}
}

private int getAndIncrement(CuratorFramework zkClient, String path) throws Exception {
DistributedAtomicInteger dai =
new DistributedAtomicInteger(zkClient, path, new RetryForever(3000));
AtomicValue<Integer> atomicVal;
do {
atomicVal = dai.add(1);
} while (atomicVal == null || !atomicVal.succeeded());
return atomicVal.preValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy.zk;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;

public class RandomSelectStrategy implements ServerSelectStrategy {
public static final String strategyName = "random";

@Override
public String chooseServer(
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
return serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
}
}
Loading