Parallel Stream
Parallel Stream processes the elements of a stream in parallel and is implemented on top of the Fork/Join framework. It is one form of asynchronous programming.
The Fork/Join framework is a parallel execution framework introduced in Java 7. Its strategy is divide-and-conquer: a large task is divided into many small subtasks, and the results are merged after the subtasks have executed.
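To make the divide-and-conquer idea concrete, here is a minimal sketch that uses the Fork/Join framework directly. The class name `RangeSumTask`, the helper `sum`, and the `THRESHOLD` value are assumptions chosen for illustration; they are not part of the original notes or of how Parallel Stream is implemented internally.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums the integers in [from, to] by divide-and-conquer.
class RangeSumTask extends RecursiveTask<Long> {
    // Assumed cut-off: ranges smaller than this are summed directly, not split.
    private static final int THRESHOLD = 1_000;
    private final int from;
    private final int to;

    RangeSumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            // Small enough: solve the subtask sequentially.
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        // Divide: split the range into two halves.
        int mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                      // schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in this thread
        return left.join() + rightSum;    // merge the two partial results
    }

    // Runs the task on the common pool, the same pool Parallel Stream uses by default.
    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1_000_000)); // 500000500000
    }
}
```

Forking one half and computing the other in the current thread keeps the calling thread busy instead of leaving it idle while it waits.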

import java.util.stream.IntStream;

public static void main(String[] args) {
    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());
    sequentialStream();
    parallelStream();
    System.out.println("Main Thread End");
}

// Processes 1..10 on the calling thread, sleeping 1 second per element.
public static void sequentialStream() {
    long start = System.currentTimeMillis();
    IntStream.rangeClosed(1, 10).forEach(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println(x + " Sequential Stream Id : " + Thread.currentThread());
    });
    long end = System.currentTimeMillis();
    System.out.println("Sequential Stream: " + (end - start));
}

// Same work, but parallel() lets several threads process the elements.
public static void parallelStream() {
    long start = System.currentTimeMillis();
    IntStream.rangeClosed(1, 10).parallel().forEach(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println(x + " Parallel Stream Id : " + Thread.currentThread());
    });
    long end = System.currentTimeMillis();
    System.out.println("Parallel Stream: " + (end - start));
}
/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
1 Sequential Stream Id : Thread[main,5,main]
2 Sequential Stream Id : Thread[main,5,main]
3 Sequential Stream Id : Thread[main,5,main]
4 Sequential Stream Id : Thread[main,5,main]
5 Sequential Stream Id : Thread[main,5,main]
6 Sequential Stream Id : Thread[main,5,main]
7 Sequential Stream Id : Thread[main,5,main]
8 Sequential Stream Id : Thread[main,5,main]
9 Sequential Stream Id : Thread[main,5,main]
10 Sequential Stream Id : Thread[main,5,main]
Sequential Stream: 10057
4 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-3,5,main]
2 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-27,5,main]
6 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-23,5,main]
8 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]
3 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
10 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-31,5,main]
5 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-13,5,main]
7 Parallel Stream Id : Thread[main,5,main]
1 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-17,5,main]
9 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Parallel Stream: 1019
Main Thread End
The syntax difference between sequentialStream and parallelStream is small: only the call to parallel(). In the execution result, sequentialStream prints in order, while parallelStream prints out of order. parallelStream takes about one-tenth of the execution time of sequentialStream (1019 ms versus 10057 ms), because the ten one-second sleeps run on ten different threads.
In the above code, forEach() splits the elements into subtasks, which are executed by the worker threads of the common ForkJoinPool mentioned above and by the calling thread (main).
View the parallelism of the common pool:
ForkJoinPool.commonPool().getParallelism()
Update the parallelism of the common pool:
// By setting the system property at run time, before the common pool is first used
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
// or by passing the following flag when starting the JVM (no spaces around '=')
-Djava.util.concurrent.ForkJoinPool.common.parallelism={n}
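The two settings can be checked together with a small program. This is only a sketch: the class name `CommonPoolInfo` and the helper `commonPoolParallelism` are made up for illustration. Without the property, the reported parallelism is typically one less than the number of available processors, and the property has an effect only if it is set before anything uses the common pool.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class for inspecting the common pool.
class CommonPoolInfo {
    static final String PARALLELISM_KEY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Reads the target parallelism of the common pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        // Set the property first: once the common pool exists, changes are ignored.
        System.setProperty(PARALLELISM_KEY, "4");
        System.out.println("Available processors   : "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Note that the calling thread also takes part in a Parallel Stream, so the number of threads seen in the output can be one more than the parallelism.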
- Not Thread Safe
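import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public static void main(String[] args) {
    List<Integer> sequentialList = new ArrayList<>();
    List<Integer> parallelList = new ArrayList<>();
    IntStream.range(0, 1000).forEach(sequentialList::add);
    // ArrayList is not thread safe: concurrent add() calls can lose elements
    // (or throw ArrayIndexOutOfBoundsException).
    IntStream.range(0, 1000).parallel().forEach(parallelList::add);
    System.out.println("Sequential List size: " + sequentialList.size());
    System.out.println("Parallel List size: " + parallelList.size());
}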
/* Output */
Sequential List size: 1000
Parallel List size: 778
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public static void main(String[] args) {
    final long START_TIME = System.currentTimeMillis();
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
    List<Integer> parallelList = new ArrayList<>();
    IntStream.range(0, 10).parallel().forEach(x -> {
        // Snapshot of the shared list as seen by this thread before it adds x.
        List<Integer> tmp = new ArrayList<>(parallelList);
        System.out.println((System.currentTimeMillis() - START_TIME) +
                " Asynchronous Thread Id : " + Thread.currentThread() + " " + tmp.toString());
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        parallelList.add(x); // unsynchronized write to a shared ArrayList
    });
    System.out.println(parallelList.toString() + " Parallel List size: " + parallelList.size());
}
/* Output */
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-3,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] []
5 Asynchronous Thread Id : Thread[main,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-7,5,main] []
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-3,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-7,5,main] [8]
1045 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] [8, 0, 9, 3]
[8, 0, 9, 3, 7] Parallel List size: 5
The operations executed in IntStream.range(0, 10).parallel().forEach() are not thread safe: nothing synchronizes the data shared between the threads that run at the same time.
From the Fork/Join framework, we know that IntStream.range(0, 10).parallel().forEach() is split into multiple subtasks. Each subtask is responsible for only a small part of the data, and the subtasks are processed asynchronously by multiple threads. The problem is that parallelList is not a thread-safe container, so calling add() concurrently causes thread safety problems.
/* add() source code */
public boolean add(E e) {
    modCount++;
    add(e, elementData, size);
    return true;
}

private void add(E e, Object[] elementData, int s) {
    if (s == elementData.length)
        elementData = grow();
    elementData[s] = e;
    size = s + 1;
}
add() consists of the following 4 steps:
- Check the length of elementData. If the current array is full, grow() expands the capacity to 1.5 times the original capacity and copies the elements of the original array into the new array.
- Add e at position size, that is, elementData[size] = e
- Read size
- size + 1
add() is not atomic: reading size, writing the element, and storing size + 1 are separate steps with no synchronization between threads. Thread A may read size and, before it writes size + 1 back, thread B may read the same size. Both threads then write their element into the same slot and store the same new size, so one update overwrites the other. Without synchronization there is also no guarantee that one thread sees the writes of another (the memory visibility problem). This explains why the size of parallelList is smaller than the number of elements in IntStream.range(0, 10) after the Parallel Stream completes.
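The same lost-update effect can be reproduced with a plain counter, which removes ArrayList from the picture. The sketch below is illustrative only: `LostUpdateDemo`, `unsafeCount`, and `atomicCount` are hypothetical names, and the unsafe result varies from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo: counts n stream elements with and without atomic updates.
class LostUpdateDemo {
    // counter[0]++ is a read, an add, and a write. Threads can interleave
    // between those steps, so some increments may be lost.
    static int unsafeCount(int n) {
        int[] counter = {0};
        IntStream.range(0, n).parallel().forEach(i -> counter[0]++);
        return counter[0];
    }

    // incrementAndGet() performs the read-add-write as one atomic step.
    static int atomicCount(int n) {
        AtomicInteger counter = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> counter.incrementAndGet());
        return counter.get();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("Unsafe count: " + unsafeCount(n)); // often less than n
        System.out.println("Atomic count: " + atomicCount(n)); // always n
    }
}
```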
If you need thread safety, you can do the following:
- Use collect() and reduce(), which accumulate results safely in a parallel stream
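import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public static void main(String[] args) {
    List<Integer> list = IntStream.rangeClosed(1, 20)
            .boxed()
            .parallel()
            .collect(Collectors.toList()); // each subtask fills its own list; results are merged in order
    System.out.println(list);
    System.out.println(list.size());
}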
/* Output */
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
20
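The example above covers collect(). For reduce(), a comparable sketch could look like the following. The class and method names (`ReduceDemo`, `sum`, `concat`) are assumptions for illustration. reduce() gives the same result in parallel as in sequence as long as the accumulator is associative and the first argument is a true identity for it.

```java
import java.util.stream.IntStream;

// Hypothetical demo of reduce() on parallel streams.
class ReduceDemo {
    // Addition is associative and 0 is its identity, so partial sums
    // computed by different threads can be combined in any grouping.
    static int sum(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .reduce(0, Integer::sum);
    }

    // String concatenation is associative but not commutative. The stream is
    // ordered, so the partial results are still combined in encounter order.
    static String concat(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .mapToObj(i -> Integer.toString(i))
                .reduce("", String::concat);
    }

    public static void main(String[] args) {
        System.out.println(sum(100));  // 5050
        System.out.println(concat(5)); // 12345
    }
}
```

No shared mutable variable appears in either method, which is why no lock or synchronized container is needed.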
- Avoid forEach() for stateful operations in a parallel stream where possible. If you need it, you can use forEachOrdered() instead: it processes the elements one at a time in encounter order, so it is safe here but gives up most of the parallelism
public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    IntStream.rangeClosed(1, 20)
            .parallel()
            .forEachOrdered(list::add);
    System.out.println(list);
    System.out.println(list.size());
}
/* Output */
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
20
- If you must use forEach(), you can use a thread-safe shared variable, such as a synchronized list
public static void main(String[] args) {
    List<Integer> list1 = Collections.synchronizedList(new ArrayList<>());
    IntStream.rangeClosed(1, 20)
            .parallel()
            .forEach(list1::add);
    System.out.println(list1);
    System.out.println(list1.size());
}
/* Output */
[13, 15, 14, 11, 12, 1, 16, 17, 20, 19, 3, 7, 5, 4, 8, 6, 18, 2, 9, 10]
20
- If you must use forEach(), you can also use the thread-safe CopyOnWriteArrayList
public static void main(String[] args) {
    List<Integer> list = new CopyOnWriteArrayList<>();
    IntStream.rangeClosed(1, 20)
            .parallel()
            .forEach(list::add);
    System.out.println(list);
    System.out.println(list.size());
}
/* Output */
[13, 15, 14, 12, 11, 7, 1, 17, 3, 8, 20, 10, 16, 19, 5, 4, 6, 9, 2, 18]
20
- If you must use forEach(), you can also add a lock inside forEach()
public static void main(String[] args) {
    Lock lock = new ReentrantLock();
    List<Integer> list = new ArrayList<>();
    IntStream.rangeClosed(1, 20)
            .parallel()
            .forEach(e -> {
                lock.lock();
                try {
                    list.add(e);
                } finally {
                    lock.unlock(); // always release the lock, even if add() throws
                }
            });
    System.out.println(list);
    System.out.println(list.size());
}
/* Output */
[13, 15, 14, 11, 12, 3, 18, 4, 8, 16, 20, 19, 6, 17, 7, 2, 9, 10, 5, 1]
20
- Parallel Stream lets you write concurrent code concisely and efficiently.
- Parallel Stream processes elements out of order.
- When using Parallel Stream, tasks should be stateless: Parallel Stream does not synchronize access to shared state, so stateful operations may give nondeterministic results.