Skip to content

Commit

Permalink
clean up status actor
Browse files Browse the repository at this point in the history
  • Loading branch information
BrandonArp committed Jan 9, 2017
1 parent 1dd9350 commit 0788e13
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 178 deletions.
110 changes: 48 additions & 62 deletions src/main/java/com/arpnetworking/clusteraggregator/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,22 @@
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.MemberStatus;
import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import akka.pattern.PatternsCS;
import akka.remote.AssociationErrorEvent;
import akka.util.Timeout;
import com.arpnetworking.clusteraggregator.models.BookkeeperData;
import com.arpnetworking.clusteraggregator.models.MetricsRequest;
import com.arpnetworking.clusteraggregator.models.PeriodMetrics;
import com.arpnetworking.clusteraggregator.models.StatusResponse;
import com.arpnetworking.utility.CastMapper;
import com.arpnetworking.utility.CollectFutureBuilder;
import org.joda.time.Period;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction0;
import scala.util.Failure;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* Periodically polls the cluster status and caches the result.
Expand Down Expand Up @@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception {
_quarantined = true;
}
} else if (message instanceof HealthRequest) {
final ExecutionContextExecutor executor = getContext().dispatcher();
final Future<ClusterStatusCache.StatusResponse> stateFuture = Patterns
final CompletionStage<ClusterStatusCache.StatusResponse> stateFuture = PatternsCS
.ask(
_clusterStatusCache,
new ClusterStatusCache.GetRequest(),
Timeout.apply(3, TimeUnit.SECONDS))
.map(CAST_MAPPER, executor);
stateFuture.onComplete(
new OnComplete<ClusterStatusCache.StatusResponse>() {
@Override
public void onComplete(final Throwable failure, final ClusterStatusCache.StatusResponse success) {
final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined;
sender.tell(healthy, getSelf());
}
},
executor);
.thenApply(CAST_MAPPER);
stateFuture.whenComplete(
(statusResponse, throwable) -> {
final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined;
sender.tell(healthy, getSelf());
});
} else {
unhandled(message);
}
}

private void processStatusRequest(final ActorRef sender) {
final ExecutionContextExecutor executor = getContext().dispatcher();
// Call the bookkeeper
final Future<BookkeeperData> bookkeeperFuture = Patterns.ask(
final CompletableFuture<BookkeeperData> bookkeeperFuture = PatternsCS.ask(
_metricsBookkeeper,
new MetricsRequest(),
Timeout.apply(3, TimeUnit.SECONDS))
.map(new CastMapper<>(), executor)
.recover(new AsNullRecovery<>(), executor);
final Future<ClusterStatusCache.StatusResponse> clusterStateFuture =
Patterns.ask(
.thenApply(new CastMapper<BookkeeperData>())
.exceptionally(new AsNullRecovery<>())
.toCompletableFuture();

final CompletableFuture<ClusterStatusCache.StatusResponse> clusterStateFuture =
PatternsCS.ask(
_clusterStatusCache,
new ClusterStatusCache.GetRequest(),
Timeout.apply(3, TimeUnit.SECONDS))
.map(CAST_MAPPER, executor)
.recover(new AsNullRecovery<>(), executor);
.thenApply(CAST_MAPPER)
.exceptionally(new AsNullRecovery<>())
.toCompletableFuture();

final Future<Map<Period, PeriodMetrics>> localMetricsFuture =
Patterns.ask(
final CompletableFuture<Map<Period, PeriodMetrics>> localMetricsFuture =
PatternsCS.ask(
_localMetrics,
new MetricsRequest(),
Timeout.apply(3, TimeUnit.SECONDS))
.map(new CastMapper<>(), executor)
.recover(new AsNullRecovery<>(), executor);

final Future<StatusResponse> future = new CollectFutureBuilder<StatusResponse>()
.addFuture(bookkeeperFuture)
.addFuture(clusterStateFuture)
.addFuture(localMetricsFuture)
.map(new AbstractFunction0<StatusResponse>() {
@Override
public StatusResponse apply() {
return new StatusResponse.Builder()
.setClusterMetrics(bookkeeperFuture.value().get().get())
.setClusterState(clusterStateFuture.value().get().get())
.setLocalMetrics(localMetricsFuture.value().get().get())
.thenApply(new CastMapper<Map<Period, PeriodMetrics>>())
.exceptionally(new AsNullRecovery<>())
.toCompletableFuture();

CompletableFuture.allOf(
bookkeeperFuture,
clusterStateFuture,
localMetricsFuture)
.thenApply(
(v) -> new StatusResponse.Builder()
.setClusterMetrics(bookkeeperFuture.getNow(null))
.setClusterState(clusterStateFuture.getNow(null))
.setLocalMetrics(localMetricsFuture.getNow(null))
.setLocalAddress(_cluster.selfAddress())
.build();
}
})
.build(executor);
future.onComplete(
new OnComplete<StatusResponse>() {
@Override
public void onComplete(final Throwable failure, final StatusResponse success) {
if (failure != null) {
sender.tell(new Failure<StatusResponse>(failure), getSelf());
} else {
sender.tell(success, getSelf());
}
}
},
executor);
.build())
.whenComplete(
(result, failure) -> {
if (failure != null) {
sender.tell(new Failure<StatusResponse>(failure), getSelf());
} else {
sender.tell(result, getSelf());
}
});
}

private boolean _quarantined = false;
Expand All @@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) {
private final ActorRef _clusterStatusCache;
private final ActorRef _localMetrics;

private static final CastMapper<Object, ClusterStatusCache.StatusResponse> CAST_MAPPER = new CastMapper<>();
private static final CastMapper<ClusterStatusCache.StatusResponse> CAST_MAPPER = new CastMapper<>();

private static class AsNullRecovery<T> extends Recover<T> {
private static class AsNullRecovery<T> implements Function<Throwable, T> {
@Override
public T recover(final Throwable failure) {
public T apply(final Throwable failure) {
return null;
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/com/arpnetworking/utility/CastMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@

package com.arpnetworking.utility;

import akka.dispatch.Mapper;
import java.util.function.Function;

/**
* Map method that just casts to another class.
*
* @param <T> Input type
* @param <R> Output type
* @author Brandon Arp (brandonarp at gmail dot com)
*/
public class CastMapper<T, R> extends Mapper<T, R> {
public class CastMapper<R> implements Function<Object, R> {
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public R apply(final T parameter) {
public R apply(final Object parameter) {
return (R) parameter;
}
}
Expand Down
112 changes: 0 additions & 112 deletions src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java

This file was deleted.

0 comments on commit 0788e13

Please sign in to comment.