Skip to content

Commit 8980e15

Browse files
committed
Weighted load balancing policy
The weighted load balancing policy uses a number of factors as weights along with the number of in-flight requests for each host to score and then sort the list of known live hosts.
1 parent ea2e475 commit 8980e15

File tree

4 files changed

+374
-0
lines changed

4 files changed

+374
-0
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

+35
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,41 @@ public enum DefaultDriverOption implements DriverOption {
853853
LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS(
854854
"advanced.load-balancing-policy.dc-failover.allow-for-local-consistency-levels"),
855855

856+
/**
857+
* How many items in the plan to score.
858+
*
859+
* <p>Value-Type: int
860+
*/
861+
LOAD_BALANCING_SCORED_PLAN_SIZE("advanced.load-balancing-policy.weighted.scored-plan-size"),
862+
863+
/**
864+
* Weight to apply when load balancing for a non-rack node.
865+
*
866+
* <p>Value-Type: double
867+
*/
868+
LOAD_BALANCING_WEIGHT_NON_RACK("advanced.load-balancing-policy.weighted.non-rack"),
869+
870+
/**
871+
* Weight to apply when load balancing for a non-replica node.
872+
*
873+
* <p>Value-Type: double
874+
*/
875+
LOAD_BALANCING_WEIGHT_NON_REPLICA("advanced.load-balancing-policy.weighted.non-replica"),
876+
877+
/**
878+
* Weight to apply when load balancing for a node that is still starting up.
879+
*
880+
* <p>Value-Type: double
881+
*/
882+
LOAD_BALANCING_WEIGHT_STARTING("advanced.load-balancing-policy.weighted.starting"),
883+
884+
/**
885+
* Weight to apply when load balancing for an unhealthy node.
886+
*
887+
* <p>Value-Type: double
888+
*/
889+
LOAD_BALANCING_WEIGHT_UNHEALTHY("advanced.load-balancing-policy.weighted.unhealthy"),
890+
856891
/**
857892
* The classname of the desired {@code MetricIdGenerator} implementation.
858893
*

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

+30
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,36 @@ public String toString() {
891891
new TypedDriverOption<>(
892892
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
893893
GenericType.BOOLEAN);
894+
/** How many items in the plan to score. */
895+
public static final TypedDriverOption<Integer>
896+
LOAD_BALANCING_SCORED_PLAN_SIZE =
897+
new TypedDriverOption<>(
898+
DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE,
899+
GenericType.INTEGER);
900+
/** Weight to apply when load balancing for a non-rack node. */
901+
public static final TypedDriverOption<Double>
902+
LOAD_BALANCING_WEIGHT_NON_RACK =
903+
new TypedDriverOption<>(
904+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK,
905+
GenericType.DOUBLE);
906+
/** Weight to apply when load balancing for a non-replica node. */
907+
public static final TypedDriverOption<Double>
908+
LOAD_BALANCING_WEIGHT_NON_REPLICA =
909+
new TypedDriverOption<>(
910+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA,
911+
GenericType.DOUBLE);
912+
/** Weight to apply when load balancing for a node that is still starting up. */
913+
public static final TypedDriverOption<Double>
914+
LOAD_BALANCING_WEIGHT_STARTING =
915+
new TypedDriverOption<>(
916+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING,
917+
GenericType.DOUBLE);
918+
/** Weight to apply when load balancing for an unhealthy node. */
919+
public static final TypedDriverOption<Double>
920+
LOAD_BALANCING_WEIGHT_UNHEALTHY =
921+
new TypedDriverOption<>(
922+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY,
923+
GenericType.DOUBLE);
894924

895925
private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
896926
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.loadbalancing;
19+
20+
import static java.util.concurrent.TimeUnit.SECONDS;
21+
22+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
23+
import com.datastax.oss.driver.api.core.context.DriverContext;
24+
import com.datastax.oss.driver.api.core.metadata.Node;
25+
import com.datastax.oss.driver.api.core.session.Request;
26+
import com.datastax.oss.driver.api.core.session.Session;
27+
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
28+
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
29+
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
30+
import edu.umd.cs.findbugs.annotations.NonNull;
31+
import java.util.ArrayList;
32+
import java.util.Comparator;
33+
import java.util.List;
34+
import java.util.Objects;
35+
import java.util.Queue;
36+
import java.util.Random;
37+
import java.util.Set;
38+
import java.util.concurrent.ThreadLocalRandom;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
/**
43+
* A load balancing policy that optimally balances between sending load to local token holder,
44+
* rack replicas, and local datacenter replicas (in that order).
45+
*
46+
* The default weights are good for the vast majority of use cases, but you can tweak them to get different behavior.
47+
*/
48+
public class WeightedLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
49+
private static final Logger LOG =
50+
LoggerFactory.getLogger(WeightedLoadBalancingPolicy.class);
51+
// Each client will randomly skew so traffic is introduced gradually to a newly up replica
52+
// Each client will start sending at a period between 15s and 30, and they will gradually
53+
// increase load over the next 15 seconds.
54+
private static final long DELAY_TRAFFIC_SKEW_MILLIS = SECONDS.toMillis(15);
55+
private static final long DELAY_TRAFFIC_MILLIS =
56+
DELAY_TRAFFIC_SKEW_MILLIS + ThreadLocalRandom.current().nextLong(DELAY_TRAFFIC_SKEW_MILLIS);
57+
58+
// By default we will only score this many nodes, the rest will get added on without scoring.
59+
// We don't usually need to score every single node if there are more than a few.
60+
static final int DEFAULT_SCORED_PLAN_SIZE = 8;
61+
// Default multiplicative weights. Think of this like "How much concurrency must occur
62+
// before I fail off this node to the next". Note that these defaults are intentionally
63+
// meant to shed load to unloaded rack coordinators when a replica set is all
64+
// relatively heavily loaded (specifically 3x as loaded).
65+
static final double DEFAULT_WEIGHT_NON_RACK = 4.0;
66+
static final double DEFAULT_WEIGHT_NON_REPLICA = 12.0;
67+
static final double DEFAULT_WEIGHT_STARTING = 16.0;
68+
static final double DEFAULT_WEIGHT_UNHEALTHY = 64.0;
69+
70+
private final int planSize;
71+
private final double weightNonRack;
72+
private final double weightNonReplica;
73+
private final double weightStarting;
74+
private final double weightUnhealthy;
75+
76+
public WeightedLoadBalancingPolicy(
77+
@NonNull DriverContext context,
78+
@NonNull String profileName) {
79+
super(context, profileName);
80+
this.planSize = profile.getInt(DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE, DEFAULT_SCORED_PLAN_SIZE);
81+
// Choices of weights will change how this load balancer prefers endpoints.
82+
// The weight is relative to the outstanding concurrency.
83+
this.weightNonRack = profile.getDouble(
84+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK, DEFAULT_WEIGHT_NON_RACK);
85+
this.weightNonReplica = profile.getDouble(
86+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA, DEFAULT_WEIGHT_NON_REPLICA);
87+
this.weightStarting = profile.getDouble(
88+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING, DEFAULT_WEIGHT_STARTING);
89+
this.weightUnhealthy = profile.getDouble(
90+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY, DEFAULT_WEIGHT_UNHEALTHY);
91+
}
92+
93+
@NonNull
94+
@Override
95+
public Queue<Node> newQueryPlan(Request request, Session session) {
96+
if (session == null) {
97+
return super.newQueryPlan(request, null);
98+
}
99+
100+
// Take a copy of nodes and reference to replicas since the node map is concurrent
101+
Set<Node> dcNodeSet = getLiveNodes().dc(getLocalDatacenter());
102+
Set<Node> replicaSet = getReplicas(request, session);
103+
104+
long nowNanos = nanoTime();
105+
long nowMillis = milliTime();
106+
107+
// collect known replica nodes
108+
List<NodeScore> nodeScores = new ArrayList<>(this.planSize);
109+
for (Node replicaNode : replicaSet) {
110+
if (dcNodeSet.contains(replicaNode)) {
111+
nodeScores.add(new NodeScore(replicaNode,
112+
getWeightedScore(replicaNode, session, nowMillis, nowNanos, true)));
113+
114+
if (nodeScores.size() == this.planSize) {
115+
break; // TODO (akhaku) add the rest of the nodes once we port the tests to OSS
116+
}
117+
}
118+
}
119+
120+
// collect any non-replicas, if we need to and there are some available
121+
if (nodeScores.size() < this.planSize && nodeScores.size() < dcNodeSet.size()) {
122+
Random rand = getRandom();
123+
final Node[] dcNodes = dcNodeSet.toArray(new Node[0]);
124+
125+
for (int i = 0; i < dcNodes.length; i++) {
126+
// pick a random target; shuffle it up front, so we don't revisit
127+
int nextIndex = i + rand.nextInt(dcNodes.length - i);
128+
ArrayUtils.swap(dcNodes, i, nextIndex);
129+
Node dcNode = dcNodes[i];
130+
131+
// skip replicas; they were already inserted
132+
// otherwise, found a valid node: score it
133+
if (!replicaSet.contains(dcNode)) {
134+
nodeScores.add(new NodeScore(dcNode,
135+
getWeightedScore(dcNode, session, nowMillis, nowNanos, false)));
136+
137+
// if we scored, we might by now have already scored enough of what we need
138+
if (nodeScores.size() == this.planSize || nodeScores.size() == dcNodes.length) {
139+
break;
140+
}
141+
}
142+
}
143+
144+
// by now we've scored everything we need to meet planSize, or if not, at least everything available
145+
}
146+
147+
// At this point we have a small, typically 8 element array containing all local
148+
// datacenter replicas and potentially some random choices from the rest of the datacenter.
149+
//
150+
// We now rank nodes by a score function that takes into account outstanding requests weighted
151+
// by replica status, rack placement, uptime, and health status. In general, we expect to
152+
// see the following order
153+
// 1. Rack replicas
154+
// 2. Datacenter replicas
155+
// 3. Rack nodes
156+
// 4. Datacenter nodes
157+
// We expect these orderings to move around when nodes are overloaded. For example if the
158+
// local zone replica has too much load we will failover to other replicas. If those
159+
// replicas are too slow we will failover to other rack nodes.
160+
161+
// sort, extract, convert to query plan
162+
nodeScores.sort(Comparator.comparingDouble(NodeScore::getScore));
163+
Node[] candidate = new Node[nodeScores.size()];
164+
for (int i = 0; i < candidate.length; i++) {
165+
candidate[i] = nodeScores.get(i).getNode();
166+
}
167+
168+
QueryPlan queryPlan = candidate.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan((Object[]) candidate);
169+
return maybeAddDcFailover(request, queryPlan);
170+
}
171+
172+
protected String getLocalRack() {
173+
return ""; // TODO (akhaku) internally we passed it through the context, for OSS perhaps something like the local DC helper?
174+
}
175+
176+
protected boolean inRack(Node node) {
177+
if (node == null || node.getRack() == null) return false;
178+
return node.getRack().equals(this.getLocalRack());
179+
}
180+
181+
protected double getWeightedScore(Node node, Session session, long nowMillis, long nowNanos, boolean isReplica) {
182+
int base = Math.min(32768, 1 + getInFlight(node, session));
183+
double weight = 1.0;
184+
185+
if (!inRack(node)) weight *= this.weightNonRack; // 4.0
186+
if (!isReplica) weight *= this.weightNonReplica; // 12.0
187+
if (isUnhealthy(node, session, nowNanos)) weight *= this.weightUnhealthy; // 64.0
188+
189+
// We want to gradually ramp up traffic, shedding heavily at first and then allowing it back
190+
// in gradually. Note:
191+
//
192+
// 1. We cannot use nowNanos for this since node.getUpSinceMillis uses
193+
// System.currentTimeMillis (which may have no relation to nano time).
194+
// 2. getUpSinceMillis might return 0 or -1, in either case don't consider it freshly up.
195+
// 3. When a client starts up everything will appear to be freshly up, which is fine since
196+
// all nodes will randomly be shuffled to the front and back.
197+
long millisSinceUp = nowMillis - node.getUpSinceMillis();
198+
if (millisSinceUp < (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS)) {
199+
double pShed = 1.0 - ((double) millisSinceUp / (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS));
200+
if (pShed > getRandom().nextDouble()) {
201+
if (LOG.isTraceEnabled()) {
202+
String shed = String.format("%.2f", pShed);
203+
LOG.trace("[{}] shedding at startup [pShed={}, millisSinceUp={}]", node, shed, millisSinceUp);
204+
}
205+
weight *= this.weightStarting; // 16.0
206+
}
207+
}
208+
209+
double score = base * weight;
210+
if (LOG.isDebugEnabled()) {
211+
String msg = String.format("score=%.2f [base=%d, weight=%.2f]", score, base, weight);
212+
LOG.debug("[{}] {}", node, msg);
213+
}
214+
return score;
215+
}
216+
217+
protected long milliTime() {
218+
return System.currentTimeMillis();
219+
}
220+
221+
protected Random getRandom() {
222+
return ThreadLocalRandom.current();
223+
}
224+
225+
/**
226+
* Wraps a Node alongside its score.
227+
*
228+
* Calculating scores is expensive, and not stable (could vary). By wrapping them we can be sure the score
229+
* is calculated only once and does not change during processing.
230+
*/
231+
static class NodeScore {
232+
final double score;
233+
final Node node;
234+
235+
public NodeScore(Node node, double score) {
236+
this.node = node;
237+
this.score = score;
238+
}
239+
240+
public Node getNode() {
241+
return node;
242+
}
243+
244+
public double getScore() {
245+
return score;
246+
}
247+
248+
@Override
249+
public boolean equals(Object o) {
250+
if (this == o) {
251+
return true;
252+
}
253+
if (o == null || getClass() != o.getClass()) {
254+
return false;
255+
}
256+
NodeScore nodeScore = (NodeScore) o;
257+
return Double.compare(score, nodeScore.score) == 0 && Objects.equals(node, nodeScore.node);
258+
}
259+
260+
@Override
261+
public int hashCode() {
262+
return Objects.hash(score, node);
263+
}
264+
}
265+
}

core/src/main/resources/reference.conf

+44
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,50 @@ datastax-java-driver {
575575
# Overridable in a profile: yes
576576
allow-for-local-consistency-levels = false
577577
}
578+
579+
# These configuration options apply when using the WeightedLoadBalancingPolicy.
580+
# That policy calculates scores for 8 nodes (unless you modify scored-plan-size), multiplies
581+
# them by the number of in-flight requests, then sorts the nodes by the resulting score. The default weights
582+
# will prefer in-rack replicas, followed by non-rack replicas, then rack non-replicas, followed
583+
# by nodes that are not "unhealthy", followed by "unhealthy", and then the rest of the nodes.
584+
# There is also an aversion to nodes that have recently started up, based on the node's
585+
# advertised "millis since up". Changing the weights can change the order of preference.
586+
weighted {
587+
# How many items in the plan to score.
588+
#
589+
# Required: no
590+
# Modifiable at runtime: no
591+
# Overridable in a profile: yes
592+
// scored-plan-size: 8
593+
594+
# Weight to apply when load balancing for a non-rack node.
595+
#
596+
# Required: no
597+
# Modifiable at runtime: no
598+
# Overridable in a profile: yes
599+
// non-rack: 4.0
600+
601+
# Weight to apply when load balancing for a non-replica node.
602+
#
603+
# Required: no
604+
# Modifiable at runtime: no
605+
# Overridable in a profile: yes
606+
// non-replica: 12.0
607+
608+
# Weight to apply when load balancing for a node that is still starting up.
609+
#
610+
# Required: no
611+
# Modifiable at runtime: no
612+
# Overridable in a profile: yes
613+
// starting: 16.0
614+
615+
# Weight to apply when load balancing for an unhealthy node.
616+
#
617+
# Required: no
618+
# Modifiable at runtime: no
619+
# Overridable in a profile: yes
620+
// unhealthy: 64.0
621+
}
578622
}
579623

580624
# Whether to schedule reconnection attempts if all contact points are unreachable on the first

0 commit comments

Comments
 (0)