diff --git a/Streams/Parallelism/JavaApple.java b/Streams/Parallelism/JavaApple.java new file mode 100644 index 0000000..3332969 --- /dev/null +++ b/Streams/Parallelism/JavaApple.java @@ -0,0 +1,148 @@ +/** + * Example to demomstrate Parallelism in Streams + */ +import java.util.Scanner; +import java.util.stream.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * A simple Beverage POJO + */ +class Beverage { + private StringBuilder beverageName; + private int beveragePrice; + + public Beverage(StringBuilder beverageName, int beveragePrice) { + this.beverageName = beverageName; + this.beveragePrice = beveragePrice; + } + + public StringBuilder getBeverageName() { + return beverageName; + } + + public int getBeveragePrice() { + return beveragePrice; + } + + @Override + public String toString() { + return beverageName.toString() + "-" + beveragePrice+" "; + } +} + +/** + * To invoke a parallel stream either use - parallelStream() or parallel() + * + * parallelStream should only be used for Collections and parallel for the + * other Basestream + */ +public class JavaApple { + + public ArrayList getFilledArrayList(ArrayList beverages, String input) { + Scanner beverageScanner = new Scanner(input); + beverageScanner.useDelimiter(" "); + + Scanner tempScanner = null; + + while(beverageScanner.hasNext()) { + tempScanner = new Scanner(beverageScanner.next()); + tempScanner.useDelimiter(","); + beverages.add(new Beverage(new StringBuilder(tempScanner.next()), Integer.parseInt(tempScanner.next()))); + } + + tempScanner.close(); + beverageScanner.close(); + return beverages; + } + + public Stream getBeveragesStream(ArrayList beverages) { + return beverages.stream(); + } + + public Stream getConcurrentBeverageStream(ArrayList beverages) { + return beverages.parallelStream(); + } + + public static void main(String[] args) { + Scanner scanner = new Scanner(System.in); + System.out.println("Enter beverage name and beverage price separted by a comma followed by "+ + "spaces for subsequent beverages."); + ArrayList beverages = new ArrayList<>(); + beverages = new JavaApple().getFilledArrayList(beverages, scanner.nextLine()); + + System.out.println(beverages); + + /** + * A concurrent stream that reduces the stream to an integer + */ + int sum = new JavaApple().getConcurrentBeverageStream(beverages) + .map(Beverage::getBeveragePrice) + .reduce((totalPrice, individualPrice) -> { + return totalPrice+individualPrice; + }) + .get(); + System.out.println(sum); + + /** + * A concurrent stream is processed by the Java compiler and runtime. + * They aren't executed in the order in which they are defined. + */ + new JavaApple().getConcurrentBeverageStream(beverages) + .forEach(System.out::print); + System.out.println(); + new JavaApple().getConcurrentBeverageStream(beverages) + .forEach(System.out::print); + System.out.println(); + + /** + * Adding an element to a collection is a stateful lambda expression + * Statful lambda expression can give unexpected results when executed + * espicially in case of parallel stream + */ + ArrayList integerList = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8,10)); + ArrayList serialStorage = new ArrayList<>(); + /** + * A thread safe collection is created so as to prevent multiple-threads from + * accessing it at the same time. + */ + List concurrentStorage = Collections.synchronizedList(new ArrayList()); + + integerList.stream() + .map((integer) -> { + serialStorage.add(integer); + return integer; + }) + .forEachOrdered((element) -> { + System.out.print(element+" "); + }); + System.out.println(); + + serialStorage.stream() + .forEachOrdered((element) -> { + System.out.print(element+" "); + }); + System.out.println("\n-------------------------\n"); + + integerList.parallelStream() + .map((integer) -> { + concurrentStorage.add(integer); + return integer; + }) + .forEachOrdered((element) -> { + System.out.print(element+" "); + }); + System.out.println(); + + concurrentStorage.parallelStream() + .forEachOrdered((element) -> { + System.out.print(element+" "); + }); + System.out.println(); + + scanner.close(); + } +} \ No newline at end of file diff --git a/Streams/Parallelism/README.md b/Streams/Parallelism/README.md new file mode 100644 index 0000000..2be51ae --- /dev/null +++ b/Streams/Parallelism/README.md @@ -0,0 +1,21 @@ +# Parallelism Theory + +Theory for parallelism in streams. + +## Side Effects + +A method or an expression has a side effect if, in addition to returning or producing a value, it also modifies the state of the computer. + +### Laziness + +An expression, method, or algorithm is lazy if its value is evaluated only when it is required. On the other hand, an algorithm is eager if it is evaluated or processed immediately. + +All intermediate operations are lazy. + +### Interference + +Interference occurs when the source of a stream is modified while a pipeline processes the stream. + +## Stateful Lambda Expressions + +A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. Its result can vary every time the code is run. \ No newline at end of file diff --git a/Streams/Parallelism/input.dat b/Streams/Parallelism/input.dat new file mode 100644 index 0000000..2afe425 --- /dev/null +++ b/Streams/Parallelism/input.dat @@ -0,0 +1 @@ +watermelon-juice,20 dragon-fruit-smoothee,200 yum-yum-juice,23 mango-smoothee,20 frozen-kiwi-shake,20 fruit-god-shake,30 \ No newline at end of file