Skip to content

Commit

Permalink
[CELEBORN-819] Worker close should pass close status to support handl…
Browse files Browse the repository at this point in the history
…e graceful shutdown and decommission

### What changes were proposed in this pull request?
Pass exit kind to each component, if the exit kind match:

- GRACEFUL_SHUTDOWN: Behavior as origin code's graceful == true
- Others: will clean the level db file.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1748 from AngersZhuuuu/CELEBORN-819.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
(cherry picked from commit 2ab88f7)
Signed-off-by: Angerszhuuuu <[email protected]>
  • Loading branch information
AngersZhuuuu committed Jul 25, 2023
1 parent 627bc55 commit 3a674fe
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.util.*;
import org.apache.celeborn.common.util.CelebornExitKind;
import org.apache.celeborn.common.util.JavaUtils;

/** Server for the efficient, low-level streaming service. */
Expand Down Expand Up @@ -130,24 +131,24 @@ protected void initChannel(SocketChannel ch) {

@Override
public void close() {
shutdown(true);
shutdown(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
}

public void shutdown(boolean graceful) {
public void shutdown(int exitKind) {
if (channelFuture != null) {
// close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.config().group() != null) {
if (graceful) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
bootstrap.config().group().shutdownGracefully();
} else {
bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
if (bootstrap != null && bootstrap.config().childGroup() != null) {
if (graceful) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
bootstrap.config().childGroup().shutdownGracefully();
} else {
bootstrap.config().childGroup().shutdownGracefully(0, 0, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.util

private[celeborn] object CelebornExitKind {
val EXIT_IMMEDIATELY = 0
val WORKER_GRACEFUL_SHUTDOWN = 1
val WORKER_DECOMMISSION = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -894,11 +894,11 @@ private[celeborn] class Master(
rpcEnv.awaitTermination()
}

override def stop(graceful: Boolean): Unit = synchronized {
override def stop(exitKind: Int): Unit = synchronized {
if (!stopped) {
logInfo("Stopping Master")
rpcEnv.stop(self)
super.stop(false)
super.stop(exitKind)
logInfo("Master stopped.")
stopped = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.CelebornExitKind

class MasterSuite extends AnyFunSuite
with BeforeAndAfterAll
Expand Down Expand Up @@ -53,7 +54,7 @@ class MasterSuite extends AnyFunSuite
}
}.start()
Thread.sleep(5000L)
master.stop(false)
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ abstract class HttpService extends Service with Logging {
startHttpServer()
}

override def stop(graceful: Boolean): Unit = {
override def stop(exitKind: Int): Unit = {
// may be null when running the unit test
if (null != httpServer) {
httpServer.stop(graceful)
httpServer.stop(exitKind)
}
super.stop(graceful)
super.stop(exitKind)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class Service extends Logging {
}
}

def stop(graceful: Boolean): Unit = {}
def stop(exitKind: Int): Unit = {}
}

object Service {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import io.netty.handler.logging.{LoggingHandler, LogLevel}

import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}

class HttpServer(
role: String,
Expand Down Expand Up @@ -56,7 +56,7 @@ class HttpServer(
isStarted = true
}

def stop(graceful: Boolean): Unit = synchronized {
def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
logInfo(s"$role: Stopping HttpServer")
if (bindFuture != null) {
Expand All @@ -66,7 +66,7 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.group != null) {
Utils.tryLogNonFatalError {
if (graceful) {
if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
Expand All @@ -75,7 +75,7 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.childGroup != null) {
Utils.tryLogNonFatalError {
if (graceful) {
if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.childGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.childGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ public void cleanup(HashSet<String> expiredShuffleKeys) {
}
}

public void close() {
public void close(int exitKind) {
logger.info("Closing {}", this.getClass().getSimpleName());
shutdown = true;
if (gracefulShutdown) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
long start = System.currentTimeMillis();
try {
fileSorterExecutors.shutdown();
Expand All @@ -254,21 +254,29 @@ public void close() {
} catch (InterruptedException e) {
logger.error("Await partition sorter executor shutdown catch exception: ", e);
}
if (sortedFilesDb != null) {
try {
updateSortedShuffleFilesInDB();
sortedFilesDb.close();
} catch (IOException e) {
logger.error("Store recover data to LevelDB failed.", e);
}
}
long end = System.currentTimeMillis();
logger.info("Await partition sorter executor complete cost " + (end - start) + "ms");
} else {
fileSorterSchedulerThread.interrupt();
fileSorterExecutors.shutdownNow();
}
cachedIndexMaps.clear();
if (sortedFilesDb != null) {
try {
updateSortedShuffleFilesInDB();
sortedFilesDb.close();
} catch (IOException e) {
logger.error("Store recover data to LevelDB failed.", e);
if (sortedFilesDb != null) {
try {
sortedFilesDb.close();
recoverFile.delete();
} catch (IOException e) {
logger.error("Clean LevelDB failed.", e);
}
}
}
cachedIndexMaps.clear();
}

private void reloadAndCleanSortedShuffleFiles(DB db) {
Expand Down
Loading

0 comments on commit 3a674fe

Please sign in to comment.