Skip to content
jac18281828 edited this page Jun 10, 2016 · 23 revisions

Conversant ConcurrentQueue and Disruptor BlockingQueue

Disruptor is the highest performing intra-thread transfer mechanism available in Java. Conversant Disruptor is the highest performing implementation of this type of ring buffer queue because it has almost no overhead and it exploits a particularly simple design.

Introduction

The Conversant Disruptor is designed to be fast by exploiting only cache level optimizations while keeping the overall approach to thread transfers direct and simple. The main advantage of the Conversant Disruptor over other Disruptor implementations is that it is based on the Java BlockingQueue interface so existing software can be immediately adapted to use this disruptor without significant programming changes.

Conversant Disruptor is a drop in replacement for ArrayBlockingQueue with an order of magnitude better performance. In comparison with LinkedTransferQueue this disruptor implementation is roughly two times faster and does not allocate any objects internally. Given that LinkedTransferQueue is state of the art in Java performance, Conversant Disruptor may yield significant performance improvements for applications that rely on fast intra-thread transfers.

Conversant Disruptor is capable of 4ns intra-thread transfers for push-pull senarios and 20ns multi-thread transfers in an 1 to N senario. The disruptor approach is sensitive to huge numbers of threads and will not exhibit good performance if your application exploits hundreds or thousands of waiting threads. Waiting threads will spin-lock the CPU. From a performance perspective, designing applications to exploit hundreds or thousands of waiting threads is never advisable.

Discussion Forum

Conversant Disruptor has a google group so you can follow releases and changes:

https://groups.google.com/forum/#!forum/conversant-disruptor

Getting Started

Getting started with Conversant Disruptor is easy! Simply download and build the Java package. Once you have included the dependency in your project, you may use the Conversant Disruptor as you would any other Java BlockingQueue.

Conversant Disruptor comes in two flavors: PushPull and Multithread. PushPull is designed to work in the special case where there is only one producer thread and one consumer thread present. It is critical that this requirement is met, because the performance advantage of the PushPull implementation comes at the cost of not being thread safe for multiple producer or consumer threads. If you access the PushPull implementation with multiple simultaneous threads data will be lost.

The Multithread implementation provides a full blocking queue implementation which may be used with multiple producer and consumer threads, however keep in mind that performance is degraded as the number of threads increase. This is a natural artifact of the disruptor strategy because most of performance comes from a particular thread obtaining the shared context by chance. The number of threads should be kept to a minimum. Ultimately there is a 5x performance penalty for using the Multithread version even for a push-pull scenario.

Conversant Disruptor provides four different implementations. The ConcurrentQueue implementations provide the rudimentary queuing functionality without any of the timing or waiting functionality spec'd by Java's api. This is useful if you are developing an application that will utilize spin locking or where timed waiting is not needed. The BlockingQueue implementations provide full support of the Java BlockingQueue interface including blocking and waiting timing support.

Disruptor Description
PushPullConcurrentQueue Single producer, single consumer ConcurrentQueue
MultithreadConcurrentQueue Multi producer, multi consumer ConcurrentQueue
PushPullBlockingQueue Single producer, single consumer Java BlockingQueue
DisruptorBlockingQueue Multi producer, multi consumer Java BlockingQueue

Batch

It is convenient to access the Conversant Disruptor as a BlockingQueue, using the typical offer(E e) and E poll() approach. However ConcurrentQueue also provides a method to bulk remove: int remove(E[] elements). In the case where thread contention is assured, the bulk removal can achieve transfer of multiple elements with the equivalent CAS/sequence overhead of just a single transaction. For some applications this could afford an order of magnitude performance improvement.

Using Maven

Maven users can incorporate Conversant Disruptor the usual way.

<dependency>
  <groupId>com.conversantmedia</groupId>
  <artifactId>disruptor</artifactId>
  <version>1.2.1</version>
</dependency>

32-bit

Conversant Disruptor is tuned and tested on a 64bit HotSpot JVM running on 64-bit (x86-64) Intel processors. The Disruptor should be tested carefully and completely on other processor families or architectures prior to using it. The maintainers can provide implementation alternatives or bug fixes if you find Disruptor is not working in your environment.

Examples

Intra-thread messaging

This is an extremely contrived example that performs calculations in a separate thread. It would not be wise to perform extremely fast operations such as INCREMENT and DECREMENT in a separate thread. However, this example could be adapted easily for a use case where lower performing asynchronous operations are substituted.

Things to notice in this example:

  1. The message passed is immutable and small, it does not create garbage
  2. The activity on the accumulator is synchronized using the accumulatorSequence
package com.conversant.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by jcairns on 12/30/15.
 */
public class DisruptorExample {
    private final BlockingQueue<Message> messageQueue;
    private final ExecutorService calcThreads;

    private volatile boolean isStopped = false;

    private final AtomicSequence accumulatorSequence = new AtomicSequence();
    private volatile int accumulator = -1;

    DisruptorExample() {
        messageQueue = new DisruptorBlockingQueue<>(512);
        final int nThreads = Runtime.getRuntime().availableProcessors();
        calcThreads = Executors.newFixedThreadPool(nThreads);
        for(int i=0; i<nThreads; i++) {
            calcThreads.execute(new Calculator());
        }
    }

    public static void main(final String[] args) {
        final DisruptorExample example = new DisruptorExample();

        try {
            example.clear();

            while(example.getAccumulator() != 0) Thread.yield();

            example.addX(1000);
            example.decrementX(500);
            example.addX(100);
            example.decrementX(50);
            example.addX(10);
            example.decrementX(5);

            while(!example.messageQueue.isEmpty() || example.getAccumulator() != 555) Thread.yield();

            System.out.println("Got it:"+example.getAccumulator());
        } finally {
            example.stop();
        }

    }

    private void addX(final int n) {
        for(int i=0; i<n; i++) {
            while(!messageQueue.offer(Message.INCREMENT))
                Thread.yield();
        }
    }

    private void decrementX(final int n) {
        for(int i=0; i<n; i++) {
            while(!messageQueue.offer(Message.DECREMENT))
                Thread.yield();
        }
    }

    private void clear() {
        while(!messageQueue.offer(Message.CLEAR))
            Thread.yield();
    }

    private void stop() {
        while(!messageQueue.offer(Message.STOP))
            Thread.yield();

        calcThreads.shutdown();
    }

    private int getAccumulator() {
        return accumulator;
    }

    private enum Message {
        INCREMENT,
        DECREMENT,
        CLEAR,
        STOP
    }

    private final class Calculator implements Runnable {

        @Override
        public void run() {
            while(!isStopped) {
                try {
                    final Message m = messageQueue.poll(100L, TimeUnit.MILLISECONDS);
                    if(m != null) {
                        for(;;) {
                            final long seq = accumulatorSequence.get();
                            if(accumulatorSequence.update(seq)) {
                                try {
                                    switch (m) {
                                        case INCREMENT:
                                            accumulator = accumulator+1;
                                            break;
                                        case DECREMENT:
                                            accumulator = accumulator-1;
                                            break;
                                        case CLEAR:
                                            accumulator = 0;
                                            break;
                                        case STOP:
                                            isStopped = true;
                                            break;
                                    }
                                } finally {
                                    accumulatorSequence.commit();
                                }
                                break; // sequence check
                            } else {
                                Thread.yield();
                            }
                        }
                    } else {
                        Thread.yield();
                    }
                } catch (InterruptedException e) {
                    isStopped = true;
                }
            }
        }
    }
}