-
Notifications
You must be signed in to change notification settings - Fork 48
Simple Usage of CompletableFuture
To start off, CompletableFuture is based off Future API (https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Future.html). When we talk about CompletableFuture, we use it for asynchronous programming where we promise values to be returned hence the name.
Completable Javadocs API: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html
import java.util.concurrent.CompletableFuture;
class A {
private final int x;
A() {
this(0);
}
A(int x) {
this.x = x;
}
void sleep() {
System.out.println(Thread.currentThread().getName() + " " + x);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
}
A incr() {
sleep();
return new A(this.x + 1);
}
A decr() {
sleep();
if (x < 0) {
throw new IllegalStateException();
}
return new A(this.x - 1);
}
/**
* A foo(A a) {
* return a.incr().decr();
* }
*/
/**
* CompletableFuture<A> foo(A a) {
* return CompletableFuture.supplyAsync(() -> a.incr().decr());
* }
*/
/**
* CompletableFuture<A> foo(A a) {
* return CompletableFuture.supplyAsync(a::incr).thenApply(A::decr);
* }
*/
CompletableFuture<A> foo(A a) {
return CompletableFuture.supplyAsync(a::incr).thenApplyAsync(A::decr);
}
/**
* A bar(A a) {
* return a.incr();
* }
*/
CompletableFuture<A> bar(A a) {
return CompletableFuture.supplyAsync(a::incr);
}
/**
* A baz(A a, int x) {
* if (x == 0) {
* return new A(0);
* } else {
* return a.incr().decr();
* }
* }
*/
CompletableFuture<A> baz(A a, int x) {
if (x == 0) {
return CompletableFuture.completedFuture(new A(0));
} else {
return CompletableFuture.supplyAsync(() -> a.incr().decr());
}
}
B f(A a) {
System.out.println(Thread.currentThread().getName() + " A: " + a);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
return new B(2);
}
C g(B b) {
System.out.println(Thread.currentThread().getName() + " B: " + b);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
return new C(3);
}
D h(B b) {
System.out.println(Thread.currentThread().getName() + " B: " + b);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
return new D(3);
}
D h(C c) {
System.out.println(Thread.currentThread().getName() + " C: " + c);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
return new D(4);
}
E i(C c, D d) {
System.out.println(Thread.currentThread().getName() + " C: " + c + ", D: " + d);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
return new E(Integer.parseInt(c.toString()) + Integer.parseInt(d.toString()));
}
@Override
public String toString() {
return "" + x;
}
}
public class B {
private final int x;
B() {
this(0);
}
B(int x) {
this.x = x;
}
@Override
public String toString() {
return "" + x;
}
}
public class C {
private final int x;
C() {
this(0);
}
C(int x) {
this.x = x;
}
@Override
public String toString() {
return "" + x;
}
}
public class D {
private final int x;
D() {
this(0);
}
D(int x) {
this.x = x;
}
@Override
public String toString() {
return "" + x;
}
}
public class E {
private final int x;
E() {
this(0);
}
E(int x) {
this.x = x;
}
@Override
public String toString() {
return "" + x;
}
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) {
// q1a();
// q1b();
// q1d();
// q1e();
// q2a();
// q2b();
q2c();
}
public static void q1a() {
final long START_TIME = System.currentTimeMillis();
A a = new A();
CompletableFuture<A> cf = a.foo(a);
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cf.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
e.fillInStackTrace();
}
}
public static void q1b() {
final long START_TIME = System.currentTimeMillis();
A a = new A();
CompletableFuture<A> cf = a.foo(new A()).thenCompose(a::bar);
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cf.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
e.fillInStackTrace();
}
}
public static void q1d() {
final long START_TIME = System.currentTimeMillis();
A a = new A();
CompletableFuture<Void> allCFs = CompletableFuture.allOf(
a.foo(new A()),
a.bar(new A()),
a.baz(new A(), 1));
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!allCFs.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
}
public static void q1e() {
final long START_TIME = System.currentTimeMillis();
CompletableFuture<A> cf = CompletableFuture
.supplyAsync(() -> new A().decr().decr())
.handle((result, exception) -> {
if (result == null) {
System.out.println("ERROR: " + exception);
return new A();
} else {
return result;
}
});
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cf.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
e.fillInStackTrace();
}
}
public static void q2a() {
final long START_TIME = System.currentTimeMillis();
A a = new A(1);
/**
* B b = a.f(a);
* C c = a.g(b);
* D d = a.h(c);
*/
CompletableFuture<D> cf = CompletableFuture
.supplyAsync(() -> a.f(a))
.thenApply(a::g)
.thenApply(a::h);
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cf.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
e.fillInStackTrace();
}
}
public static void q2b() {
final long START_TIME = System.currentTimeMillis();
A a = new A(1);
/**
* B b = a.f(a);
* C c = a.g(b);
* a.h(c); // no return value
*/
CompletableFuture<Void> cf = CompletableFuture
.supplyAsync(() -> a.f(a))
.thenApply(a::g)
.thenAccept(a::h);
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cf.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
}
public static void q2c() {
final long START_TIME = System.currentTimeMillis();
A a = new A(1);
/**
* B b = f(a);
* C c = g(b);
* D d = h(b);
* E e = i(c, d);
*/
CompletableFuture<B> cfb = CompletableFuture
.supplyAsync(() -> a.f(a));
CompletableFuture<C> cfc = cfb.thenApply(a::g);
CompletableFuture<D> cfd = cfb.thenApply(a::h);
CompletableFuture<E> cfe = cfc.thenCombine(cfd, a::i);
// Busy waiting
// Do not end the main thread immediately,
// otherwise the thread pool used by CompletableFuture
// by default will be closed immediately
while (!cfe.isDone()) {
try {
System.out.println("CompletableFuture Processing in other thread... (" +
(System.currentTimeMillis() - START_TIME) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.fillInStackTrace();
}
}
try {
System.out.println(cfe.get());
} catch (InterruptedException | ExecutionException e) {
e.fillInStackTrace();
}
}
}
Peer Learning
Codecrunch Contributions
Piazza Contributions
Wiki Contributions
Guides
Setting Up Checkstyle
Setting Up Java
Setting Up MacVim
Setting Up Sunfire
Setting Up Unix For Mac
Setting Up Unix For Windows
Setting Up Vim
Setting up SSH Config
CS2030 Contents
Lecture 1 SummaryCompile-run vs Run-time Summary
Quick Guide To Abstraction
Generics and Variance of Types
Comparable vs Comparator
Summary of completable future
CS2030S Notes
ELI5 Optional.map vs Optional.flatMap
PECS Example Code
Java Collection Framework (Iterator)
Generic
Generic Type Parameter and Generic Wildcard
Calculator
Lambda-Expression
Single Abstract Method (SAM)
Method Reference
Functional Interfaces 2
Simple Usage of Sandbox
Associative-but-not-commutative
Higher Order function
Functional Programming
Calculator With Functor
Eager Evaluation VS Lazy Evaluation
Simple Usage of Lazy Evaluation
Lazy Evaluation for LazyList
Lazy Evaluation for BinaryTree
Stream
Parallel Stream
Optional
Simple Usage of Stream
Asynchronous Programming
Notes on CompletableFuture
Notes on CompletableFuture 2
Simple Usage of CompletableFuture
Mind Map
Exception Handling
Links
CS2030 Java Style Guide
CS2030 Javadoc Specification
JDK 11 Download Link
JDK 11 API Docs
Codecrunch
Piazza Forum