Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resilience4j integration #68

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ allprojects {
maven { // The google mirror is less flaky than mavenCentral()
url "https://maven-central.storage-download.googleapis.com/maven2/"
}
maven {
url "https://clojars.org/repo"
}
mavenCentral()
mavenLocal()
}
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
implementation 'io.prometheus:prometheus-metrics-core:1.2.0'
implementation 'io.prometheus:prometheus-metrics-instrumentation-dropwizard5:1.2.0'
implementation 'io.prometheus:prometheus-metrics-exporter-common:1.2.0'
implementation 'com.flipkart.resilience4all:resilience4j-metrics-event-stream:0.0.1'
implementation libraries.grpc_stub
implementation libraries.grpc_services
implementation libraries.lombok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,11 @@ public enum Completion {
*/
String concurrencyConfig() default "";


/**
* Resilience4j properties configured as a Config property
*/
String resilience4jConfig() default "taskProperties.default";

}

Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public class FutureDecorator<T> implements Future<T> {
/** The original Future decorated by this Future*/
private final Future<T> origin;
/** The TaskExecutor producing the Future*/
private final TaskExecutor<T> taskExecutor;
private final FutureProvider<T> taskExecutor;

/** The BiConsumer to callback on completion*/
private BiConsumer<T, Throwable> completionConsumer;

public FutureDecorator(TaskExecutor<T> taskExecutor, ConcurrentTask.Completion completion) {
public FutureDecorator(FutureProvider<T> taskExecutor, ConcurrentTask.Completion completion) {
this.taskExecutor = taskExecutor;
this.origin = taskExecutor.queue();
this.origin = taskExecutor.getFuture();
this.completion = completion;
}

Expand Down Expand Up @@ -95,7 +95,7 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
public ConcurrentTask.Completion getCompletion() {
return completion;
}
public TaskExecutor<T> getTaskExecutor() {
public FutureProvider<T> getTaskExecutor() {
return taskExecutor;
}

Expand Down Expand Up @@ -175,7 +175,7 @@ private static Object getResultFromFuture(FutureDecorator future) {
} catch (TimeoutException e) {
if (future.getTaskExecutor().isWithRequestHedging() && !Context.current().getDeadline().isExpired()) {
// we will reschedule the execution i.e. hedge the request and return the result
LOGGER.info("Sending hedged request for Task : " + future.getTaskExecutor().getInvocation().getMethod().getName());
LOGGER.info("Sending hedged request for Task : " + future.getTaskExecutor().getName());
result = FutureDecorator.getResultFromFuture(new FutureDecorator(future.getTaskExecutor().clone(), future.getCompletion()));
}
} catch (InterruptedException | ExecutionException e) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/com/flipkart/gjex/core/task/FutureProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.flipkart.gjex.core.task;

import io.reactivex.functions.BiConsumer;

import java.util.concurrent.Future;

public interface FutureProvider<T> extends HasExecutionProperties {

Future<T> getFuture();

FutureProvider<T> clone();

void setCompletionConsumer(BiConsumer<T, Throwable> completionConsumer);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.flipkart.gjex.core.task;

public interface HasExecutionProperties {

int getTimeout();

boolean isWithRequestHedging();

long getRollingTailLatency();

String getName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package com.flipkart.gjex.core.task;

import com.flipkart.gjex.core.logging.Logging;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.metrics.Timer;

import io.grpc.Context;
import io.reactivex.functions.BiConsumer;
import org.aopalliance.intercept.MethodInvocation;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

public class ResilienceTaskExecutor<T> implements FutureProvider<T>, Logging {

/** The MethodInvocation to execute asynchronously*/
private final MethodInvocation invocation;

/** The currently active gRPC Context*/
private Context currentContext;

/** The completion BiConsumer*/
private BiConsumer<T, Throwable> completionConsumer;

/** Indicates if requests may be hedged within the configured timeout duration*/
private boolean withRequestHedging;

/** The rolling tail latency as seen by Hystrix*/
private long rollingTailLatency;

private int timeout;

private final CircuitBreaker circuitBreaker;
private final ThreadPoolBulkhead threadPoolBulkhead;
private final Timer timer;

public ResilienceTaskExecutor(MethodInvocation invocation,
CircuitBreaker circuitBreaker,
ThreadPoolBulkhead bulkhead,
Timer timer,
Boolean withRequestHedging,
int timeout
) {
currentContext = Context.current();
this.invocation = invocation;
this.circuitBreaker = circuitBreaker;
this.timer = timer;
this.threadPoolBulkhead = bulkhead;
this.withRequestHedging = withRequestHedging;
this.timeout = timeout;
}

public ResilienceTaskExecutor<T> clone() {
ResilienceTaskExecutor<T> clone =
new ResilienceTaskExecutor<T>(
this.invocation,
this.circuitBreaker,
this.threadPoolBulkhead,
this.timer,
this.withRequestHedging,
this.timeout
);
return clone;
}

public void setCompletionConsumer(BiConsumer<T, Throwable> completionConsumer) {
this.completionConsumer = completionConsumer;
}

public MethodInvocation getInvocation() {
return invocation;
}

public int getTimeout() {
return timeout;
}

public boolean isWithRequestHedging() {
return withRequestHedging;
}

public long getRollingTailLatency() {
//TODO: hardcoded value kept here for now.
// Need to figure out a way to get rolling 95th percentile latency in resilience
return 10000L;
}

@Override
public String getName() {
return getInvocation().getMethod().getName();
}

public CompletableFuture<T> getFuture() {
Supplier<T> supplier = prepareBaseSupplier();
return decorateSupplierWithResilience(supplier)
.get()
.toCompletableFuture();
}

private Supplier<T> prepareBaseSupplier() {
return () -> {
try {
return run();
} catch (Exception e) {
error("Error executing task", e);
throw new RuntimeException(e);
}
};
}

public T run() throws Exception {
Context previous = this.currentContext.attach(); // setting the current gRPC context for the executing Hystrix thread
Throwable error = null;
T result = null;
try {
result = ((AsyncResult<T>)this.invocation.proceed()).invoke(); // call the AsyncResult#invoke() to execute the actual work to be performed asynchronously
return result;
} catch (Throwable e) {
error = e;
error("Error executing task", e);
throw new RuntimeException(e);
} finally {
if (this.completionConsumer != null) {
this.completionConsumer.accept(result, error); // inform the completion status to the registered completion consumer
}
this.currentContext.detach(previous); // unset the current gRPC context
}
}

public <T> Supplier<CompletionStage<T>> decorateSupplierWithResilience(Supplier<T> baseSupplier) {
Supplier<T> cbSupplier = circuitBreaker.decorateSupplier(baseSupplier);
Supplier<CompletionStage<T>> tpbSupplier = threadPoolBulkhead.decorateSupplier(cbSupplier);
return Timer.decorateCompletionStageSupplier(timer, tpbSupplier);
}

}
15 changes: 14 additions & 1 deletion core/src/main/java/com/flipkart/gjex/core/task/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import io.grpc.Context;
import io.reactivex.functions.BiConsumer;

import java.util.concurrent.Future;

/**
* A {@link HystrixCommand} implementation to provide async execution and circuit breaking functionality for method invocations.
* @author regu.b
*
*/
public class TaskExecutor<T> extends HystrixCommand<T> implements Logging {
public class TaskExecutor<T> extends HystrixCommand<T> implements FutureProvider<T>, Logging {

/** The MethodInvocation to execute asynchronously*/
private final MethodInvocation invocation;
Expand Down Expand Up @@ -91,10 +93,21 @@ public boolean isWithRequestHedging() {
public long getRollingTailLatency() {
return rollingTailLatency;
}

@Override
public String getName() {
return getInvocation().getMethod().getName();
}

public int getTimeout() {
return timeout;
}

@Override
public Future<T> getFuture() {
return queue();
}

/**
* Clone this TaskExecutor with values that were specified during instantiation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public ResponseEntity invoke() {
* This method is implicitly(default) marked as {@link ConcurrentTask.Completion#Mandatory}
*/
@Traced
@ConcurrentTask(timeoutConfig = "taskProperties.hello.timeout")
@ConcurrentTask(timeoutConfig = "taskProperties.hello.timeout", resilience4jConfig = "taskProperties.tracerMethod4")
public Future<ResponseEntity> tracedMethod4(ResponseEntity entity) {
return new AsyncResult<ResponseEntity>() {
@Override
Expand Down
18 changes: 18 additions & 0 deletions examples/src/main/resources/hello_world_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,21 @@ apiProperties:

taskProperties:
hello.timeout: 200
tracerMethod4:
circuitBreaker:
type: "TIME_BASED"
slidingWindowSize: 20
threadPoolBulkHead:
queueCapacity: 10
maxThreadPoolSize: 5
coreThreadPoolSize: 3
default:
circuitBreaker:
type: "TIME_BASED"
slidingWindowSize: 10
threadPoolBulkHead:
queueCapacity: 20
maxThreadPoolSize: 10
coreThreadPoolSize: 5

useResilience4j: true
2 changes: 2 additions & 0 deletions guice/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ dependencies {
implementation 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.7.7'

implementation 'io.prometheus:prometheus-metrics-exporter-servlet-javax:1.2.0'
implementation 'com.codahale.metrics:metrics-core:3.0.2'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead use the latest version of metrics? libraries.dw_metrics is already included.

implementation 'com.flipkart.resilience4all:resilience4j-metrics-event-stream:0.0.1'

testImplementation libraries.junit4
testImplementation libraries.assertj
Expand Down
3 changes: 3 additions & 0 deletions guice/src/main/java/com/flipkart/gjex/guice/GuiceBundle.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.flipkart.gjex.guice.module.ServerModule;
import com.flipkart.gjex.guice.module.TaskModule;
import com.flipkart.gjex.guice.module.TracingModule;
import com.flipkart.gjex.guice.module.ResilienceRegistriesModule;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.inject.Binding;
Expand Down Expand Up @@ -125,6 +126,8 @@ public void initialize(Bootstrap<?, ?> bootstrap) {
modules.add(new ApiModule());
// add the Tracing module before Task module so that even Concurrent tasks can be traced
modules.add(new TracingModule());
// add the Registry module
modules.add(new ResilienceRegistriesModule());
// add the Task module
modules.add(new TaskModule());
// add the Dashboard module
Expand Down
Loading
Loading