diff --git a/config/config.conf b/config/config.conf index a33d1c73..47b1b95c 100644 --- a/config/config.conf +++ b/config/config.conf @@ -52,6 +52,7 @@ clusterHostSuffix=".cluster" reaggregationDimensions=["host"] #reaggregationInjectClusterAsHost=True #reaggregationTimeout="PT1M" +healthcheckShutdownDelay="PT5S" # Pekko # ~~~~ @@ -77,6 +78,7 @@ pekkoConfiguration { unhandled="on" } } + coordinated-shutdown.run-by-jvm-shutdown-hook = off cluster { seed-nodes=["pekko://Metrics@127.0.0.1:2551"] downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider" diff --git a/lib/awaitility-4.0.2.jar b/lib/awaitility-4.0.2.jar deleted file mode 100644 index 293f83d8..00000000 Binary files a/lib/awaitility-4.0.2.jar and /dev/null differ diff --git a/lib/hamcrest-2.1.jar b/lib/hamcrest-2.1.jar deleted file mode 100644 index e323d5e8..00000000 Binary files a/lib/hamcrest-2.1.jar and /dev/null differ diff --git a/pom.xml b/pom.xml index 5a45ebe7..4524defa 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ Ville Koskela ville.koskela@inscopemetrics.com Inscope Metrics - http://www.inscopemetrics.com + https://www.inscopemetrics.com developer @@ -191,6 +191,12 @@ src/main/resources true + + config + + **/*.conf + + diff --git a/src/main/java/com/arpnetworking/clusteraggregator/Emitter.java b/src/main/java/com/arpnetworking/clusteraggregator/Emitter.java index 78c81dcd..c50e8f70 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/Emitter.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/Emitter.java @@ -26,9 +26,11 @@ import com.google.common.collect.ImmutableMap; import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.Props; +import org.apache.pekko.pattern.Patterns; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.concurrent.CompletionStage; /** * Holds the sinks and emits to them. @@ -43,7 +45,7 @@ public class Emitter extends AbstractActor { * @return A new {@link Props}. 
*/ public static Props props(final EmitterConfiguration config) { - return Props.create(Emitter.class, config); + return Props.create(Emitter.class, () -> new Emitter(config)); } /** @@ -62,6 +64,11 @@ public Emitter(final EmitterConfiguration config) { .log(); } + @Override + public void preStart() throws Exception { + super.preStart(); + } + @SuppressWarnings("deprecation") @Override public Receive createReceive() { @@ -90,6 +97,22 @@ public Receive createReceive() { .log(); _sink.recordAggregateData(periodicData); }) + .match(Shutdown.class, ignored -> { + LOGGER.info() + .setMessage("Shutting down emitter") + .log(); + + final CompletionStage shutdownFuture = _sink.shutdownGracefully() + .thenApply(ignore -> ShutdownComplete.getInstance()); + Patterns.pipe(shutdownFuture, context().dispatcher()).to(self(), sender()); + }) + .match(ShutdownComplete.class, ignored -> { + LOGGER.info() + .setMessage("Emitter shutdown complete") + .log(); + sender().tell("OK", self()); + context().stop(self()); + }) .build(); } @@ -101,4 +124,33 @@ public void postStop() throws Exception { private final Sink _sink; private static final Logger LOGGER = LoggerFactory.getLogger(Emitter.class); + /** + * Message to initiate a graceful shutdown. + */ + public static final class Shutdown { + private Shutdown() {} + + /** + * Get the singleton instance. + * + * @return the singleton instance + */ + public static Shutdown getInstance() { + return INSTANCE; + } + private static final Shutdown INSTANCE = new Shutdown(); + } + private static final class ShutdownComplete { + private ShutdownComplete() {} + + /** + * Get the singleton instance.
+ * + * @return the singleton instance + */ + public static ShutdownComplete getInstance() { + return INSTANCE; + } + private static final ShutdownComplete INSTANCE = new ShutdownComplete(); + } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/GracefulShutdownActor.java b/src/main/java/com/arpnetworking/clusteraggregator/GracefulShutdownActor.java index 8fe2c38b..dea6d29a 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/GracefulShutdownActor.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/GracefulShutdownActor.java @@ -15,6 +15,8 @@ */ package com.arpnetworking.clusteraggregator; +import com.arpnetworking.clusteraggregator.client.HttpSourceActor; +import com.arpnetworking.clusteraggregator.http.Routes; import com.arpnetworking.steno.Logger; import com.arpnetworking.steno.LoggerFactory; import com.google.inject.Inject; @@ -23,9 +25,13 @@ import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Terminated; import org.apache.pekko.cluster.Cluster; import org.apache.pekko.cluster.sharding.ShardRegion; +import org.apache.pekko.pattern.Patterns; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; /** * Shuts down the Pekko cluster gracefully. @@ -37,11 +43,27 @@ public class GracefulShutdownActor extends AbstractActor { * Public constructor. 
* * @param shardRegion aggregator shard region + * @param hostEmitter host emitter + * @param clusterEmitter cluster emitter + * @param routes routes + * @param healthcheckShutdownDelay delay after shutting down healthcheck before shutting down emitters + * @param ingestActor ingest actor */ @Inject @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Context is safe to use in constructor.") - public GracefulShutdownActor(@Named("aggregator-shard-region") final ActorRef shardRegion) { + public GracefulShutdownActor( + @Named("aggregator-shard-region") final ActorRef shardRegion, + @Named("host-emitter") final ActorRef hostEmitter, + @Named("cluster-emitter") final ActorRef clusterEmitter, + final Routes routes, + @Named("healthcheck-shutdown-delay") final Duration healthcheckShutdownDelay, + @Named("http-ingest-v1") final ActorRef ingestActor) { _shardRegion = shardRegion; + _hostEmitter = hostEmitter; + _clusterEmitter = clusterEmitter; + _routes = routes; + _healthcheckShutdownDelay = healthcheckShutdownDelay; + _ingestActor = ingestActor; } @Override @@ -52,13 +74,65 @@ public Receive createReceive() { .setMessage("Initiating graceful shutdown") .addData("actor", self()) .log(); - context().watch(_shardRegion); + self().tell(ShutdownHealthcheck.getInstance(), sender()); + }) + .match(ShutdownHealthcheck.class, message -> { + LOGGER.info() + .setMessage("Shutting down healthcheck") + .addData("actor", self()) + .log(); + _routes.shutdownHealthcheck(); + _ingestActor.tell(HttpSourceActor.Shutdown.getInstance(), self()); + LOGGER.info() + .setMessage("Waiting before proceeding with shutdown of emitters") + .addData("delay", _healthcheckShutdownDelay) + .addData("actor", self()) + .log(); + context().system().scheduler().scheduleOnce( + _healthcheckShutdownDelay, + self(), + ShutdownEmitter.getInstance(), + context().dispatcher(), + sender()); + }) + .match(ShutdownEmitter.class, message -> { + LOGGER.info() + 
.setMessage("Shutting down emitters") + .addData("actor", self()) + .log(); + final CompletionStage host = Patterns.ask(_hostEmitter, + Emitter.Shutdown.getInstance(), + Duration.ofSeconds(30)); + final CompletionStage cluster = Patterns.ask(_clusterEmitter, + Emitter.Shutdown.getInstance(), + Duration.ofSeconds(30)); + final CompletableFuture allShutdown = CompletableFuture.allOf( + host.toCompletableFuture(), + cluster.toCompletableFuture()) + .thenApply(result -> ShutdownShardRegion.getInstance()); + Patterns.pipe(allShutdown, context().dispatcher()).to(self(), sender()); + + }) + .match(ShutdownShardRegion.class, message -> { + LOGGER.info() + .setMessage("Shutting down shard region") + .addData("actor", self()) + .log(); + context().watchWith(_shardRegion, new ShutdownShardRegionComplete(sender())); _shardRegion.tell(ShardRegion.gracefulShutdownInstance(), self()); }) - .match(Terminated.class, terminated -> { + .match(ShutdownShardRegionComplete.class, terminated -> { + terminated._replyTo.tell("OK", self()); _cluster.registerOnMemberRemoved(_system::terminate); _cluster.leave(_cluster.selfAddress()); }) + .matchAny(unhandled -> { + LOGGER.warn() + .setMessage("Received unhandled message") + .addData("message", unhandled) + .addData("actor", self()) + .log(); + }) .build(); } @@ -69,7 +143,12 @@ public void preStart() throws Exception { _system = context().system(); } - private ActorRef _shardRegion; + private final ActorRef _shardRegion; + private final ActorRef _hostEmitter; + private final ActorRef _clusterEmitter; + private final Routes _routes; + private final Duration _healthcheckShutdownDelay; + private final ActorRef _ingestActor; private Cluster _cluster; private ActorSystem _system; private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownActor.class); @@ -83,10 +162,38 @@ private Shutdown() {} * * @return a singleton instance */ - public static Shutdown instance() { + public static Shutdown getInstance() { return SHUTDOWN; } 
private static final Shutdown SHUTDOWN = new Shutdown(); } + private static final class ShutdownHealthcheck { + private ShutdownHealthcheck() {} + public static ShutdownHealthcheck getInstance() { + return INSTANCE; + } + private static final ShutdownHealthcheck INSTANCE = new ShutdownHealthcheck(); + } + private static final class ShutdownShardRegion { + private ShutdownShardRegion() {} + public static ShutdownShardRegion getInstance() { + return INSTANCE; + } + private static final ShutdownShardRegion INSTANCE = new ShutdownShardRegion(); + } + private static final class ShutdownEmitter { + private ShutdownEmitter() {} + public static ShutdownEmitter getInstance() { + return INSTANCE; + } + private static final ShutdownEmitter INSTANCE = new ShutdownEmitter(); + } + private static final class ShutdownShardRegionComplete { + ShutdownShardRegionComplete(final ActorRef replyTo) { + _replyTo = replyTo; + } + + private final ActorRef _replyTo; + } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java index ab589643..10c588ee 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java @@ -207,7 +207,7 @@ private ActorRef provideHostEmitter(final Injector injector, final ActorSystem s private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) { final ActorRef emitterConfigurationProxy = system.actorOf( - ConfigurableActorProxy.props(new RoundRobinEmitterFactory()), + ConfigurableActorProxy.props(new RoundRobinEmitterFactory(_shutdown)), name); final ActorConfigurator configurator = new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class); @@ -384,6 +384,13 @@ private boolean provideReaggregationInjectClusterAsHost(final ClusterAggregatorC return config.getReaggregationInjectClusterAsHost(); } + 
@Provides + @Named("healthcheck-shutdown-delay") + @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice + private Duration provideHealthCheckShutdownDelay(final ClusterAggregatorConfiguration config) { + return config.getHealthcheckShutdownDelay(); + } + @Provides @Named("reaggregation-timeout") @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice @@ -440,6 +447,11 @@ static List createSinks(final ImmutableList monitoringSinks) { private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance(); private static final class RoundRobinEmitterFactory implements ConfiguredLaunchableFactory { + /** + * Constructor. + */ + RoundRobinEmitterFactory(final LifecycleRegistration shutdown) { + } @Override public Props create(final EmitterConfiguration config) { diff --git a/src/main/java/com/arpnetworking/clusteraggregator/Main.java b/src/main/java/com/arpnetworking/clusteraggregator/Main.java index 833ab638..83bf4f8d 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/Main.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/Main.java @@ -37,17 +37,19 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.pattern.Patterns; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; import java.io.File; +import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Entry point for the pekko-based cluster aggregator. 
@@ -236,15 +238,21 @@ private void shutdownPekko() { LOGGER.info() .setMessage("Stopping Pekko") .log(); - if (_shutdownActor != null) { - _shutdownActor.tell(GracefulShutdownActor.Shutdown.instance(), ActorRef.noSender()); - } try { + if (_shutdownActor != null) { + final CompletionStage gracefulShutdown = + Patterns.ask(_shutdownActor, GracefulShutdownActor.Shutdown.getInstance(), Duration.ofMinutes(10)); + gracefulShutdown.toCompletableFuture().get(); + LOGGER.info() + .setMessage("Graceful shutdown actor reported completion") + .log(); + } if (_system != null) { - Await.result(_system.whenTerminated(), SHUTDOWN_TIMEOUT); + _system.getWhenTerminated().toCompletableFuture().get(SHUTDOWN_TIMEOUT.toSeconds(), TimeUnit.SECONDS); } // CHECKSTYLE.OFF: IllegalCatch - Prevent program shutdown - } catch (final Exception e) { + } catch (final InterruptedException | TimeoutException | ExecutionException e) { // CHECKSTYLE.ON: IllegalCatch LOGGER.warn() .setMessage("Interrupted at shutdown") @@ -274,7 +282,7 @@ private static Builder getFileSourceBuilder( private volatile List _databases; private static final Logger LOGGER = com.arpnetworking.steno.LoggerFactory.getLogger(Main.class); - private static final Duration SHUTDOWN_TIMEOUT = Duration.create(3, TimeUnit.MINUTES); + private static final Duration SHUTDOWN_TIMEOUT = Duration.ofMinutes(3); private static final SourceTypeLiteral SOURCE_TYPE_LITERAL = new SourceTypeLiteral(); private static final Semaphore SHUTDOWN_SEMAPHORE = new Semaphore(0); private static final Thread SHUTDOWN_THREAD = new ShutdownThread(); diff --git a/src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java b/src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java index da6a6193..552d5dda 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java @@ -38,6 +38,7 @@ import
com.google.inject.name.Named; import com.google.protobuf.GeneratedMessageV3; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.http.HttpHeaders; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.AbstractActor; @@ -215,7 +216,12 @@ public Receive createReceive() { .run(_materializer) .whenComplete((done, err) -> { if (err == null) { - sender.tell(HttpResponse.create().withStatus(200), self()); + HttpResponse response = HttpResponse.create().withStatus(200); + if (_closeConnections) { + response = response.withHeaders( + Collections.singletonList(HttpHeader.parse(HttpHeaders.CONNECTION, "close"))); + } + sender.tell(response, self()); } else { if (err instanceof InvalidRecordsException) { BAD_REQUEST_LOGGER.debug() @@ -239,6 +245,9 @@ public Receive createReceive() { } }); }) + .match(Shutdown.class, message -> { + _closeConnections = true; + }) .build(); } @@ -274,7 +283,7 @@ private AggregationRequest parseRecords(final com.arpnetworking.clusteraggregato recordsBuilder.add(message); current = current.drop(message.getLength()); messageOptional = AggregationMessage.deserialize(current); - if (!messageOptional.isPresent() && current.lengthCompare(0) > 0) { + if (messageOptional.isEmpty() && current.lengthCompare(0) > 0) { throw new InvalidRecordsException( String.format("buffer did not deserialize completely, %d leftover bytes", current.size())); } @@ -378,6 +387,7 @@ private Optional buildPeriodicData(final Messages.StatisticSetReco private final Materializer _materializer; private final Sink> _sink; private final Graph, NotUsed> _processGraph; + private volatile boolean _closeConnections = false; private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final Logger BAD_REQUEST_LOGGER = @@ -398,5 +408,16 @@ private static class InvalidRecordsException extends IOException { private static final long serialVersionUID = 1L; } + + /** + * Message to initiate a graceful
shutdown. + */ + public static final class Shutdown { + private Shutdown() {} + public static Shutdown getInstance() { + return INSTANCE; + } + private static final Shutdown INSTANCE = new Shutdown(); + } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java index 1a144e4e..a1baebe1 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java @@ -160,6 +160,10 @@ public Duration getClusterStatusInterval() { return _clusterStatusInterval; } + public Duration getHealthcheckShutdownDelay() { + return _healthcheckShutdownDelay; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -189,6 +193,7 @@ public String toString() { .add("RebalanceConfiguration", _rebalanceConfiguration) .add("ClusterHostSuffix", _clusterHostSuffix) .add("DatabaseConfigurations", _databaseConfigurations) + .add("HealthcheckShutdownDelay", _healthcheckShutdownDelay) .toString(); } @@ -220,6 +225,7 @@ private ClusterAggregatorConfiguration(final Builder builder) { _calculateClusterAggregations = builder._calculateClusterAggregations; _databaseConfigurations = Maps.newHashMap(builder._databaseConfigurations); _clusterStatusInterval = builder._clusterStatusInterval; + _healthcheckShutdownDelay = builder._healthcheckShutdownDelay; } private final String _monitoringCluster; @@ -249,6 +255,7 @@ private ClusterAggregatorConfiguration(final Builder builder) { private final boolean _calculateClusterAggregations; private final Map _databaseConfigurations; private final Duration _clusterStatusInterval; + private final Duration _healthcheckShutdownDelay; private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance(); @@ -591,6 +598,16 @@ 
public Builder setClusterStatusInterval(final Duration value) { return this; } + /** + * The time to wait for the healthcheck to shutdown. Optional. Defaults to 30 seconds. + * @param value The time to wait for the healthcheck to shutdown. + * @return This instance of {@link Builder}. + */ + public Builder setHealthcheckShutdownDelay(final Duration value) { + _healthcheckShutdownDelay = value; + return this; + } + @NotNull @NotEmpty private String _monitoringCluster; @@ -655,5 +672,7 @@ public Builder setClusterStatusInterval(final Duration value) { private Map _databaseConfigurations = Maps.newHashMap(); @NotNull private Duration _clusterStatusInterval = Duration.ofSeconds(10); + @NotNull + private Duration _healthcheckShutdownDelay = Duration.ofSeconds(30); } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java b/src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java index ded7968f..33a25cee 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java @@ -56,6 +56,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -163,20 +164,39 @@ public CompletionStage apply(final HttpRequest request) { }); } + /** + * Start failing health checks in preparation for service shutdown. + */ + public void shutdownHealthcheck() { + _healthCheckShutdown.set(true); + } + private CompletionStage process(final HttpRequest request) { if (HttpMethods.GET.equals(request.method())) { if (_healthCheckPath.equals(request.getUri().path())) { - return ask("/user/status", new Status.HealthRequest(), Boolean.FALSE) - .thenApply( - isHealthy -> HttpResponse.create() - .withStatus(isHealthy ? 
StatusCodes.OK : StatusCodes.INTERNAL_SERVER_ERROR) - .addHeader(PING_CACHE_CONTROL_HEADER) - .withEntity( - JSON_CONTENT_TYPE, - ByteString.fromString( - "{\"status\":\"" - + (isHealthy ? HEALTHY_STATE : UNHEALTHY_STATE) - + "\"}"))); + if (!_healthCheckShutdown.get()) { + return ask("/user/status", new Status.HealthRequest(), Boolean.FALSE) + .thenApply( + isHealthy -> HttpResponse.create() + .withStatus(isHealthy ? StatusCodes.OK : StatusCodes.INTERNAL_SERVER_ERROR) + .addHeader(PING_CACHE_CONTROL_HEADER) + .withEntity( + JSON_CONTENT_TYPE, + ByteString.fromString( + "{\"status\":\"" + + (isHealthy ? HEALTHY_STATE : UNHEALTHY_STATE) + + "\"}"))); + } else { + return CompletableFuture.completedFuture(HttpResponse.create() + .withStatus(StatusCodes.INTERNAL_SERVER_ERROR) + .addHeader(PING_CACHE_CONTROL_HEADER) + .withEntity( + JSON_CONTENT_TYPE, + ByteString.fromString( + "{\"status\":\"" + + UNHEALTHY_STATE + + "\"}"))); + } } else if (_statusPath.equals(request.getUri().path())) { return ask("/user/status", new Status.StatusRequest(), (StatusResponse) null) .thenApply( @@ -264,6 +284,7 @@ private String createMetricName(final HttpRequest request, final int responseSta private final String _statusPath; private final String _versionPath; private final ObjectMapper _objectMapper; + private AtomicBoolean _healthCheckShutdown = new AtomicBoolean(false); private static final Logger LOGGER = LoggerFactory.getLogger(Routes.class); diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionFilteringSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionFilteringSink.java index 9dbf1dd4..15c40b3f 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionFilteringSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionFilteringSink.java @@ -20,6 +20,7 @@ import net.sf.oval.constraint.NotNull; import java.util.Collections; +import java.util.concurrent.CompletionStage; /** * Filtering sink for excluding data based on dimensions 
present or absent. @@ -43,7 +44,12 @@ public void recordAggregateData(final PeriodicData data) { @Override public void close() { - // Nothing to do + _sink.close(); + } + + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); } private DimensionFilteringSink(final Builder builder) { diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionInjectingSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionInjectingSink.java index e08621ca..e0202192 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionInjectingSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/DimensionInjectingSink.java @@ -21,6 +21,7 @@ import net.sf.oval.constraint.NotNull; import java.util.Map; +import java.util.concurrent.CompletionStage; /** * Sink adds any specified dimensions. @@ -40,7 +41,12 @@ public void recordAggregateData(final PeriodicData data) { @Override public void close() { - // Nothing to do + _sink.close(); + } + + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); } private DimensionInjectingSink(final Builder builder) { diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/DomainInjectingSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/DomainInjectingSink.java index 018e944f..039cb59f 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/DomainInjectingSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/DomainInjectingSink.java @@ -21,6 +21,7 @@ import net.sf.oval.constraint.NotNull; import java.util.Optional; +import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; /** @@ -49,7 +50,12 @@ public void recordAggregateData(final PeriodicData data) { @Override public void close() { - // Nothing to do + _sink.close(); + } + + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); } private Optional getDomain(@Nullable final String host) { diff --git 
a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 066bd378..83eccf39 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -33,10 +33,10 @@ import net.sf.oval.context.OValContext; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.PoisonPill; import org.apache.pekko.http.javadsl.model.HttpMethods; import org.apache.pekko.http.javadsl.model.MediaTypes; import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.pattern.Patterns; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.DefaultAsyncHttpClient; @@ -50,6 +50,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.CompletionStage; import java.util.function.Function; /** @@ -71,7 +72,17 @@ public void close() { .addData("sink", getName()) .addData("uri", _uri) .log(); - _sinkActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + _sinkActor.tell(HttpSinkActor.DrainAndShutdown.getInstance(), ActorRef.noSender()); + } + + @Override + public CompletionStage shutdownGracefully() { + LOGGER.info() + .setMessage("Closing sink") + .addData("sink", getName()) + .log(); + return Patterns.ask(_sinkActor, HttpSinkActor.DrainAndShutdown.getInstance(), Duration.ofSeconds(30)) + .thenApply(response -> null); } @LogValue diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index 077e1ddb..8f2d193c 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -72,12 +72,13 @@ public static Props props( final PeriodicMetrics periodicMetrics) { return 
Props.create( HttpSinkActor.class, - client, - sink, - maximumConcurrency, - maximumQueueSize, - spreadPeriod, - periodicMetrics); + () -> new HttpSinkActor( + client, + sink, + maximumConcurrency, + maximumQueueSize, + spreadPeriod, + periodicMetrics)); } /** @@ -249,6 +250,7 @@ public Receive createReceive() { _periodicMetrics.recordCounter(_pendingRequestsQueueSizeName, _pendingRequests.size()); _periodicMetrics.recordCounter(_inflightRequestsCountName, _inflightRequestsCount); }) + .match(DrainAndShutdown.class, this::processDrainMessage) .matchAny(message -> { LOGGER.error() .setMessage("Unexpected message") @@ -391,6 +393,34 @@ private void processEmitAggregation(final EmitAggregation emitMessage) { } } + private void processDrainMessage(final DrainAndShutdown message) { + if (_pendingRequests.isEmpty() && _inflightRequestsCount == 0) { + LOGGER.info() + .setMessage("Stopping actor") + .addContext("actor", self()) + .log(); + context().stop(self()); + sender().tell("OK", self()); + + } else { + LOGGER.info() + .setMessage("Waiting for pending requests to complete") + .addData("pendingRequests", _pendingRequests.size()) + .addData("inflightRequests", _inflightRequestsCount) + .addContext("actor", self()) + .log(); + + context().system() + .scheduler() + .scheduleOnce( + Duration.ofSeconds(1), + self(), + DrainAndShutdown.getInstance(), + context().dispatcher(), + sender()); + } + } + /** * Dispatches the number of pending requests needed to drain the pendingRequests queue or meet the maximum concurrency. */ @@ -663,4 +693,21 @@ private static final class SampleMetrics { private SampleMetrics() { } private static final SampleMetrics INSTANCE = new SampleMetrics(); } + + /** + * Message class to indicate that the actor should drain and shutdown. + */ + public static final class DrainAndShutdown { + private DrainAndShutdown() { } + + /** + * Get the singleton instance of this class. + * @return The singleton instance of this class. 
+ */ + public static DrainAndShutdown getInstance() { + return DrainAndShutdown.INSTANCE; + } + + private static final DrainAndShutdown INSTANCE = new DrainAndShutdown(); + } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/MetricNameFilteringSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/MetricNameFilteringSink.java index 8ebe7c6a..ab2ae68e 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/MetricNameFilteringSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/MetricNameFilteringSink.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.regex.Pattern; /** @@ -75,6 +76,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/MultiSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/MultiSink.java index 716ed48e..1624f3a5 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/MultiSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/MultiSink.java @@ -24,6 +24,9 @@ import net.sf.oval.constraint.NotNull; import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; /** * A publisher that wraps multiple others and publishes to all of them. This @@ -60,6 +63,17 @@ public void close() { } } + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public CompletionStage shutdownGracefully() { + final List> futures = _sinks.stream() + .map(Sink::shutdownGracefully) + .map(CompletionStage::toCompletableFuture) + .toList(); + + return CompletableFuture.allOf(futures.>toArray(new CompletableFuture[0])); + } + /** * Generate a Steno log compatible representation. 
* diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodFilteringSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodFilteringSink.java index 6b67baa6..5403974a 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodFilteringSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodFilteringSink.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionStage; /** * A publisher that wraps another, filters the metrics with specific periods, @@ -50,6 +51,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodicStatisticsSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodicStatisticsSink.java index f2015fe3..af9f8f24 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodicStatisticsSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/PeriodicStatisticsSink.java @@ -45,6 +45,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executors; @@ -112,6 +114,21 @@ public void close() { flushMetrics(); } + @Override + public CompletionStage shutdownGracefully() { + _executor.shutdown(); + return CompletableFuture.runAsync(() -> { + try { + _executor.awaitTermination(EXECUTOR_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + this.flushMetrics(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + + } + @LogValue @Override public Object toLogValue() { diff --git
a/src/main/java/com/arpnetworking/tsdcore/sinks/RandomMetricNameFilterSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/RandomMetricNameFilterSink.java index f336ed5b..d775513a 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/RandomMetricNameFilterSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/RandomMetricNameFilterSink.java @@ -24,6 +24,8 @@ import net.sf.oval.constraint.Min; import net.sf.oval.constraint.NotNull; +import java.util.concurrent.CompletionStage; + /** * A {@link com.arpnetworking.tsdcore.sinks.Sink} that only allows a percentage of data through to the wrapped * {@link com.arpnetworking.tsdcore.sinks.Sink}. @@ -51,6 +53,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/RrdSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/RrdSink.java index 9f863c8e..94eb314c 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/RrdSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/RrdSink.java @@ -33,6 +33,8 @@ import java.io.InputStreamReader; import java.time.Duration; import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; /** * RRD publisher that maintains all the rrd databases for a cluster. This class @@ -75,6 +77,11 @@ public void recordAggregateData(final PeriodicData periodicData) { @Override public void close() {} + @Override + public CompletionStage shutdownGracefully() { + return CompletableFuture.completedFuture(null); + } + /** * Generate a Steno log compatible representation. 
* diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/ServiceNameFilteringSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/ServiceNameFilteringSink.java index b5bb3725..f323075d 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/ServiceNameFilteringSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/ServiceNameFilteringSink.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.regex.Pattern; /** @@ -76,6 +77,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/Sink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/Sink.java index 84884634..d8ca8d24 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/Sink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/Sink.java @@ -18,6 +18,8 @@ import com.arpnetworking.tsdcore.model.PeriodicData; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.concurrent.CompletionStage; + /** * Interface to describe a class that publishes {@link PeriodicData}. * @@ -42,4 +44,12 @@ public interface Sink { * recordAggregation will be made after a call to close. */ void close(); + + /** + * Asynchronous variant of {@link #close()} that allows the publisher to clean up. No further calls to + * recordAggregateData are expected once this has been called. + * + * @return A {@link CompletionStage} that completes when the sink is closed.
+ */ + CompletionStage shutdownGracefully(); } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSink.java index 1ad5ae9e..31d8215f 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSink.java @@ -28,9 +28,11 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.PoisonPill; +import org.apache.pekko.pattern.Patterns; import org.apache.pekko.util.ByteString; import java.time.Duration; +import java.util.concurrent.CompletionStage; import java.util.function.Function; /** @@ -53,6 +55,16 @@ public void close() { _sinkActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); } + @Override + public CompletionStage shutdownGracefully() { + LOGGER.info() + .setMessage("Closing sink") + .addData("sink", getName()) + .log(); + return Patterns.ask(_sinkActor, TcpSinkActor.DrainAndShutdown.getInstance(), Duration.ofSeconds(30)) + .thenApply(response -> null); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSinkActor.java index 8e1bbe17..d662f5a2 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/TcpSinkActor.java @@ -177,6 +177,28 @@ public Receive createReceive() { connect(); }) .match(Connect.class, connect -> connect()) + .match(TcpSinkActor.DrainAndShutdown.class, message -> { + if (_pendingRequests.isEmpty()) { + LOGGER.info() + .setMessage("Stopping actor") + .addContext("actor", self()) + .log(); + context().stop(self()); + sender().tell("OK", self()); + } else { + LOGGER.info() + .setMessage("Waiting for pending requests to complete") + .addData("pendingRequests", _pendingRequests.size()) + .addContext("actor", self()) + .log(); + context().system().scheduler().scheduleOnce( + 
Duration.ofSeconds(1), + self(), + TcpSinkActor.DrainAndShutdown.getInstance(), + context().dispatcher(), + sender()); + } + }) .build(); } @@ -293,4 +315,21 @@ public PeriodicData getData() { private final PeriodicData _data; } + + /** + * Message class to drain the queue and shutdown the actor. + */ + public static final class DrainAndShutdown { + private DrainAndShutdown() { } + /** + * Get the singleton instance. + * + * @return The singleton instance. + */ + public static TcpSinkActor.DrainAndShutdown getInstance() { + return TcpSinkActor.DrainAndShutdown.INSTANCE; + } + + private static final TcpSinkActor.DrainAndShutdown INSTANCE = new TcpSinkActor.DrainAndShutdown(); + } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/TimeThresholdSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/TimeThresholdSink.java index 997d57ff..0905ac10 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/TimeThresholdSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/TimeThresholdSink.java @@ -29,6 +29,7 @@ import java.time.ZonedDateTime; import java.util.Collections; import java.util.Set; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; /** @@ -69,6 +70,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/UnitMappingSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/UnitMappingSink.java index aa6833e1..d240e74a 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/UnitMappingSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/UnitMappingSink.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.util.Map; +import java.util.concurrent.CompletionStage; /** * Implementation of {@link Sink} which maps values in one unit to another. 
@@ -83,6 +84,11 @@ public void close() { _sink.close(); } + @Override + public CompletionStage shutdownGracefully() { + return _sink.shutdownGracefully(); + } + /** * Generate a Steno log compatible representation. * diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/circonus/CirconusSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/circonus/CirconusSink.java index 63e5bd01..2adade37 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/circonus/CirconusSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/circonus/CirconusSink.java @@ -32,11 +32,13 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.PoisonPill; +import org.apache.pekko.pattern.Patterns; import org.apache.pekko.stream.Materializer; import java.net.URI; import java.time.Duration; import java.util.Collection; +import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; /** @@ -72,6 +74,12 @@ public void close() { _sinkActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); } + @Override + public CompletionStage shutdownGracefully() { + return Patterns.gracefulStop(_sinkActor, Duration.ofSeconds(30)) + .thenApply(result -> null); + } + /** * Generate a Steno log compatible representation. 
* diff --git a/src/test/java/com/arpnetworking/clusteraggregator/EmitterTest.java b/src/test/java/com/arpnetworking/clusteraggregator/EmitterTest.java index bc54d38e..401e9b2d 100644 --- a/src/test/java/com/arpnetworking/clusteraggregator/EmitterTest.java +++ b/src/test/java/com/arpnetworking/clusteraggregator/EmitterTest.java @@ -108,6 +108,7 @@ public void doesNotSwallowUnhandled() { private EmitterConfiguration _config = null; @Mock private Sink _sink = null; + private LifecycleRegistration _shutdown = new AppShutdown(); private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory(); private static final Statistic MEDIAN_STATISTIC = STATISTIC_FACTORY.getStatistic("median"); diff --git a/src/test/java/com/arpnetworking/tsdcore/sinks/BaseSinkTest.java b/src/test/java/com/arpnetworking/tsdcore/sinks/BaseSinkTest.java index f92e8bb9..dbcc79e0 100644 --- a/src/test/java/com/arpnetworking/tsdcore/sinks/BaseSinkTest.java +++ b/src/test/java/com/arpnetworking/tsdcore/sinks/BaseSinkTest.java @@ -19,6 +19,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + /** * Tests for the {@link BaseSink} class. * @@ -55,6 +58,11 @@ public void close() { // Nothing to do } + @Override + public CompletionStage shutdownGracefully() { + return CompletableFuture.completedFuture(null); + } + private TestAggregatedDataSink(final Builder builder) { super(builder); }