Skip to content

Commit

Permalink
Fix the missing cluster_spec due to the null when getting clusterSpec…
Browse files Browse the repository at this point in the history
… in task executor (#663)

Signed-off-by: zhangjunfan <[email protected]>
  • Loading branch information
zuston authored Apr 24, 2022
1 parent e522354 commit 63fe90f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public String getClusterSpec(String taskId) throws IOException {
if (amRuntimeAdapter.canStartTask(distributedMode, taskId)) {
return amRuntimeAdapter.constructClusterSpec(taskId);
}
return null;
return StringUtils.EMPTY;
}

@Override
Expand Down
8 changes: 7 additions & 1 deletion tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,13 @@ private String registerAndGetClusterSpec() throws IOException {
throw new IOException("Errors on registering to AM, maybe due to the network failure.");
}

return Utils.pollForeverTillNonNull(() -> proxy.getClusterSpec(taskId), DEFAULT_REQUEST_POLL_INTERVAL);
return Utils.pollTillConditionReached(
() -> proxy.getClusterSpec(taskId),
x -> StringUtils.isNotEmpty(x),
() -> null,
DEFAULT_REQUEST_POLL_INTERVAL,
0
);
}

public void callbackInfoToAM(String taskId, String callbackInfo) throws IOException {
Expand Down
10 changes: 3 additions & 7 deletions tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public static <T> T pollTillNonNull(Callable<T> func, int interval, int timeout)
return pollTillConditionReached(func, Objects::nonNull, () -> null, interval, timeout);
}

public static <T> T pollForeverTillNonNull(Callable<T> func, int interval) {
return pollTillNonNull(func, interval, 0);
}

public static <T> T pollTillConditionReached(Callable<T> callFunc, Function<T, Boolean> conditionFunc,
CallableWithoutException<T> defaultReturnedFunc, int interval, int timeout) {
Preconditions.checkArgument(interval >= 0, "Interval must be non-negative.");
Expand All @@ -133,16 +129,16 @@ public static <T> T pollTillConditionReached(Callable<T> callFunc, Function<T, B
while (timeout == 0 || remainingTime >= 0) {
ret = callFunc.call();
if (conditionFunc.apply(ret)) {
LOG.info("pollTillNonNull function finished within " + timeout + " seconds");
LOG.info("pollTillConditionReached function finished within " + timeout + " seconds");
return ret;
}
Thread.sleep(interval * 1000);
remainingTime -= interval;
}
} catch (Exception e) {
LOG.error("pollTillNonNull function threw exception", e);
LOG.error("pollTillConditionReached function threw exception", e);
}
LOG.warn("Function didn't return non-null within " + timeout + " seconds.");
LOG.warn("Function didn't satisfy applied condition within " + timeout + " seconds.");
return defaultReturnedFunc.call();
}

Expand Down

0 comments on commit 63fe90f

Please sign in to comment.