diff --git a/src/main/java/com/arpnetworking/clusteraggregator/Status.java b/src/main/java/com/arpnetworking/clusteraggregator/Status.java index 3685be58..5abd3ad8 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/Status.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/Status.java @@ -21,9 +21,7 @@ import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.MemberStatus; -import akka.dispatch.OnComplete; -import akka.dispatch.Recover; -import akka.pattern.Patterns; +import akka.pattern.PatternsCS; import akka.remote.AssociationErrorEvent; import akka.util.Timeout; import com.arpnetworking.clusteraggregator.models.BookkeeperData; @@ -31,15 +29,14 @@ import com.arpnetworking.clusteraggregator.models.PeriodMetrics; import com.arpnetworking.clusteraggregator.models.StatusResponse; import com.arpnetworking.utility.CastMapper; -import com.arpnetworking.utility.CollectFutureBuilder; import org.joda.time.Period; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Future; -import scala.runtime.AbstractFunction0; import scala.util.Failure; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Periodically polls the cluster status and caches the result. @@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception { _quarantined = true; } } else if (message instanceof HealthRequest) { - final ExecutionContextExecutor executor = getContext().dispatcher(); - final Future stateFuture = Patterns + final CompletionStage stateFuture = PatternsCS .ask( _clusterStatusCache, new ClusterStatusCache.GetRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(CAST_MAPPER, executor); - stateFuture.onComplete( - new OnComplete() { - @Override - public void onComplete(final Throwable failure, final ClusterStatusCache.StatusResponse success) { - final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined; - sender.tell(healthy, getSelf()); - } - }, - executor); + .thenApply(CAST_MAPPER); + stateFuture.whenComplete( + (statusResponse, throwable) -> { + final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined; + sender.tell(healthy, getSelf()); + }); } else { unhandled(message); } } private void processStatusRequest(final ActorRef sender) { - final ExecutionContextExecutor executor = getContext().dispatcher(); // Call the bookkeeper - final Future bookkeeperFuture = Patterns.ask( + final CompletableFuture bookkeeperFuture = PatternsCS.ask( _metricsBookkeeper, new MetricsRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(new CastMapper<>(), executor) - .recover(new AsNullRecovery<>(), executor); - final Future clusterStateFuture = - Patterns.ask( + .thenApply(new CastMapper()) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); + + final CompletableFuture clusterStateFuture = + PatternsCS.ask( _clusterStatusCache, new ClusterStatusCache.GetRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(CAST_MAPPER, executor) - .recover(new AsNullRecovery<>(), executor); + .thenApply(CAST_MAPPER) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); - final Future> localMetricsFuture = - Patterns.ask( + final CompletableFuture> localMetricsFuture = + PatternsCS.ask( _localMetrics, new MetricsRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(new CastMapper<>(), executor) - .recover(new AsNullRecovery<>(), executor); - - final Future future = new CollectFutureBuilder() - .addFuture(bookkeeperFuture) - .addFuture(clusterStateFuture) - .addFuture(localMetricsFuture) - .map(new AbstractFunction0() { - @Override - public StatusResponse apply() { - return new StatusResponse.Builder() - .setClusterMetrics(bookkeeperFuture.value().get().get()) - .setClusterState(clusterStateFuture.value().get().get()) - .setLocalMetrics(localMetricsFuture.value().get().get()) + .thenApply(new CastMapper>()) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); + + CompletableFuture.allOf( + bookkeeperFuture, + clusterStateFuture, + localMetricsFuture) + .thenApply( + (v) -> new StatusResponse.Builder() + .setClusterMetrics(bookkeeperFuture.getNow(null)) + .setClusterState(clusterStateFuture.getNow(null)) + .setLocalMetrics(localMetricsFuture.getNow(null)) .setLocalAddress(_cluster.selfAddress()) - .build(); - } - }) - .build(executor); - future.onComplete( - new OnComplete() { - @Override - public void onComplete(final Throwable failure, final StatusResponse success) { - if (failure != null) { - sender.tell(new Failure(failure), getSelf()); - } else { - sender.tell(success, getSelf()); - } - } - }, - executor); + .build()) + .whenComplete( + (result, failure) -> { + if (failure != null) { + sender.tell(new Failure(failure), getSelf()); + } else { + sender.tell(result, getSelf()); + } + }); } private boolean _quarantined = false; @@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) { private final ActorRef _clusterStatusCache; private final ActorRef _localMetrics; - private static final CastMapper CAST_MAPPER = new CastMapper<>(); + private static final CastMapper CAST_MAPPER = new CastMapper<>(); - private static class AsNullRecovery extends Recover { + private static class AsNullRecovery implements Function { @Override - public T recover(final Throwable failure) { + public T apply(final Throwable failure) { return null; } } diff --git a/src/main/java/com/arpnetworking/utility/CastMapper.java b/src/main/java/com/arpnetworking/utility/CastMapper.java index 9bf2036e..cf61379e 100644 --- a/src/main/java/com/arpnetworking/utility/CastMapper.java +++ b/src/main/java/com/arpnetworking/utility/CastMapper.java @@ -16,22 +16,21 @@ package com.arpnetworking.utility; -import akka.dispatch.Mapper; +import java.util.function.Function; /** * Map method that just casts to another class. * - * @param Input type * @param Output type * @author Brandon Arp (brandonarp at gmail dot com) */ -public class CastMapper extends Mapper { +public class CastMapper implements Function { /** * {@inheritDoc} */ @Override @SuppressWarnings("unchecked") - public R apply(final T parameter) { + public R apply(final Object parameter) { return (R) parameter; } } diff --git a/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java b/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java deleted file mode 100644 index 42379e68..00000000 --- a/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright 2014 Groupon.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.arpnetworking.utility; - -import akka.dispatch.Futures; -import akka.dispatch.OnComplete; -import com.google.common.collect.Lists; -import scala.Function0; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.runtime.AbstractFunction0; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Collects futures and provides them in a combined promise. - * - * @param Return future type - * @author Brandon Arp (brandonarp at gmail dot com) - */ -public final class CollectFutureBuilder { - /** - * Method to create a {@code } from the completed {@link scala.concurrent.Future}s. - * - * @param callback Callback function - * @return this builder - */ - public CollectFutureBuilder map(final Function0 callback) { - _callback = callback; - return this; - } - - /** - * Registers a {@link scala.concurrent.Future} in the collection. A future must be registered in order - * to be waited on. - * - * @param future Future to register - * @return this builder - */ - public CollectFutureBuilder addFuture(final Future future) { - _futures.add(future); - return this; - } - - /** - * Sets the list of {@link scala.concurrent.Future}s to wait on. - * - * @param futures The list of futures - * @return this builder - */ - public CollectFutureBuilder setFutures(final List> futures) { - _futures.clear(); - _futures.addAll(futures); - return this; - } - - /** - * Builds the final future. - * - * @param context context to execute the futures on - * @return the new future - */ - @SuppressWarnings("unchecked") - public Future build(final ExecutionContext context) { - final Promise result = Futures.promise(); - final AtomicInteger latch = new AtomicInteger(_futures.size()); - - final OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) { - if (failure != null) { - result.failure(failure); - } - final int count = latch.decrementAndGet(); - if (count == 0) { - result.success(_callback.apply()); - } - } - }; - - for (final Future future : _futures) { - ((Future) future).onComplete(onComplete, context); - } - - return result.future(); - } - - private Function0 _callback = new AbstractFunction0() { - @Override - public T apply() { - return null; - } - }; - - private final List> _futures = Lists.newArrayList(); -} -