Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Task.blocking to Task.callableInExecutor #292

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private Strategy(long sleepMs) {

@Override
public Task<Map<Integer, Try<String>>> taskForBatch(Set<Integer> keys) {
return Task.blocking(() -> {
return Task.callableInExecutor(() -> {
try {
// make this batching task long-running
Thread.sleep(_sleepMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

import com.linkedin.parseq.Context;
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.MultiException;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.PromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -282,7 +280,7 @@ public Task<Void> delete(String path, int version) {
*/
@Override
public Task<List<OpResult>> multi(List<Op> ops, Executor executor) {
return Task.blocking(() -> _zkClient.multi(ops), executor);
return Task.callableInExecutor(() -> _zkClient.multi(ops), executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.linkedin.parseq;

import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.internal.ArgumentUtil;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.Promises;
Expand All @@ -37,7 +36,7 @@
* To use this class with an engine, register an executor with engine using
* {@link #register(EngineBuilder, java.util.concurrent.Executor)}
*
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}.
* @author Walter Fender ([email protected])
*/
@Deprecated
Expand All @@ -51,15 +50,15 @@ public static void register(EngineBuilder builder, Executor executor) {
}

/**
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}.
*/
@Deprecated
public AsyncCallableTask(final Callable<R> syncJob) {
this(null, syncJob);
}

/**
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}.
*/
@Deprecated
public AsyncCallableTask(final String name, final Callable<R> syncJob) {
Expand Down
39 changes: 28 additions & 11 deletions subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1079,7 +1078,7 @@ public static <R> Task<R> flatten(final Task<Task<R>> task) {
* Creates a new task that have a value of type {@code Void}. Because the
* returned task returns no value, it is typically used to produce side effects.
* It is not appropriate for long running or blocking actions. If action is
* long running or blocking use {@link #blocking(String, Callable, Executor) blocking} method.
* long running or blocking use {@link #callableInExecutor(String, Callable, Executor)} method.
*
* <blockquote><pre>
* // this task will print "Hello" on standard output
Expand Down Expand Up @@ -1167,7 +1166,7 @@ public static <T> Task<T> failure(final Throwable failure) {
* from the supplied callable. This task is useful when doing basic
* computation that does not require asynchrony. It is not appropriate for
* long running or blocking callables. If callable is long running or blocking
* use {@link #blocking(String, Callable, Executor) blocking} method.
* use {@link #callableInExecutor(String, Callable, Executor)} method.
*
* <blockquote><pre>
* // this task will complete with {@code String} representing current time
Expand Down Expand Up @@ -1313,7 +1312,7 @@ public static <T> Task<T> fromTry(final Try<? extends T> tried) {
*
* This method is not appropriate for long running or blocking callables.
* If callable is long running or blocking use
* {@link #blocking(String, Callable, Executor) blocking} method.
* {@link #callableInExecutor(String, Callable, Executor)} method.
* <p>
*
* @param <T> the type of the return value for this task
Expand Down Expand Up @@ -1403,10 +1402,10 @@ public static <T> Task<T> async(final Function1<Context, Promise<? extends T>> f
* @return a new task that will submit the callable to given executor and complete
* with result returned by that callable
*/
public static <T> Task<T> blocking(final String name, final Callable<? extends T> callable, final Executor executor) {
public static <T> Task<T> callableInExecutor(final String name, final Callable<? extends T> callable, final Executor executor) {
ArgumentUtil.requireNotNull(callable, "callable");
ArgumentUtil.requireNotNull(callable, "executor");
Task<T> blockingTask = async(name, () -> {
Task<T> asyncCallableTask = async(name, () -> {
final SettablePromise<T> promise = Promises.settable();
executor.execute(() -> {
try {
Expand All @@ -1417,18 +1416,36 @@ public static <T> Task<T> blocking(final String name, final Callable<? extends T
} );
return promise;
});
blockingTask.getShallowTraceBuilder().setTaskType(TaskType.BLOCKING.getName());
return blockingTask;
asyncCallableTask.getShallowTraceBuilder().setTaskType(TaskType.CALLABLE_IN_EXECUTOR.getName());
return asyncCallableTask;
}

/**
* Equivalent to {@code callableInExecutor("callableInExecutor", callable, executor)}.
* @see #callableInExecutor(String, Callable, Executor)
*/
public static <T> Task<T> callableInExecutor(final Callable<? extends T> callable, final Executor executor) {
return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
}

/**
* Equivalent to {@code blocking("blocking", callable, executor)}.
* @see #blocking(String, Callable, Executor)
* @deprecated please use {@link Task#callableInExecutor(Callable, Executor)}
*/
@Deprecated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the replacement to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udpated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add java doc to redirect to new method

public static <T> Task<T> blocking(final Callable<? extends T> callable, final Executor executor) {
return blocking("blocking: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
}


/**
* @deprecated please use {@link Task#callableInExecutor(String, Callable, Executor)}
*/
@Deprecated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the replacement to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udpated

public static <T> Task<T> blocking(final String name, final Callable<? extends T> callable, final Executor executor) {
return callableInExecutor(name, callable, executor);
}


/**
* Creates a new task that will run given tasks in parallel. Returned task
* will be resolved with results of all tasks as soon as all of them has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
public enum TaskType {
FUSION ("fusion"),
BLOCKING ("blocking"),
CALLABLE_IN_EXECUTOR("callbleInExecutor"),
SHAREABLE ("shareable"),
FLATTEN ("flatten"),
WITH_SIDE_EFFECT ("withSideEffect"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ public void testAsyncWithContext() {
}

@Test
public void testBlocking() {
public void testcallableInExecutor() {
TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor());
try {
Task<String> task = Task.blocking(() -> "from blocking", es);
Task<String> task = Task.callableInExecutor(() -> "from blocking", es);
runAndWait("TestTaskFactoryMethods.testBlocking", task);
assertEquals(task.get(), "from blocking");
assertEquals(es.getCount(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public void testFusionTaskType() {
public void testBlockingTaskType() {
TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor());
try {
Task<String> task = Task.blocking(() -> "blocking task", es);
Task<String> task = Task.callableInExecutor(() -> "blocking task", es);
runAndWait("blockingTaskType", task);
assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName());
assertEquals(task.getShallowTrace().getTaskType(), TaskType.CALLABLE_IN_EXECUTOR.getName());
} finally {
es.shutdown();
}
Expand Down