Skip to content

Commit

Permalink
Clean shutdowns (#331)
Browse files Browse the repository at this point in the history
* add an async shutdown method to sinks
* add a method to shutdown health checks
* add connection close on ingest actor
  • Loading branch information
BrandonArp authored May 7, 2024
1 parent ed51a84 commit b07bc77
Show file tree
Hide file tree
Showing 31 changed files with 518 additions and 41 deletions.
2 changes: 2 additions & 0 deletions config/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ clusterHostSuffix=".cluster"
reaggregationDimensions=["host"]
#reaggregationInjectClusterAsHost=True
#reaggregationTimeout="PT1M"
healthcheckShutdownDelay="PT5S"

# Pekko
# ~~~~
Expand All @@ -77,6 +78,7 @@ pekkoConfiguration {
unhandled="on"
}
}
coordinated-shutdown.run-by-jvm-shutdown-hook = off
cluster {
seed-nodes=["pekko://[email protected]:2551"]
downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
Expand Down
Binary file removed lib/awaitility-4.0.2.jar
Binary file not shown.
Binary file removed lib/hamcrest-2.1.jar
Binary file not shown.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<name>Ville Koskela</name>
<email>[email protected]</email>
<organization>Inscope Metrics</organization>
<organizationUrl>http://www.inscopemetrics.com</organizationUrl>
<organizationUrl>https://www.inscopemetrics.com</organizationUrl>
<roles>
<role>developer</role>
</roles>
Expand Down Expand Up @@ -191,6 +191,12 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>config</directory>
<includes>
<include>**/*.conf</include>
</includes>
</resource>
</resources>
<plugins>
<!-- Enable Inherited Plugins -->
Expand Down
54 changes: 53 additions & 1 deletion src/main/java/com/arpnetworking/clusteraggregator/Emitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import com.google.common.collect.ImmutableMap;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Props;
import org.apache.pekko.pattern.Patterns;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;

/**
* Holds the sinks and emits to them.
Expand All @@ -43,7 +45,7 @@ public class Emitter extends AbstractActor {
* @return A new {@link Props}.
*/
public static Props props(final EmitterConfiguration config) {
return Props.create(Emitter.class, config);
return Props.create(Emitter.class, () -> new Emitter(config));
}

/**
Expand All @@ -62,6 +64,11 @@ public Emitter(final EmitterConfiguration config) {
.log();
}

/**
 * Start-up hook for the emitter. Delegates directly to the superclass
 * implementation; no emitter-specific start-up work is performed here.
 *
 * @throws Exception if the superclass {@code preStart} throws
 */
@Override
public void preStart() throws Exception {
    super.preStart();
}

@SuppressWarnings("deprecation")
@Override
public Receive createReceive() {
Expand Down Expand Up @@ -90,6 +97,22 @@ public Receive createReceive() {
.log();
_sink.recordAggregateData(periodicData);
})
.match(Shutdown.class, ignored -> {
LOGGER.info()
.setMessage("Shutting down emitter")
.log();

final CompletionStage<Object> shutdownFuture = _sink.shutdownGracefully()
.thenApply(ignore -> ShutdownComplete.getInstance());
Patterns.pipe(shutdownFuture, context().dispatcher()).to(self(), sender());
})
.match(ShutdownComplete.class, ignored -> {
LOGGER.info()
.setMessage("Emitter shutdown complete")
.log();
sender().tell("OK", self());
context().stop(self());
})
.build();
}

Expand All @@ -101,4 +124,33 @@ public void postStop() throws Exception {

private final Sink _sink;
private static final Logger LOGGER = LoggerFactory.getLogger(Emitter.class);
/**
 * Message asking the {@link Emitter} to shut down gracefully. On receipt the
 * emitter starts the graceful shutdown of its sink.
 */
public static final class Shutdown {
    private static final Shutdown INSTANCE = new Shutdown();

    private Shutdown() {
        // Singleton message; obtain it through getInstance().
    }

    /**
     * Returns the single shared instance of this message.
     *
     * @return the singleton instance
     */
    public static Shutdown getInstance() {
        return INSTANCE;
    }
}
/**
 * Internal message the emitter pipes back to itself once the sink's graceful
 * shutdown future has completed; on receipt the emitter replies to the
 * original requester and stops itself.
 */
private static final class ShutdownComplete {
    private static final ShutdownComplete INSTANCE = new ShutdownComplete();

    private ShutdownComplete() {
        // Singleton message; obtain it through getInstance().
    }

    /**
     * Returns the single shared instance of this message.
     *
     * @return the singleton instance
     */
    public static ShutdownComplete getInstance() {
        return INSTANCE;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.arpnetworking.clusteraggregator;

import com.arpnetworking.clusteraggregator.client.HttpSourceActor;
import com.arpnetworking.clusteraggregator.http.Routes;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.google.inject.Inject;
Expand All @@ -23,9 +25,13 @@
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ShardRegion;
import org.apache.pekko.pattern.Patterns;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* Shuts down the Pekko cluster gracefully.
Expand All @@ -37,11 +43,27 @@ public class GracefulShutdownActor extends AbstractActor {
* Public constructor.
*
* @param shardRegion aggregator shard region
* @param hostEmitter host emitter
* @param clusterEmitter cluster emitter
* @param routes routes
* @param healthcheckShutdownDelay delay after shutting down healthcheck before shutting down emitters
* @param ingestActor ingest actor
*/
@Inject
@SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Context is safe to use in constructor.")
public GracefulShutdownActor(@Named("aggregator-shard-region") final ActorRef shardRegion) {
public GracefulShutdownActor(
@Named("aggregator-shard-region") final ActorRef shardRegion,
@Named("host-emitter") final ActorRef hostEmitter,
@Named("cluster-emitter") final ActorRef clusterEmitter,
final Routes routes,
@Named("healthcheck-shutdown-delay") final Duration healthcheckShutdownDelay,
@Named("http-ingest-v1") final ActorRef ingestActor) {
_shardRegion = shardRegion;
_hostEmitter = hostEmitter;
_clusterEmitter = clusterEmitter;
_routes = routes;
_healthcheckShutdownDelay = healthcheckShutdownDelay;
_ingestActor = ingestActor;
}

@Override
Expand All @@ -52,13 +74,65 @@ public Receive createReceive() {
.setMessage("Initiating graceful shutdown")
.addData("actor", self())
.log();
context().watch(_shardRegion);
self().tell(ShutdownHealthcheck.getInstance(), sender());
})
.match(ShutdownHealthcheck.class, message -> {
LOGGER.info()
.setMessage("Shutting down healthcheck")
.addData("actor", self())
.log();
_routes.shutdownHealthcheck();
_ingestActor.tell(HttpSourceActor.Shutdown.getInstance(), self());
LOGGER.info()
.setMessage("Waiting before proceeding with shutdown of emitters")
.addData("delay", _healthcheckShutdownDelay)
.addData("actor", self())
.log();
context().system().scheduler().scheduleOnce(
_healthcheckShutdownDelay,
self(),
ShutdownEmitter.getInstance(),
context().dispatcher(),
sender());
})
.match(ShutdownEmitter.class, message -> {
LOGGER.info()
.setMessage("Shutting down emitters")
.addData("actor", self())
.log();
final CompletionStage<Object> host = Patterns.ask(_hostEmitter,
Emitter.Shutdown.getInstance(),
Duration.ofSeconds(30));
final CompletionStage<Object> cluster = Patterns.ask(_clusterEmitter,
Emitter.Shutdown.getInstance(),
Duration.ofSeconds(30));
final CompletableFuture<ShutdownShardRegion> allShutdown = CompletableFuture.allOf(
host.toCompletableFuture(),
cluster.toCompletableFuture())
.thenApply(result -> ShutdownShardRegion.getInstance());
Patterns.pipe(allShutdown, context().dispatcher()).to(self(), sender());

})
.match(ShutdownShardRegion.class, message -> {
LOGGER.info()
.setMessage("Shutting down shard region")
.addData("actor", self())
.log();
context().watchWith(_shardRegion, new ShutdownShardRegionComplete(sender()));
_shardRegion.tell(ShardRegion.gracefulShutdownInstance(), self());
})
.match(Terminated.class, terminated -> {
.match(ShutdownShardRegionComplete.class, terminated -> {
terminated._replyTo.tell("OK", self());
_cluster.registerOnMemberRemoved(_system::terminate);
_cluster.leave(_cluster.selfAddress());
})
.matchAny(unhandled -> {
LOGGER.warn()
.setMessage("Received unhandled message")
.addData("message", unhandled)
.addData("actor", self())
.log();
})
.build();
}

Expand All @@ -69,7 +143,12 @@ public void preStart() throws Exception {
_system = context().system();
}

private ActorRef _shardRegion;
private final ActorRef _shardRegion;
private final ActorRef _hostEmitter;
private final ActorRef _clusterEmitter;
private final Routes _routes;
private final Duration _healthcheckShutdownDelay;
private final ActorRef _ingestActor;
private Cluster _cluster;
private ActorSystem _system;
private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownActor.class);
Expand All @@ -83,10 +162,38 @@ private Shutdown() {}
*
* @return a singleton instance
*/
public static Shutdown instance() {
public static Shutdown getInstance() {
return SHUTDOWN;
}

private static final Shutdown SHUTDOWN = new Shutdown();
}
/**
 * Internal step message: the actor sends this to itself when a shutdown is
 * requested. Handling it shuts down the healthcheck route, tells the ingest
 * actor to shut down, and schedules the emitter shutdown step after the
 * configured delay.
 */
private static final class ShutdownHealthcheck {
    private static final ShutdownHealthcheck INSTANCE = new ShutdownHealthcheck();

    private ShutdownHealthcheck() {
        // Singleton message; obtain it through getInstance().
    }

    /**
     * Returns the single shared instance of this message.
     *
     * @return the singleton instance
     */
    public static ShutdownHealthcheck getInstance() {
        return INSTANCE;
    }
}
/**
 * Internal step message: piped to the actor itself once the shutdown requests
 * to both the host and cluster emitters have completed. Handling it starts the
 * graceful shutdown of the shard region.
 */
private static final class ShutdownShardRegion {
    private static final ShutdownShardRegion INSTANCE = new ShutdownShardRegion();

    private ShutdownShardRegion() {
        // Singleton message; obtain it through getInstance().
    }

    /**
     * Returns the single shared instance of this message.
     *
     * @return the singleton instance
     */
    public static ShutdownShardRegion getInstance() {
        return INSTANCE;
    }
}
/**
 * Internal step message: scheduled to the actor itself after the healthcheck
 * shutdown delay. Handling it asks the host and cluster emitters to shut down.
 */
private static final class ShutdownEmitter {
    private static final ShutdownEmitter INSTANCE = new ShutdownEmitter();

    private ShutdownEmitter() {
        // Singleton message; obtain it through getInstance().
    }

    /**
     * Returns the single shared instance of this message.
     *
     * @return the singleton instance
     */
    public static ShutdownEmitter getInstance() {
        return INSTANCE;
    }
}
/**
 * Internal message registered with {@code watchWith} on the shard region.
 * Carries the actor that should receive the final "OK" reply when the message
 * is handled.
 */
private static final class ShutdownShardRegionComplete {
    // Read directly by the enclosing actor's receive handler.
    private final ActorRef _replyTo;

    /**
     * Creates the completion message.
     *
     * @param replyTo actor to notify when this message is handled
     */
    ShutdownShardRegionComplete(final ActorRef replyTo) {
        this._replyTo = replyTo;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private ActorRef provideHostEmitter(final Injector injector, final ActorSystem s

private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) {
final ActorRef emitterConfigurationProxy = system.actorOf(
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
ConfigurableActorProxy.props(new RoundRobinEmitterFactory(_shutdown)),
name);
final ActorConfigurator<EmitterConfiguration> configurator =
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
Expand Down Expand Up @@ -384,6 +384,13 @@ private boolean provideReaggregationInjectClusterAsHost(final ClusterAggregatorC
return config.getReaggregationInjectClusterAsHost();
}

// Exposes the healthcheck shutdown delay read from the cluster aggregator
// configuration under the "healthcheck-shutdown-delay" binding name.
@Provides
@Named("healthcheck-shutdown-delay")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private Duration provideHealthCheckShutdownDelay(final ClusterAggregatorConfiguration config) {
    final Duration healthcheckShutdownDelay = config.getHealthcheckShutdownDelay();
    return healthcheckShutdownDelay;
}

@Provides
@Named("reaggregation-timeout")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
Expand Down Expand Up @@ -440,6 +447,11 @@ static List<Sink> createSinks(final ImmutableList<JsonNode> monitoringSinks) {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

private static final class RoundRobinEmitterFactory implements ConfiguredLaunchableFactory<Props, EmitterConfiguration> {
/**
* Constructor.
*
* NOTE(review): the body is empty, so the supplied registration is neither
* stored nor used here - confirm whether that is intentional.
*
* @param shutdown lifecycle registration passed in by the caller; unused by this constructor
*/
RoundRobinEmitterFactory(final LifecycleRegistration shutdown) {
}

@Override
public Props create(final EmitterConfiguration config) {
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/com/arpnetworking/clusteraggregator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Entry point for the pekko-based cluster aggregator.
Expand Down Expand Up @@ -236,15 +238,21 @@ private void shutdownPekko() {
LOGGER.info()
.setMessage("Stopping Pekko")
.log();
if (_shutdownActor != null) {
_shutdownActor.tell(GracefulShutdownActor.Shutdown.instance(), ActorRef.noSender());
}

try {
if (_shutdownActor != null) {
final CompletionStage<Object> gracefulShutdown =
Patterns.ask(_shutdownActor, GracefulShutdownActor.Shutdown.getInstance(), Duration.ofMinutes(10));
gracefulShutdown.toCompletableFuture().join();
LOGGER.info()
.setMessage("Graceful shutdown actor reported completion")
.log();
}
if (_system != null) {
Await.result(_system.whenTerminated(), SHUTDOWN_TIMEOUT);
_system.getWhenTerminated().toCompletableFuture().get(SHUTDOWN_TIMEOUT.toSeconds(), TimeUnit.SECONDS);
}
// CHECKSTYLE.OFF: IllegalCatch - Prevent program shutdown
} catch (final Exception e) {
} catch (final InterruptedException | TimeoutException | ExecutionException e) {
// CHECKSTYLE.ON: IllegalCatch
LOGGER.warn()
.setMessage("Interrupted at shutdown")
Expand Down Expand Up @@ -274,7 +282,7 @@ private static Builder<? extends JsonNodeSource> getFileSourceBuilder(
private volatile List<Database> _databases;

private static final Logger LOGGER = com.arpnetworking.steno.LoggerFactory.getLogger(Main.class);
private static final Duration SHUTDOWN_TIMEOUT = Duration.create(3, TimeUnit.MINUTES);
private static final Duration SHUTDOWN_TIMEOUT = Duration.ofMinutes(3);
private static final SourceTypeLiteral SOURCE_TYPE_LITERAL = new SourceTypeLiteral();
private static final Semaphore SHUTDOWN_SEMAPHORE = new Semaphore(0);
private static final Thread SHUTDOWN_THREAD = new ShutdownThread();
Expand Down
Loading

0 comments on commit b07bc77

Please sign in to comment.