Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Various Kinds of Consistent Hash #17817

Merged
merged 6 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,18 @@
*/
public class ConsistentHashPolicy implements WorkerLocationPolicy {
private static final Logger LOG = LoggerFactory.getLogger(ConsistentHashPolicy.class);
private final ConsistentHashProvider mHashProvider =
new ConsistentHashProvider(100, Constants.SECOND_MS);
/**
* This is the number of virtual nodes in the consistent hashing algorithm.
* In a consistent hashing algorithm, on membership changes, some virtual nodes are
* re-distributed instead of rebuilding the whole hash table.
* This guarantees the hash table is changed only in a minimal.
* In order to achieve that, the number of virtual nodes should be X times the physical nodes
* in the cluster, where X is a balance between redistribution granularity and size.
*/
private final int mNumVirtualNodes;
private final ConsistentHashProvider mHashProvider;

/**
* Constructs a new {@link ConsistentHashPolicy}.
*
* @param conf the configuration used by the policy
*/
public ConsistentHashPolicy(AlluxioConfiguration conf) {
  // Fix: SLF4J uses {} placeholders, not String.format specifiers — the original
  // "%s" was emitted literally instead of being replaced by the policy name.
  LOG.debug("{} is chosen for user worker hash algorithm",
      conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY));
  // The virtual-node count is read from configuration once and fixed for the
  // lifetime of the hash provider.
  mHashProvider = new ConsistentHashProvider(100, Constants.SECOND_MS,
      conf.getInt(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER));
}

@Override
Expand All @@ -69,7 +62,7 @@ public List<BlockWorkerInfo> getPreferredWorkers(WorkerClusterView workerCluster
workerClusterView.size(), count));
}
Set<WorkerIdentity> workerIdentities = workerClusterView.workerIds();
mHashProvider.refresh(workerIdentities, mNumVirtualNodes);
mHashProvider.refresh(workerIdentities);
List<WorkerIdentity> workers = mHashProvider.getMultiple(fileId, count);
if (workers.size() != count) {
throw new ResourceExhaustedException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,27 @@ public class ConsistentHashProvider {
*/
private final Object mInitLock = new Object();

/**
* This is the number of virtual nodes in the consistent hashing algorithm.
* In a consistent hashing algorithm, on membership changes, some virtual nodes are
* re-distributed instead of rebuilding the whole hash table.
* This guarantees that changes to the hash table are kept minimal on membership changes.
* In order to achieve that, the number of virtual nodes should be X times the physical nodes
* in the cluster, where X is a balance between redistribution granularity and size.
*/
private final int mNumVirtualNodes;

/**
* Constructor.
*
* @param maxAttempts max attempts to rehash
* @param workerListTtlMs TTL in milliseconds before the cached worker list is considered stale and refreshed
* @param numVirtualNodes number of virtual nodes
*/
public ConsistentHashProvider(int maxAttempts, long workerListTtlMs) {
public ConsistentHashProvider(int maxAttempts, long workerListTtlMs, int numVirtualNodes) {
  mMaxAttempts = maxAttempts;
  // Convert the TTL from milliseconds to nanoseconds (Constants.MS_NANO) so it can be
  // compared directly against the System.nanoTime()-based last-updated timestamp.
  mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO;
  // Fixed at construction; used whenever the hash ring is (re)built.
  mNumVirtualNodes = numVirtualNodes;
}

/**
Expand Down Expand Up @@ -130,12 +142,11 @@ public List<WorkerIdentity> getMultiple(String key, int count) {
* others will not change the internal state of the hash provider.
*
* @param workers the up-to-date worker list
* @param numVirtualNodes the number of virtual nodes used by consistent hashing
*/
public void refresh(Set<WorkerIdentity> workers, int numVirtualNodes) {
public void refresh(Set<WorkerIdentity> workers) {
Preconditions.checkArgument(!workers.isEmpty(),
"cannot refresh hash provider with empty worker list");
maybeInitialize(workers, numVirtualNodes);
maybeInitialize(workers);
// check if the worker list has expired
if (shouldRebuildActiveNodesMapExclusively()) {
// thread safety is valid provided that build() takes less than
Expand All @@ -144,7 +155,7 @@ public void refresh(Set<WorkerIdentity> workers, int numVirtualNodes) {
Set<WorkerIdentity> lastWorkerIds = mLastWorkers.get();
if (!workers.equals(lastWorkerIds)) {
Set<WorkerIdentity> newWorkerIds = ImmutableSet.copyOf(workers);
NavigableMap<Integer, WorkerIdentity> nodes = build(newWorkerIds, numVirtualNodes);
NavigableMap<Integer, WorkerIdentity> nodes = build(newWorkerIds, mNumVirtualNodes);
mActiveNodesByConsistentHashing = nodes;
mLastWorkers.set(newWorkerIds);
mUpdateCount.increment();
Expand Down Expand Up @@ -176,14 +187,14 @@ private boolean shouldRebuildActiveNodesMapExclusively() {
* Only one caller gets to initialize the map while all others are blocked.
* After the initialization, the map must not be null.
*/
private void maybeInitialize(Set<WorkerIdentity> workers, int numVirtualNodes) {
private void maybeInitialize(Set<WorkerIdentity> workers) {
if (mActiveNodesByConsistentHashing == null) {
synchronized (mInitLock) {
// only one thread should reach here
// test again to skip re-initialization
if (mActiveNodesByConsistentHashing == null) {
Set<WorkerIdentity> workerIdentities = ImmutableSet.copyOf(workers);
mActiveNodesByConsistentHashing = build(workerIdentities, numVirtualNodes);
mActiveNodesByConsistentHashing = build(workerIdentities, mNumVirtualNodes);
mLastWorkers.set(workerIdentities);
mLastUpdatedTimestamp.set(System.nanoTime());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.dora;

import alluxio.Constants;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.membership.WorkerClusterView;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;

import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* An impl of Jump Consistent Hash Policy.
*
* A policy where a file path is matched to worker(s) by Jump Consistent Hashing Algorithm.
* The algorithm is described in this paper:
* https://arxiv.org/pdf/1406.2294.pdf
*
* The disadvantage of this algorithm is that
* buckets can only be inserted and deleted at the head and tail of the worker list
* to maintain hash consistency,
* that is, nodes can only be added and deleted at the head and tail of the worker list.
*/
public class JumpHashPolicy implements WorkerLocationPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(JumpHashPolicy.class);
  /** Provider that maps keys to workers via the jump consistent hash algorithm. */
  private final JumpHashProvider mHashProvider;

  /**
   * Constructs a new {@link JumpHashPolicy}.
   *
   * @param conf the configuration used by the policy
   */
  public JumpHashPolicy(AlluxioConfiguration conf) {
    // Fix: SLF4J uses {} placeholders, not String.format specifiers — the original
    // "%s" was emitted literally instead of being replaced by the policy name.
    LOG.debug("{} is chosen for user worker hash algorithm",
        conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY));
    mHashProvider = new JumpHashProvider(100, Constants.SECOND_MS);
  }

  /**
   * Selects {@code count} preferred workers for the given file via jump consistent hashing.
   *
   * Note: a worker selected by the hash provider that is absent from the supplied
   * {@code workerClusterView} is skipped, so the returned list may contain fewer than
   * {@code count} entries when the caller's view of the cluster is stale.
   *
   * @param workerClusterView the caller's view of the worker cluster
   * @param fileId the file id used as the hash key
   * @param count the number of workers requested
   * @return the selected workers
   * @throws ResourceExhaustedException if the cluster has fewer than {@code count} workers,
   *         or the hash provider cannot supply {@code count} workers
   */
  @Override
  public List<BlockWorkerInfo> getPreferredWorkers(WorkerClusterView workerClusterView,
      String fileId, int count) throws ResourceExhaustedException {
    if (workerClusterView.size() < count) {
      throw new ResourceExhaustedException(String.format(
          "Not enough workers in the cluster %d workers in the cluster but %d required",
          workerClusterView.size(), count));
    }
    Set<WorkerIdentity> workerIdentities = workerClusterView.workerIds();
    mHashProvider.refresh(workerIdentities);
    List<WorkerIdentity> workers = mHashProvider.getMultiple(fileId, count);
    if (workers.size() != count) {
      throw new ResourceExhaustedException(String.format(
          "Found %d workers from the hash ring but %d required", workers.size(), count));
    }
    ImmutableList.Builder<BlockWorkerInfo> builder = ImmutableList.builder();
    for (WorkerIdentity worker : workers) {
      Optional<WorkerInfo> optionalWorkerInfo = workerClusterView.getWorkerById(worker);
      if (!optionalWorkerInfo.isPresent()) {
        // The worker returned by the policy does not exist in the cluster view supplied
        // by the client. This can happen when the membership changes and some callers
        // fail to update to the latest worker cluster view. In this case, skip the worker.
        LOG.debug("Inconsistency between caller's view of cluster and that of "
            + "the consistent hash policy's: worker {} selected by policy does not exist in "
            + "caller's view {}. Skipping this worker.",
            worker, workerClusterView);
        continue;
      }
      WorkerInfo workerInfo = optionalWorkerInfo.get();
      builder.add(new BlockWorkerInfo(
          worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(),
          workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE));
    }
    return builder.build();
  }
}
Loading
Loading