-
Notifications
You must be signed in to change notification settings - Fork 0
/
YARN-1458-2.4.1.patch
200 lines (197 loc) · 8.31 KB
/
YARN-1458-2.4.1.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index c8fd450..ca7d0d8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -76,8 +77,13 @@ public class ComputeFairShares {
* iterations of binary search is a constant (dependent on desired precision).
*/
public static void computeShares(
- Collection<? extends Schedulable> schedulables, Resource totalResources,
- ResourceType type) {
+ Collection<? extends Schedulable> allSchedulables,
+ Resource totalResources, ResourceType type) {
+
+ Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
+ int takenResources = handleFixedFairShares(
+ allSchedulables, schedulables, type);
+
if (schedulables.isEmpty()) {
return;
}
@@ -94,8 +100,10 @@ public class ComputeFairShares {
totalMaxShare += maxShare;
}
}
- int totalResource = Math.min(totalMaxShare,
- getResourceValue(totalResources, type));
+
+ int totalResource = Math.max((getResourceValue(totalResources, type) -
+ takenResources), 0);
+ totalResource = Math.min(totalMaxShare, totalResource);
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
@@ -145,7 +153,55 @@ public class ComputeFairShares {
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}
-
+
+ /**
+ * Helper method to handle Schedulabes with fixed fairshares.
+ * Returns the resources taken by fixed fairshare schedulables,
+ * and adds the remaining to the passed nonFixedSchedulables.
+ */
+ private static int handleFixedFairShares(
+ Collection<? extends Schedulable> schedulables,
+ Collection<Schedulable> nonFixedSchedulables,
+ ResourceType type) {
+ int totalResource = 0;
+
+ for (Schedulable sched : schedulables) {
+ int fixedShare = getFairShareIfFixed(sched, type);
+ if (fixedShare < 0) {
+ nonFixedSchedulables.add(sched);
+ } else {
+ setResourceValue(fixedShare,
+ sched.getFairShare(),
+ type);
+ totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
+ Integer.MAX_VALUE);
+ }
+ }
+ return totalResource;
+ }
+
+ /**
+ * Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
+ *
+ * The fairshare is fixed if either the maxShare is 0 or weight is 0.
+ */
+ private static int getFairShareIfFixed(Schedulable sched,
+ ResourceType type) {
+
+ // Check if maxShare is 0
+ if (getResourceValue(sched.getMaxShare(), type) <= 0) {
+ return 0;
+ }
+
+ // Check if weight is 0
+ if (sched.getWeights().getWeight(type) <= 0) {
+ int minShare = getResourceValue(sched.getMinShare(), type);
+ return (minShare <= 0) ? 0 : minShare;
+ }
+
+ return -1;
+ }
+
private static int getResourceValue(Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e3fbb49..968b3da 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -417,7 +417,96 @@ public class TestFairScheduler {
assertEquals(3414, p.getMetrics().getFairShareMB());
}
}
-
+
+ @Test
+ public void testFairShareWithZeroWeight() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // set queueA and queueB weight zero.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>0.0</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>0.0</weight>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 2 * 1024.
+ createSchedulingRequest(2 * 1024, "queueA", "user1");
+ // Queue B wants 6 * 1024
+ createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+ "queueA", false);
+ // queueA's weight is 0.0, so its fair share should be 0.
+ assertEquals(0, queue.getFairShare().getMemory());
+ // queueB's weight is 0.0, so its fair share should be 0.
+ queue = scheduler.getQueueManager().getLeafQueue(
+ "queueB", false);
+ assertEquals(0, queue.getFairShare().getMemory());
+ }
+
+ @Test
+ public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // set queueA and queueB weight zero.
+ // set queueA and queueB minResources 1.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<minResources>1 mb 1 vcores</minResources>");
+ out.println("<weight>0.0</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<minResources>1 mb 1 vcores</minResources>");
+ out.println("<weight>0.0</weight>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 2 * 1024.
+ createSchedulingRequest(2 * 1024, "queueA", "user1");
+ // Queue B wants 6 * 1024
+ createSchedulingRequest(6 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
+ "queueA", false);
+ // queueA's weight is 0.0 and minResources is 1,
+ // so its fair share should be 1 (minShare).
+ assertEquals(1, queue.getFairShare().getMemory());
+ // queueB's weight is 0.0 and minResources is 1,
+ // so its fair share should be 1 (minShare).
+ queue = scheduler.getQueueManager().getLeafQueue(
+ "queueB", false);
+ assertEquals(1, queue.getFairShare().getMemory());
+ }
+
@Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
scheduler.reinitialize(conf, resourceManager.getRMContext());