Notes on CompletableFuture
Hello, I was a bit confused when I first learned about CompletableFuture, so I made some notes on CompletableFuture and its methods. Hope it helps!
CompletableFuture is a class that implements both Future and CompletionStage. Let's take a look at some CompletableFuture examples to see how it works.
Before that, let's define a log method so that we can see what is going on in which thread.
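import java.time.LocalTime;
// Prints the current time, the current thread's name, and the message.
public static void log(String msg) {
System.out.println(LocalTime.now() + " ("
+ Thread.currentThread().getName() + ") " + msg);
}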
A CompletableFuture can be created with new CompletableFuture<Type>(), where Type is the type of the value the future holds once the job has completed. You can create a CompletableFuture as follows and use it like an ordinary Future.
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.complete("Finished");
return null;
});
log(future.get());
Output:
22:58:40.478 (main) Finished
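One thing the snippet above leaves open is what happens to the thread pool afterwards and what happens if the worker fails. Here is a minimal sketch of the same idea, assuming a hypothetical class name ManualCompletion and a shorter sleep. It uses join() instead of get() only to avoid checked exceptions in the helper method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ManualCompletion {
    // Completes a future by hand from a pool thread and returns its value.
    static String runOnce() {
        CompletableFuture<String> future = new CompletableFuture<>();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            pool.submit(() -> {
                try {
                    Thread.sleep(200);
                    future.complete("Finished");
                } catch (InterruptedException e) {
                    // Pass the failure on so that waiters do not block forever.
                    future.completeExceptionally(e);
                }
            });
            // Like get(), but throws only unchecked exceptions.
            return future.join();
        } finally {
            // Without this, idle pool threads keep the JVM alive for a while.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

If the worker never calls complete() or completeExceptionally(), the caller waits forever, so it is worth making sure every path through the worker ends in one of the two.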
If you already know the value, you can create an already-completed future without starting a thread by using CompletableFuture.completedFuture(value).
Future<String> completableFuture =
CompletableFuture.completedFuture("Skip!");
String result = completableFuture.get();
log(result);
Output:
22:59:42.553 (main) Skip!
supplyAsync() runs work asynchronously without you having to create a thread yourself. You pass a Supplier to supplyAsync() and it is executed on another thread (by default a ForkJoinPool.commonPool() worker, as the output below shows).
In the example below, the supplier sleeps for 2 seconds, so future.get() (which is called from the main thread) blocks for about 2 seconds before it receives the value returned from supplyAsync().
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log("Starting....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Finished works";
});
log("get(): " + future.get());
Output :
15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works
As you can see from the output, the work is processed by a thread other than main, and main then receives the result through get().
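supplyAsync() also has an overload that takes an Executor as a second argument, in case you do not want the work to land on the common pool. A small sketch, assuming a hypothetical class name and a made-up thread name "my-pool-thread" so the thread is easy to recognise:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncWithExecutor {
    // Runs the supplier on our own pool and reports which thread executed it.
    static String workerThreadName() {
        // One-thread pool whose thread carries a recognisable (made-up) name.
        ExecutorService pool = Executors.newFixedThreadPool(
            1, r -> new Thread(r, "my-pool-thread"));
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
        } finally {
            pool.shutdown(); // let the pool thread exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());
    }
}
```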
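The work inside a CompletableFuture may throw an exception. In such cases, you can use handle() to deal with it. handle() receives two arguments: the result (null if the work failed) and the exception (null if the work succeeded).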
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String name = null;
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
log(future.get());
Output :
15:51:35.385 (main) Hello, Stranger!
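The example above decides by checking whether the result is null. A slightly more explicit variant checks the exception argument instead, so a legitimate null result is not mistaken for a failure. This is only a sketch; the class name HandleDemo and the greet() helper are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class HandleDemo {
    // Returns a greeting, falling back to a default if the async work throws.
    static String greet(String name) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (name == null) {
                    throw new RuntimeException("Computation error!");
                }
                return "Hello, " + name;
            })
            // error is null on success and non-null on failure.
            .handle((result, error) -> error == null ? result : "Hello, Stranger!")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(greet("Alice"));
        System.out.println(greet(null));
    }
}
```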
Once something has been processed through supplyAsync(), you can run another task on its result using thenApply().
thenApply() runs the function passed as a parameter, using the return value from supplyAsync() as the argument.
*This method is equivalent to map.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply(s -> s + " + Future2");
log("future.get(): " + future.get());
Output :
15:57:49.343 (main) future.get(): Future1 + Future2
thenApply() also returns a CompletableFuture, so you can chain one call after another.
E.g.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + " Future");
log("future.get(): " + future.get());
Output:
16:00:00.513 (main) future.get(): Hello World Future
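Since thenApply() behaves like map, each step is also free to change the type of the value, not just append to a String. A minimal sketch, with a hypothetical class and method name:

```java
import java.util.concurrent.CompletableFuture;

public class ThenApplyTypes {
    // String -> Integer -> Integer: each thenApply() may change the value type.
    static int parseAndDouble(String text) {
        CompletableFuture<Integer> future = CompletableFuture
            .supplyAsync(() -> text)          // CompletableFuture<String>
            .thenApply(Integer::parseInt)     // CompletableFuture<Integer>
            .thenApply(n -> n * 2);           // still CompletableFuture<Integer>
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(parseAndDouble("21"));
    }
}
```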
thenAccept() is similar to thenApply(), but the function it takes is a Consumer, which has no return value.
Thus, the resulting future has type CompletableFuture<Void>.
E.g.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<Void> future2 = future1.thenAccept(
s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
Output:
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null
As you can see from the output above, the future returned by thenAccept() completes with null.
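Because thenAccept() produces no value, it is only useful for its side effect. A small sketch of that, assuming a hypothetical class name and using a thread-safe list as the place where the side effect lands:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ThenAcceptDemo {
    // The consumer's side effect is the only observable result of thenAccept().
    static List<String> collect() {
        List<String> sink = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
            .supplyAsync(() -> "Hello")
            .thenAccept(s -> sink.add(s + " World"));
        done.join(); // waits for the consumer to run; the value itself is null
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```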
thenCompose() chains two CompletableFutures into one.
When the first CompletableFuture completes, its result is passed to the function that creates the second CompletableFuture, so the two jobs are processed sequentially.
It takes as its argument a function that returns a CompletableFuture.
*It is equivalent to flatMap.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());
Output:
16:07:33.196 (main) Hello World
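The difference from thenApply() is easiest to see in the types. In the sketch below, fetchUser() and fetchScore() are hypothetical async helpers made up for illustration. With thenApply() you end up with a future inside a future, while thenCompose() flattens it.

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {
    // Hypothetical async lookups, standing in for real remote calls.
    static CompletableFuture<String> fetchUser(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<Integer> fetchScore(String user) {
        return CompletableFuture.supplyAsync(() -> user.length());
    }

    // thenApply() wraps the inner future, so two join() calls are needed.
    static int nestedScore(int id) {
        CompletableFuture<CompletableFuture<Integer>> nested =
            fetchUser(id).thenApply(ThenComposeDemo::fetchScore);
        return nested.join().join();
    }

    // thenCompose() flattens the result into a single future.
    static int flatScore(int id) {
        CompletableFuture<Integer> flat =
            fetchUser(id).thenCompose(ThenComposeDemo::fetchScore);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(nestedScore(7) + " " + flatScore(7));
    }
}
```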
thenCombine() combines the results of two independent CompletableFutures, which can be processed in parallel.
After both CompletableFutures are done, it applies a function to the two results and returns one CompletableFuture.
For example, let's have two CompletableFutures and use thenCombine() to process them so that these two futures are done in parallel and the results are combined into one.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApply((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
Output:
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!
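Looking at the results, it looks as if they were processed sequentially. The reason is that thenApply() does not switch threads by itself: here supplyAsync() has already completed by the time thenApply() is called, so the function runs right away on the calling thread (main), and main sleeps through each 2-second wait in turn.
Using thenApplyAsync() instead of thenApply() makes them run on different threads.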
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApplyAsync((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApplyAsync((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s))
.join(); // wait here, so main does not exit before the pool threads finish
Output:
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!
As you can see from the output, both tasks now run on different threads, and the whole thing takes about 2 seconds instead of 4.
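The thread choice can be checked in isolation. The sketch below (class and method names are made up) starts from an already-completed future, so thenApply() has no choice but to run on the calling thread, while thenApplyAsync() hands the function to the default async executor.

```java
import java.util.concurrent.CompletableFuture;

public class SyncVsAsyncThread {
    // On an already-completed future, thenApply() runs on the calling thread.
    static String syncThread() {
        return CompletableFuture.completedFuture("x")
            .thenApply(s -> Thread.currentThread().getName())
            .join();
    }

    // thenApplyAsync() submits the function to the default async executor.
    static String asyncThread() {
        return CompletableFuture.completedFuture("x")
            .thenApplyAsync(s -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("thenApply: " + syncThread());
        System.out.println("thenApplyAsync: " + asyncThread());
    }
}
```

When the previous stage is still running, thenApply() is instead executed by whichever thread completes that stage, so the non-async variant gives no guarantee about the thread either way.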
anyOf() returns the result of whichever of several CompletableFutures completes first.
For example, let's have three CompletableFutures with different completion times.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApplyAsync((s) -> {
log("Starting future1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApplyAsync((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "Future3")
.thenApplyAsync((s) -> {
log("Starting future3");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture.anyOf(future1, future2, future3)
.thenAccept(s -> log("Result: " + s))
.join(); // wait here, so main does not exit before the pool threads finish
Output:
03:49:58.100954 (ForkJoinPool.commonPool-worker-1) Starting future1
03:49:58.101765 (ForkJoinPool.commonPool-worker-2) Starting future2
03:49:58.102536 (ForkJoinPool.commonPool-worker-3) Starting future3
03:49:59.102378 (ForkJoinPool.commonPool-worker-1) Result: Future1!
As you can see from the output, all the tasks are started, but only the result of the one that finishes fastest is passed to thenAccept().
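One detail worth knowing is that anyOf() returns a CompletableFuture<Object>, so the result has to be cast back if you need the original type. A minimal sketch, assuming a hypothetical delayed() helper and a dedicated three-thread pool so that all three tasks really start at the same time:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AnyOfDemo {
    // Hypothetical helper: supplies the value after sleeping for the given time.
    static CompletableFuture<String> delayed(String value, long millis, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        }, executor);
    }

    // Returns the value of whichever future completes first.
    static String fastest() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Object> first = CompletableFuture.anyOf(
                delayed("slow", 900, pool),
                delayed("fast", 100, pool),
                delayed("medium", 500, pool));
            return (String) first.join(); // anyOf() yields Object, so cast back
        } finally {
            pool.shutdownNow(); // interrupt the slower tasks; their results are unused
        }
    }

    public static void main(String[] args) {
        System.out.println(fastest());
    }
}
```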
Unlike anyOf(), allOf() waits for all of the tasks to complete before performing another task.
allOf() returns a CompletableFuture<Void>, so the individual results have to be read from the original futures, for example with the Stream API.
E.g.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2");
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "Future3");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" + "));
log("Combined: " + combined);
Output:
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3
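If you do this often, the allOf() plus join() pattern can be wrapped in a small helper that turns a list of futures into one future of a list. This is only a sketch; allAsList() is a made-up name, not a JDK method.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfDemo {
    // Hypothetical helper: completes with all results once every future is done.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // Every future is complete here, so join() returns without blocking.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> combined() {
        List<CompletableFuture<String>> futures = List.of(
            CompletableFuture.supplyAsync(() -> "Future1"),
            CompletableFuture.supplyAsync(() -> "Future2"),
            CompletableFuture.supplyAsync(() -> "Future3"));
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(String.join(" + ", combined()));
    }
}
```

The results come back in the order of the input list, not in completion order, because the stream walks the original list.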