Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements & fixes to api deadlines & task timeouts #33

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,10 @@ public enum Completion {
*/
String concurrencyConfig() default "";

/**
* Indicates whether timeout should take into consideration the deadline specified by {@link com.flipkart.gjex.core.service.Api} if any
*/
boolean respectDeadline() default false;

}

Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static <T1,T2,T3,R> R compose (FutureDecorator<? extends T1>future1,

/** Convenience method to get the response from completion of the specified FutureDecorator and evaluate completion as defined in the FutureDecorator*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private static Object getResultFromFuture(FutureDecorator future) {
public static Object getResultFromFuture(FutureDecorator future) {
Object result = null;
Integer futureGetTimeout = null;
// check if a Deadline has been set. This will become the first timeout we consider
Expand All @@ -158,7 +158,9 @@ private static Object getResultFromFuture(FutureDecorator future) {
}
// see if a Task timeout has been set, use the smaller of the Deadline and Task timeout
if (future.getTaskExecutor().getTimeout() > 0) {
futureGetTimeout = Math.min(futureGetTimeout, future.getTaskExecutor().getTimeout());
futureGetTimeout = futureGetTimeout == null ?
future.getTaskExecutor().getTimeout() :
Math.min(futureGetTimeout, future.getTaskExecutor().getTimeout());
}
if (future.getTaskExecutor().isWithRequestHedging() && future.getTaskExecutor().getRollingTailLatency() > 0) {
// we'll take the minimum of deadline and rolling tail latency (if request hedging is enabled) as the timeout for the Future
Expand All @@ -178,13 +180,15 @@ private static Object getResultFromFuture(FutureDecorator future) {
LOGGER.info("Sending hedged request for Task : " + future.getTaskExecutor().getInvocation().getMethod().getName());
result = FutureDecorator.getResultFromFuture(new FutureDecorator(future.getTaskExecutor().clone(), future.getCompletion()));
}
if (future.getCompletion().equals(ConcurrentTask.Completion.Mandatory)) {
throw new TaskException("Task execution results not available due to timeout.",
new StatusException(Status.DEADLINE_EXCEEDED.withDescription("Deadline exceeded waiting for results :" + e.getMessage())));
} else {
LOGGER.warn("Timeout in optional task. Not failing the execution and proceeding.");
}
} catch (InterruptedException | ExecutionException e) {
String errorMessage = e.getCause() == null ? e.getMessage() : e.getCause().getMessage();
if (future.getCompletion().equals(ConcurrentTask.Completion.Mandatory)) {
if (TimeoutException.class.isAssignableFrom(e.getClass())) {
throw new TaskException("Task execution results not available.",
new StatusException(Status.DEADLINE_EXCEEDED.withDescription("Deadline exceeded waiting for results :" + e.getMessage())));
}
throw new TaskException("Error executing mandatory Task : " + errorMessage, e);
} else {
LOGGER.warn("Execution exception in optional task :" + errorMessage + " . Not failing the execution and proceeding.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
if (api.deadlineConfig().length() > 0) { // check if deadline is specified as a config property
deadline = globalConfigurationProvider.get().getInt(api.deadlineConfig());
}
if (api.deadline() > 0) { // finally override with method level annotation if present
deadline = api.deadline();
}
if (Context.current().getDeadline() == null) {
cancellableContext = Context.current().withDeadlineAfter(deadline, TimeUnit.MILLISECONDS, scheduledExecutorServiceProvider.get());
previous = cancellableContext.attach(); // attach the CancellableContext and store the previous Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.matcher.AbstractMatcher;
import com.google.inject.matcher.Matchers;
import io.grpc.Context;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.configuration.Configuration;
Expand All @@ -31,6 +32,7 @@
import javax.inject.Provider;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
* A Guice {@link AbstractModule} for managing interception of methods annotated with {@link ConcurrentTask}
Expand Down Expand Up @@ -60,9 +62,12 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
if (task.timeoutConfig().length() > 0) { // check if timeout is specified as a config property
timeout = globalConfig.getInt(task.timeoutConfig());
}
if (task.timeout() > 0) { // we take the method level annotation value as the final override
if (task.timeout() > 0) { // we take the method level annotation value as an override
timeout = task.timeout();
}
if (task.respectDeadline() && Context.current().getDeadline() != null) { // if this task is in the context of a deadlined grpc call, timeout is bound by the deadline
timeout = Math.min(timeout, (int)Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
}
int concurrency = 0;
if (task.concurrencyConfig().length() > 0) { // check if concurrency is specified as a config property
concurrency = globalConfig.getInt(task.concurrencyConfig());
Expand Down