Skip to content

Commit

Permalink
Implements better benchmarks and tests parallel builds
Browse files Browse the repository at this point in the history
  • Loading branch information
nolantait committed Mar 12, 2022
1 parent 50c6e7a commit e638231
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 104 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/bin/
/.shards/
*.dwarf
/build/

# Libraries don't need dependency lock
# Dependencies will be locked in applications that use them
Expand Down
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
SHELL=/bin/sh

run_benchmarks:
echo "Building benchmarks..."
crystal build -Dpreview_mt benchmarks/producer_consumer.cr --release -p -o build/benchmarks-producer-consumer
crystal build -Dpreview_mt benchmarks/throughput.cr --release -p -o build/benchmarks-throughput
echo "Running benchmarks..."
CRYSTAL_WORKERS=2 ./build/benchmarks-producer-consumer
./build/benchmarks-throughput
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,29 +109,29 @@ disruptor = Disruptor::Queue(String).new(1024, Disruptor::WaitWithReturn.new)

## Development

Run benchmarks with `crystal benchmarks/producer_consumer.cr`
Run benchmarks with `make run_benchmarks`

To test run `crystal specs`

### Current benchmarks:

*Producer/Consumer*
*Producer/Consumer multithreading with 2 workers*
```
user system total real
concurrent disruptor: 1.877195 0.010555 1.887750 ( 2.097337)
basic disruptor: 1.504052 0.007077 1.511129 ( 1.520054)
queue: 0.574990 0.002785 0.577775 ( 0.582177)
array: 0.607392 0.004656 0.612048 ( 0.617646)
user system total real
disruptor: 0.100912 0.067791 0.168703 ( 0.169126)
channels: 2.749560 0.082247 2.831807 ( 1.617937)
```
*Throughput of simple pop/push*
*Throughput of simple pop/push in a single thread*
```
disruptor: 4.08k (245.07µs) (± 4.11%) 0.0B/op 3.74× slower
queue: 15.26k ( 65.53µs) (± 2.54%) 0.0B/op fastest
array: 14.21k ( 70.35µs) (± 3.38%) 0.0B/op 1.07× slower
spin disruptor: 21.73M ( 46.02ns) (± 3.46%) 0.0B/op 2.61× slower
yield disruptor: 21.82M ( 45.83ns) (± 4.91%) 0.0B/op 2.60× slower
return disruptor: 21.89M ( 45.67ns) (± 7.20%) 0.0B/op 2.59× slower
queue: 54.70M ( 18.28ns) (±12.40%) 0.0B/op 1.04× slower
array: 56.65M ( 17.65ns) (± 7.54%) 0.0B/op fastest
```
For comparison to the much better performance of the Java implementation see
For comparison to the likely much better performance of the Java implementation see
page 10 of the [Disruptor Technical Paper](https://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf)
Pull requests and discussion are encouraged.
Expand Down
53 changes: 16 additions & 37 deletions benchmarks/producer_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,47 @@ require "../src/disruptor"

n = 16777216
half = 8388608
value = "Hello"

Benchmark.bm do |x|
disruptor = Disruptor::Queue(String).new(n)
queue = Deque(String).new(n)
array = Array(String).new(n)
x.report("disruptor:") do
disruptor = Disruptor::Queue(Int32).new(n, Disruptor::WaitWithYield.new)

x.report("concurrent disruptor:") do
spawn do
half.times do
disruptor.push(value)
disruptor.push rand(1..100)
end
end

spawn do
half.times do
disruptor.push(value)
disruptor.push rand(1..100)
end
end

spawn do
half.times do
disruptor.pop
end
end

spawn do
half.times do
n.times do
disruptor.pop
end
end

Fiber.yield
end

x.report(" basic disruptor:") do
n.times do
disruptor.push(value)
end

n.times do
disruptor.pop
end
end
x.report("channels:") do
channel = Channel(Int32).new(n)

x.report(" queue:") do
n.times do
queue.push(value)
end

n.times do
queue.pop
spawn do
half.times do
channel.send rand(1..100)
end
end
end

x.report(" array:") do
n.times do
array.push(value)
spawn do
half.times do
channel.send rand(1..100)
end
end

n.times do
array.pop
end
n.times { channel.receive }
end
end
32 changes: 10 additions & 22 deletions benchmarks/throughput.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ require "benchmark"

require "../src/disruptor"

iterations = 500_000
n = 2048
value = 0

Benchmark.ips do |x|
spin_disruptor = Disruptor::Queue(Int32).new(n, Disruptor::WaitWithSpin.new)
Expand All @@ -14,37 +12,27 @@ Benchmark.ips do |x|
array = Array(Int32).new(n)

x.report("spin disruptor:") do
iterations.times do
spin_disruptor.push(value)
spin_disruptor.pop
end
spin_disruptor.push rand(1..100)
spin_disruptor.pop
end

x.report("yield disruptor:") do
iterations.times do
yield_disruptor.push(value)
yield_disruptor.pop
end
yield_disruptor.push rand(1..100)
yield_disruptor.pop
end

x.report("return disruptor:") do
iterations.times do
return_disruptor.push(value)
return_disruptor.pop
end
return_disruptor.push rand(1..100)
return_disruptor.pop
end

x.report(" queue:") do
iterations.times do
queue.push(value)
queue.pop
end
queue.push rand(1..100)
queue.pop
end

x.report(" array:") do
iterations.times do
array.push(value)
array.pop
end
array.push rand(1..100)
array.pop
end
end
8 changes: 2 additions & 6 deletions examples/binance.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ require "../src/disruptor"

size = 2048
channel = Channel(String).new
disruptor = Disruptor::Queue(String).new(size)
disruptor = Disruptor::Queue(String).new(size, Disruptor::WaitWithYield.new)

spawn do
socket = HTTP::WebSocket.new("wss://stream.binance.com:9443/ws/btcusdt@trade")
socket.on_message do |message|
disruptor.push message
channel.send(message)
end
socket.run
end
Expand All @@ -20,14 +19,11 @@ spawn do
socket = HTTP::WebSocket.new("wss://stream.binance.com:9443/ws/ethusdt@trade")
socket.on_message do |message|
disruptor.push message
channel.send(message)
end
socket.run
end


while message = channel.receive?
while message = disruptor.pop
puts disruptor.inspect
puts message
disruptor.pop
end
35 changes: 11 additions & 24 deletions examples/overload.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,29 @@ require "log"
require "../src/disruptor"

n = 2048
half = 1024
value = "Hello"

disruptor = Disruptor::Queue(String).new(n, Disruptor::WaitWithYield.new)
disruptor = Disruptor::Queue(Int32).new(n, Disruptor::WaitWithYield.new)

spawn do
loop do
disruptor.push(value)
disruptor.push(value)
Fiber.yield
end
end
rand(1..100).times do
disruptor.push rand(1..100)
end

spawn do
loop do
disruptor.push(value)
disruptor.push(value)
Fiber.yield
sleep rand(0.01..0.1)
end
end

spawn do
loop do
disruptor.pop
Fiber.yield
end
end
rand(1..100).times do
disruptor.push rand(1..100)
end

spawn do
loop do
disruptor.pop
Fiber.yield
sleep rand(0.01..0.1)
end
end

loop do
while message = disruptor.pop
puts disruptor.inspect
Fiber.yield
puts message
end
2 changes: 1 addition & 1 deletion src/disruptor/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Disruptor
nil
end

def pop
def pop : T
next_sequence = @sequence.increment
@barrier.wait_for(next_sequence)
@buffer.get(next_sequence)
Expand Down
7 changes: 5 additions & 2 deletions src/disruptor/ring.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ module Disruptor
@buffer[slot % @size] = value
end

def get(slot : Slot)
@buffer[slot % @size]
def get(slot : Slot) : T
value = @buffer[slot % @size]
raise Exception.new unless value

return value
end

def claimed_count
Expand Down

0 comments on commit e638231

Please sign in to comment.