Concurrency

synchronized

synchronized on class

  String synchronizedOnClass() {
    //this is a synchronized block on the class
    synchronized (SynchronizedMethodAndBlock.class) {
      return message();
    }
  }
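As a rough illustration of why locking on the class matters, the sketch below guards a static counter with the same kind of block. The class name `ClassLockCounter` and its methods are hypothetical and not part of the original notes.

```java
class ClassLockCounter {
    private static int count = 0;

    // Locks on the Class object, the same monitor a static synchronized method uses.
    static void increment() {
        synchronized (ClassLockCounter.class) {
            count++;
        }
    }

    // Runs the given number of threads and returns the final count.
    static int runThreads(int threads, int incrementsPerThread) {
        count = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // wait so that every increment is visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(runThreads(4, 1_000)); // prints 4000
    }
}
```

Without the synchronized block, `count++` would be a non-atomic read-modify-write and the total could come up short.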

Threads

Runnable

Invalid Runnable

Runnable r = () -> 5; //does not compile: run() returns void, so the lambda cannot return a value
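For contrast, here is a small sketch of lambdas that do compile. It assumes `Callable` is the intended alternative when a value is needed; the class name `LambdaTargets` is hypothetical.

```java
import java.util.concurrent.Callable;

class LambdaTargets {
    static int callFive() {
        Callable<Integer> c = () -> 5;            // compiles: call() returns a value
        Runnable r = () -> System.out.println(5); // compiles: run() returns void
        r.run();
        try {
            return c.call();
        } catch (Exception e) { // Callable.call() declares "throws Exception"
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callFive());
    }
}
```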

Start thread

Thread t = new Thread(() -> System.out.println("hello"));
t.start();
System.out.println("completed");
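In the snippet above, nothing orders the two print statements, so "completed" may appear before "hello". The sketch below is one way to force the order with `join()`. The class name `StartAndJoin` is hypothetical, and it collects output in a buffer rather than printing so the order can be inspected.

```java
class StartAndJoin {
    static String run() {
        StringBuffer log = new StringBuffer(); // StringBuffer is thread-safe
        Thread t = new Thread(() -> log.append("hello "));
        t.start();
        try {
            t.join(); // block until the worker thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        log.append("completed");
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "hello completed"
    }
}
```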

Basic Threads

Thread interrupt

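interrupt() only cuts short a thread that is in a WAITING or TIMED_WAITING state: the blocked call throws InterruptedException.
Calling interrupt() on a RUNNABLE thread does not stop it; it only sets the thread's interrupt flag, which the thread can check with isInterrupted(). Interrupting a thread that is not alive (for example, one still in the NEW state) need not have any effect.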
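A minimal sketch of the TIMED_WAITING case, assuming a worker that sleeps for a long time. The class name `InterruptDemo` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    // Returns true if the sleeping thread received an InterruptedException.
    static boolean interruptSleepingThread() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000); // TIMED_WAITING while asleep
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // sleep() was cut short
            }
        });
        sleeper.start();
        sleeper.interrupt(); // if sleep() has not begun yet, it throws as soon as it does
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptSleepingThread()); // prints true
    }
}
```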

Parallel Stream

The forEachOrdered() method processes the elements in the stream's encounter order, even on a parallel stream, at the cost of some of the parallel performance gain.
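A short sketch of the ordering guarantee on a parallel stream. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ForEachOrderedDemo {
    // Copies the source through a parallel stream, keeping encounter order.
    static List<Integer> copyInEncounterOrder(List<Integer> source) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(copyInEncounterOrder(Arrays.asList(1, 2, 3, 4, 5))); // prints [1, 2, 3, 4, 5]
    }
}
```

Swapping `forEachOrdered` for `forEach` on the same parallel stream would make the order unpredictable.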

Reduce

Stream<T> stream

<U> U reduce(U identity,
    BiFunction<U, ? super T, U> accumulator,
    BinaryOperator<U> combiner)

Rules

  • identity: same type as the return type (U)
  • accumulator: 1st parameter is the return type (U), 2nd parameter is the stream's element type (T)
  • combiner: 1st and 2nd parameters are both the return type (U)

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. The accumulator and combiner must be:

  • associative;
  • non-interfering;
  • stateless.
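The type rules above can be seen in a small sketch that reduces a `Stream<String>` to an `Integer`. The class name `ReduceDemo` and the sample words are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

class ReduceDemo {
    // T = String (element type), U = Integer (result type).
    static int totalLength(List<String> words) {
        return words.parallelStream().reduce(
            0,                                  // identity: U
            (sum, word) -> sum + word.length(), // accumulator: (U, T) -> U
            Integer::sum);                      // combiner: (U, U) -> U
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("w", "o", "l", "f"))); // prints 4
    }
}
```

Addition is associative and the lambdas are stateless and non-interfering, so the result is the same whether the stream runs in parallel or sequentially.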

[Figure: reduce stream parallel.png]

Following these rules gives predictable results at runtime.

Parallel vs Sequential reduce

Sequential

Call sequential() to obtain an equivalent sequential stream from an existing parallel stream.

List<Integer> sequentialResult = numbers.parallelStream()
    .sequential()  // This makes the stream sequential
    .collect(Collectors.toList());

Methods of Executors

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    ExecutorService cachedExecutorService = Executors.newCachedThreadPool();
    ExecutorService fixedExecutorService = Executors.newFixedThreadPool(10);
    //scheduled methods
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
    ScheduledExecutorService singleScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

ExecutorService

Methods

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)
    
void execute(Runnable command) //from the parent Executor

Calling get() on the Future<?> returned by submit(Runnable) returns null once the task has completed.
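A minimal sketch comparing the two submit() overloads. The class name `SubmitDemo` and the task bodies are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // Returns { result of the Runnable future, result of the Callable future }.
    static Object[] submitBoth() {
        Runnable runnable = () -> System.out.println("side effect only");
        Callable<String> callable = () -> "value";
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> fromRunnable = service.submit(runnable);
            Future<String> fromCallable = service.submit(callable);
            return new Object[] { fromRunnable.get(), fromCallable.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] results = submitBoth();
        System.out.println(results[0]); // prints null
        System.out.println(results[1]); // prints value
    }
}
```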

InvokeAny

List<Callable<String>> list = buildActions();
String message;

//NOTE: invokeAny takes a Collection<? extends Callable<T>>
//it returns the result of one successfully completed task, not a Future
message = executorService.invokeAny(list);

Future

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Remember to catch the checked exceptions when you call Future.get(...):

try {
    result = future.get(1, TimeUnit.SECONDS);
    //all three checked exceptions must be caught or declared
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    throw new RuntimeException(e);
}

Shutdown

void executor() {
  ExecutorService executorService = Executors.newSingleThreadExecutor();
  try {
    executorService.submit(printInventory);
  } finally {
    //if you do not call shutdown(), the task still runs
    //but the program never terminates
    executorService.shutdown();
  }
}
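shutdown() does not wait for submitted tasks to finish. One way to wait is awaitTermination(), sketched below with a hypothetical class name and a five-second timeout chosen only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    // Returns true if the executor terminated within the timeout.
    static boolean runAndShutDown() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            service.submit(() -> System.out.println("printing inventory"));
        } finally {
            service.shutdown(); // rejects new tasks; already submitted tasks still run
        }
        try {
            return service.awaitTermination(5, TimeUnit.SECONDS) && service.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown());
    }
}
```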

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {/**/}
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
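  • scheduleAtFixedRate() schedules runs at a fixed period regardless of how long each run takes. Per the Javadoc, if a run takes longer than the period, later runs may start late but will not execute concurrently.
  • scheduleAtFixedRate() takes four arguments: command, initialDelay, period, and unit.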
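A small sketch of the four-argument call, assuming a 50 ms period and a latch that counts three runs. The class name, period, and timeout are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedRateDemo {
    // Returns true once the task has run at least three times.
    static boolean runsAtLeastThreeTimes() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(3);
        try {
            // command, initialDelay, period, unit
            ScheduledFuture<?> handle =
                service.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
            boolean done = latch.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // a periodic task repeats until cancelled
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsAtLeastThreeTimes());
    }
}
```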

Concurrent Collections

Sorted Concurrent Collections

If you see SkipList as part of a concurrent class name, it means it is sorted in some way.

  • ConcurrentSkipListSet
  • ConcurrentSkipListMap
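A quick sketch of the sorting behavior, using natural ordering. The class name `SkipListDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

class SkipListDemo {
    // Returns the distinct values in natural (ascending) order.
    static List<Integer> sortedDistinct(List<Integer> values) {
        return new ArrayList<>(new ConcurrentSkipListSet<>(values));
    }

    public static void main(String[] args) {
        System.out.println(sortedDistinct(Arrays.asList(3, 1, 2, 1))); // prints [1, 2, 3]
    }
}
```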

Requirements for Parallel Reduction with collect()

  • The stream is parallel.
  • The collector passed to collect() has the Characteristics.CONCURRENT characteristic.
  • Either the stream is unordered or the collector has the characteristic Characteristics.UNORDERED.

The three-argument overload of collect() takes the building blocks directly:

<R> R collect(Supplier<R> supplier,
    BiConsumer<R, ? super T> accumulator,
    BiConsumer<R, R> combiner)

Example

Check Characteristic
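One way to check a collector's characteristics is to call characteristics() on it, as in this sketch. The class and helper names are hypothetical, and the check covers only the collector, not whether the stream itself is parallel.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CharacteristicsDemo {
    // True if the collector is both CONCURRENT and UNORDERED.
    static boolean isConcurrentAndUnordered(Collector<?, ?, ?> collector) {
        Set<Collector.Characteristics> traits = collector.characteristics();
        return traits.contains(Collector.Characteristics.CONCURRENT)
            && traits.contains(Collector.Characteristics.UNORDERED);
    }

    static boolean toConcurrentMapQualifies() {
        Collector<String, ?, ConcurrentMap<Integer, String>> c =
            Collectors.toConcurrentMap(String::length, s -> s);
        return isConcurrentAndUnordered(c);
    }

    static boolean toListQualifies() {
        Collector<String, ?, List<String>> c = Collectors.toList();
        return isConcurrentAndUnordered(c);
    }

    public static void main(String[] args) {
        System.out.println(toConcurrentMapQualifies()); // prints true
        System.out.println(toListQualifies());          // prints false
    }
}
```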

CyclicBarrier

public int await() throws InterruptedException, BrokenBarrierException {...}

Mind that InterruptedException and BrokenBarrierException are both checked exceptions.
CyclicBarrier examples
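A minimal sketch of await() in use, assuming a pool with as many threads as barrier parties so that every party can arrive. The class name `BarrierDemo` and the five-second timeout are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Returns how many times the barrier action ran.
    static int meetAtBarrier(int parties) {
        AtomicInteger barrierActions = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, barrierActions::incrementAndGet);
        // Callable lets the lambda throw the two checked exceptions of await().
        Callable<Integer> arrive = () -> barrier.await();
        ExecutorService service = Executors.newFixedThreadPool(parties);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                futures.add(service.submit(arrive));
            }
            for (Future<Integer> future : futures) {
                future.get(5, TimeUnit.SECONDS); // every party got past the barrier
            }
            return barrierActions.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(meetAtBarrier(3)); // prints 1
    }
}
```

If the pool had fewer threads than parties, the barrier would never trip and the tasks would wait until the timeout.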

Lock

tryLock

Acquires the lock if it becomes free within the given waiting time:

 if (lock.tryLock(1, TimeUnit.MINUTES)) {..}

Returns immediately, with true only if the lock was acquired:

 if (lock.tryLock()) {..}
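A sketch of the usual pattern around tryLock(): release the lock in a finally block, and only if it was actually acquired. The class name `TryLockDemo` and the helper `runIfFree` are hypothetical.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    private static final Lock lock = new ReentrantLock();

    // Runs the action only if the lock is free right now; returns whether it ran.
    static boolean runIfFree(Runnable action) {
        if (lock.tryLock()) {
            try {
                action.run();
                return true;
            } finally {
                lock.unlock(); // unlock only after a successful tryLock()
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(runIfFree(() -> System.out.println("got the lock")));
    }
}
```

Calling unlock() without holding a ReentrantLock throws IllegalMonitorStateException, which is why the unlock sits inside the if branch.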


parallel stream

findFirst() always returns the first element of an ordered stream, regardless of whether it is serial or parallel.
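A short sketch on a parallel stream built from a List, which is an ordered source. The class name `FindFirstDemo` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

class FindFirstDemo {
    // Returns the first element in encounter order, even though the stream is parallel.
    static int firstOf(List<Integer> values) {
        return values.parallelStream()
            .findFirst()
            .orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        System.out.println(firstOf(Arrays.asList(5, 3, 9))); // prints 5
    }
}
```

findAny() on the same parallel stream would be free to return any of the three elements.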

Identifying Threading Problems

There are three types of liveness issues with which you should be familiar: deadlock, starvation, and livelock.

Deadlock

Deadlock occurs when two or more threads are blocked forever, each waiting on the other.
Deadlock 1
Deadlock 2

Starvation

Starvation occurs when a single thread is perpetually denied access to a shared resource.

Livelock

Livelock is a form of starvation where two or more threads are active but conceptually blocked forever.