tree_title | description | last_modified |
---|---|---|
Streams | An overview of Java Streams and when/how to use them | 2020-05-30 15:54:15 +0200 |
- Basic idea
- Creating streams
- Intermediate operations
- Terminal operations
- Streams of primitive types
- Parallel streams
- Drawbacks of using streams
- Resources
Streams are a way to specify operations on lists in a more declarative way.
Example: counting all of the words in a list that are longer than 3 characters.
The classic imperative way (focus on how to do it):
long numberLongWords = 0;
for (String word : words) {
    if (word.length() > 3) {
        numberLongWords++;
    }
}
The streams approach (more declarative, focus on what to do without specifying exactly how):
numberLongWords = words.stream()
    .filter(word -> word.length() > 3)
    .count();
Because we are specifying what to do rather than how to do it, it becomes easier to change the exact way that the calculation is performed. For example, if we simply use the parallelStream() method instead of the stream() method, the filtering and counting can now happen in parallel (using multiple threads).
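As an illustration, a minimal sketch of the parallel version (assuming the same words list as above):
numberLongWords = words.parallelStream()
    .filter(word -> word.length() > 3)
    .count();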
Streams can seem similar to collections, but there are some important differences:
- A stream does not necessarily store its elements. They can also be generated on demand. There are even situations where storing all of the elements would be impossible, for example infinite streams, which do not have a finite number of elements.
- Operations on a stream don't change the stream itself. Instead, they generate a new altered stream.
- Stream operations are lazy when possible, meaning results are only calculated when needed. For example, if a stream expression filters a list of words to keep only the long words and then takes the first five, the filter only runs until the first five matching words are found (see the sketch below). This also makes it possible to perform finite operations on infinite streams.
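A minimal sketch of that laziness example (assuming words is a List<String>; the filter stops running once five long words have been found):
List<String> firstFiveLongWords = words.stream()
    .filter(word -> word.length() > 3)
    .limit(5)
    .collect(Collectors.toList());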
A stream expression is typically composed of three stages:
- Creating the stream
- Intermediate operations that transform the stream into new streams
- A terminal operation that turns a stream into a non-stream result. Because this is the part that determines what result we need, this is also the part that determines exactly which lazy operations are executed. Without a terminal operation, nothing will happen!
Obtaining a stream from a collection: call its stream() method (see the example above).
Obtaining a stream from an array:
- You can use the static Stream.of() method and pass the array to it. That method has a varargs parameter, so instead of an actual array you can also pass it a variable number of arguments that will make up the stream.
- If you already have an array but want a stream representing only a part of it, you can use the method Arrays.stream(array, from, to) to get such a stream.
Stream.of("a", "b", "c");
Stream.ofNullable(nullableString); // 0 elements if nullableString == null, 1 otherwise
Arrays.stream(wordsArray, startIndex, endIndex);
Creating an empty stream: Stream.empty()
Creating infinite streams:
- Use the Stream.generate() method, which takes a Supplier<T> that generates the actual values. Whenever a new value must be generated for the stream, that supplier function is used.
- Use the Stream.iterate() method when the next value of a stream needs to depend on the previous value.
  - Since Java 9, there is also an overload for this method that takes 3 arguments instead of 2. The added argument (in the middle, not at the end) is a Predicate that specifies when the generation of new elements should finish. If the Predicate fails for a newly generated element, that element is not added to the stream and the generation of new elements is stopped.
Stream.generate(() -> "constant"); // infinite constant stream
Stream.generate(Math::random); // infinite stream of random values
Stream<Integer> powersOfTwo =
    Stream.iterate(2, n -> n * 2);
Stream<Integer> powersOfTwoSmallerThanFiveHundred =
    Stream.iterate(2, n -> n < 500, n -> n * 2);
Filter:
words.stream().filter(word -> word.length() > 12);
Map:
words.stream().map(String::toUpperCase);
Flatmap:
- apply operation that turns every element into a stream
- flatten resulting streams into a single stream
words.stream().flatMap(word -> Stream.of(word.split("")));
Limit number of elements:
infiniteStream.limit(100)
Skip a number of elements:
words.stream().skip(1)
Take elements from the stream while a certain condition is true (and stop as soon as it is false):
Stream.of("a", "a", "b", "a").takeWhile(letter -> letter.equals("a")) // a, a
Drop elements while a certain condition is true (get a stream of the remaining elements, starting from the first element for which the condition was false):
Stream.of("a", "a", "b", "a").dropWhile(letter -> letter.equals("a")) // b, a
Concatenate streams (only makes sense if the first one is not infinite):
Stream.concat(firstStream, secondStream);
Suppress duplicates:
words.stream().distinct();
Sorting:
words.stream().sorted(Comparator.comparing(String::length));
Invoke a function every time an element is retrieved:
Stream.iterate(2, n -> n * 2)
    .peek(System.out::println) // executed every time an element is generated
    .limit(20)
    .toArray(); // terminal operation to make sure elements are actually retrieved
The peek function from the above example is also useful for using a debugger on a stream:
Stream.iterate(2, n -> n * 2).peek(x -> {
    System.out.println(x); // set breakpoint on this line
}).limit(20).toArray();
Streams are lazy -> without terminal operations, nothing happens at all!
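A minimal sketch of this (assuming words is a List<String>): the pipeline below prints nothing, because it is never consumed by a terminal operation.
words.stream()
    .filter(word -> word.length() > 3)
    .peek(System.out::println); // nothing is printed: there is no terminal operation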
Count number of elements:
words.stream().filter(word -> word.length() > 12).count();
Get min or max:
words.stream().max(String::compareToIgnoreCase); // returns Optional<String>
Find first element:
words.stream()
    .filter(word -> word.length() > 12)
    .findFirst(); // returns Optional<String>
Find any element (useful with parallel streams):
words.stream()
    .filter(word -> word.length() > 12)
    .findAny(); // returns Optional<String>
Check if something matches:
words.stream().anyMatch(word -> word.length() > 12) // returns boolean
Execute a function for each element:
words.stream().forEach(System.out::println); // not guaranteed to preserve order
words.stream().forEachOrdered(System.out::println); // guaranteed to preserve order
Reduce to a sum, count, average, maximum or minimum value:
IntSummaryStatistics summary =
words.stream().collect(Collectors.summarizingInt(String::length));
int max = summary.getMax();
double average = summary.getAverage();
Concatenate stream of strings:
words.stream().collect(Collectors.joining(", "))
Array:
// to get an array of the correct type (not Object), we need to pass a constructor
String[] result = words.stream().toArray(String[]::new);
List:
words.stream().collect(Collectors.toList());
Set:
words.stream().collect(Collectors.toSet());
Specific kind of collection by passing constructor:
words.stream().collect(Collectors.toCollection(TreeSet::new));
Map:
words.stream().collect(
    Collectors.toMap(String::length, String::toLowerCase));
words.stream().collect(
    Collectors.toMap(String::length, Function.identity())); // the element itself is the value
Note: the above statements will throw an exception if more than one element maps to the same key!
Fix: provide a third function that resolves the conflict and determines the value for the key, given the existing and the new value:
words.stream().collect(
    Collectors.toMap(
        String::length,
        Function.identity(),
        (existingValue, newValue) -> existingValue));
Specific kind of map -> pass the map's constructor as a fourth argument.
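For example, a sketch that collects into a TreeMap by passing a map constructor as the fourth argument:
words.stream().collect(
    Collectors.toMap(
        String::length,
        Function.identity(),
        (existingValue, newValue) -> existingValue,
        TreeMap::new));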
Transforming into map of lists:
words.stream().collect(
Collectors.groupingBy(String::length));
If the classifier function you want to pass to groupingBy is a predicate, partitioningBy is more efficient:
// Map<Boolean, List<String>>
words.stream().collect(
Collectors.partitioningBy(word -> word.startsWith("t")));
Transforming into map of sets:
words.stream().collect(
Collectors.groupingBy(String::length, Collectors.toSet()));
When working with primitive values, it is more efficient to work directly with those primitive values instead of using their boxed versions. There are specialized types IntStream, DoubleStream, ... that work directly with primitive types, without using wrappers.
IntStream stream = IntStream.of(1, 1, 2, 3, 5);
IntStream stream = IntStream.range(0, 100); // upper bound excluded
IntStream stream = IntStream.rangeClosed(0, 100); // upper bound included
IntStream stream = words.stream().mapToInt(String::length);
Additional functionality present in IntStream, DoubleStream, ...: simple min, max, sum, ... methods
System.out.println(IntStream.rangeClosed(1, 100).sum());
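Similarly, a sketch of max and average (both return optional-style results, because the stream might be empty):
OptionalInt max = IntStream.rangeClosed(1, 100).max();
OptionalDouble average = IntStream.rangeClosed(1, 100).average();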
Converting a primitive type stream to an object stream:
Stream<Integer> stream = intStream.boxed();
Getting a parallel stream:
collection.parallelStream()
existingStream.parallel()
Note: if the stream is in parallel mode when the terminal method executes, all intermediate stream operations will also be parallelized!
Note: there is quite some overhead in parallelization, so don't blindly make all of your streams parallel! Parallel streams only make sense for huge in-memory collections of data and computationally expensive processing where different parts of the stream can be processed independently.
Getting an idea of the threads involved:
.peek(string -> System.out.println(Thread.currentThread().getName()))
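For example, a minimal sketch that prints the processing thread for each element (note that it doesn't matter where parallel() appears in the pipeline; the whole pipeline runs in parallel once the terminal operation executes):
IntStream.rangeClosed(1, 100)
    .peek(i -> System.out.println(Thread.currentThread().getName()))
    .parallel() // the whole pipeline is parallelized, including the peek above
    .sum();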
Important: operations to execute in parallel should be stateless and should be able to be executed in arbitrary order!
// bad code (likely to get different - wrong - results each time)
int[] shortWordCounts = new int[12];
words.parallelStream().forEach(word -> {
    if (word.length() < 12) {
        shortWordCounts[word.length()]++; // race condition!
    }
});
// better alternative
Map<Integer, Long> shortWordCounts =
    words.parallelStream()
        .filter(word -> word.length() < 12)
        .collect(Collectors.groupingBy(
            String::length,
            Collectors.counting()));
Some operations on parallel streams can be made more efficient by making it clear that you do not care about ordering!
- Example: distinct()
  - If you just want distinct elements, but not necessarily in the order in which they first appeared in the original stream, the stream can be processed in independent segments and uniqueness can be tracked using a shared set of already-seen elements
- Example: limit()
  - If you just want any x elements, not the first x elements, the elements can more easily be processed in parallel
Note: by default, streams from ordered collections (arrays and lists), ranges, generators, iterators or from Stream.sorted are ordered!
Making a stream unordered: simply call the unordered() method on the stream
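A minimal sketch combining this with the limit() case above (assuming numbers is a large List<Integer>; we want any 100 distinct values, not the first 100):
List<Integer> anyHundredDistinctValues = numbers.parallelStream()
    .unordered()
    .distinct()
    .limit(100)
    .collect(Collectors.toList());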
When using parallel streams, avoid blocking operations!
Parallel streams use the common fork-join pool (ForkJoinPool.commonPool()). The number of threads in this common pool is determined by the number of available cores and is equal to (#cores - 1). If you use blocking (or, in general, long-running) operations in your parallel streams, these will affect all other parallel streams (and any other code that uses the common fork-join pool). It's not that hard to block all threads in the common fork-join pool, meaning that no other parallel streams can get any work done until those threads are unblocked.
Example illustrating this:
public static void main(String[] args) throws InterruptedException {
    System.out.println("CommonPool Parallelism: " + ForkJoinPool.commonPool().getParallelism());
    ExecutorService es = Executors.newCachedThreadPool();
    es.execute(() -> blockingStreamTask());
    es.execute(() -> blockingStreamTask());
    es.execute(() -> normalStreamTask());
    es.execute(() -> normalStreamTask());
    es.execute(() -> normalStreamTask());
}

private static void normalStreamTask() {
    IntStream.range(1, Integer.MAX_VALUE).parallel().filter(i -> i % 2 == 0).count();
    System.out.println("Finished normal stream task");
}

private static void blockingStreamTask() {
    IntStream.range(1, Integer.MAX_VALUE).parallel().filter(i -> {
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {}
        return i % 2 == 0;
    }).count();
    System.out.println("Finished blocking stream task");
}
The result of executing the above code might be different every time you execute it, but you will likely see that one or more normal stream tasks are blocked by the blocking stream tasks. Once all threads in the common fork-join pool are executing the Thread.sleep(Integer.MAX_VALUE); statement, no other work can be processed by the common fork-join pool until the threads stop sleeping.
See also Concurrency
- Performance:
- Using streams implies some overhead, so they are likely slower than hand-written loops etc.
- Note: For huge in-memory collections of data, parallel streams could actually be an easy way to speed up computations. There will still be some overhead compared to the ideal hand-written parallel code, but using parallel streams could be easier and way less error-prone.
- Readability (subjective)
- From a certain level of complexity, code using streams starts to get harder to read (see below)
- Also depends on how familiar the team is with streams
- Stack traces
- Errors happening inside streams lead to more complex stack traces than errors in simple loops
Example:
List<Integer> list = Arrays.asList(1, 2, 3);
// simple nested loop
for (Integer i : list) {
    for (int j = 0; j < i; j++) {
        System.out.println(i / j);
    }
}
/*
Exception in thread "main" java.lang.ArithmeticException: / by zero
at misc.Main.main(Main.java:15)
*/
// streams
list.forEach(i -> {
    IntStream.range(0, i).forEach(j -> {
        System.out.println(i / j);
    });
});
/*
Exception in thread "main" java.lang.ArithmeticException: / by zero
at misc.Main.lambda$1(Main.java:22)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:593)
at misc.Main.lambda$0(Main.java:21)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
at misc.Main.main(Main.java:20)
*/
- Core Java SE 9 for the Impatient (book by Cay S. Horstmann)
- Be Aware of ForkJoinPool#commonPool()
- Think Twice Before Using Java 8 Parallel Streams
- 3 Reasons why You Shouldn’t Replace Your for-loops by Stream.forEach()