From e6962cb4dbd27c33d906ef9c02bb705c8a6662bd Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 19:36:51 +0100 Subject: [PATCH 01/14] Add Thread::WaitGroup Simple abstraction on top of a mutex and condition variable to synchronize the execution of a set of threads. --- src/crystal/system/thread_wait_group.cr | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 src/crystal/system/thread_wait_group.cr diff --git a/src/crystal/system/thread_wait_group.cr b/src/crystal/system/thread_wait_group.cr new file mode 100644 index 000000000000..3494e1e7f569 --- /dev/null +++ b/src/crystal/system/thread_wait_group.cr @@ -0,0 +1,20 @@ +# :nodoc: +class Thread::WaitGroup + def initialize(@count : Int32) + @mutex = Thread::Mutex.new + @condition = Thread::ConditionVariable.new + end + + def done : Nil + @mutex.synchronize do + @count -= 1 + @condition.broadcast if @count == 0 + end + end + + def wait : Nil + @mutex.synchronize do + @condition.wait(@mutex) unless @count == 0 + end + end +end From 2fc450fb18de1c1445dcb6d79bb4fe576e8f106c Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 18:28:53 +0100 Subject: [PATCH 02/14] Add Fiber::Queue singly-linked LIFO queue --- spec/std/fiber/queue_spec.cr | 183 +++++++++++++++++++++++++++++++++++ src/fiber.cr | 3 + src/fiber/queue.cr | 83 ++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 spec/std/fiber/queue_spec.cr create mode 100644 src/fiber/queue.cr diff --git a/spec/std/fiber/queue_spec.cr b/spec/std/fiber/queue_spec.cr new file mode 100644 index 000000000000..39c67bc52bee --- /dev/null +++ b/spec/std/fiber/queue_spec.cr @@ -0,0 +1,183 @@ +require "../spec_helper" +require "fiber/queue" + +describe Fiber::Queue do + describe "#initialize" do + it "creates an empty queue" do + q = Fiber::Queue.new + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + it "creates a filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f1.schedlink = f2 + f2.schedlink = nil + + q = Fiber::Queue.new(f2, f1, size: 2) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + q.empty?.should be_false + end + end + + describe "#push" do + it "to head" do + q = Fiber::Queue.new + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + # simulate fibers previously added to other queues + f1.schedlink = f3 + f2.schedlink = f1 + + # push first fiber + q.push(f1) + q.@head.should be(f1) + q.@tail.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(1) + + # push second fiber + q.push(f2) + q.@head.should be(f2) + q.@tail.should be(f1) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(2) + + # push third fiber + q.push(f3) + q.@head.should be(f3) + q.@tail.should be(f1) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(3) + end + end + + describe "#bulk_unshift" do + it "to empty queue" do + # manually create a queue + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # push in bulk + q2 = Fiber::Queue.new(nil, nil, size: 0) + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f3) + q2.@tail.should be(f1) + q2.size.should eq(3) + end + + it "to filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } 
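+      # five fibers: f3, f2, f1 will form the source queue that gets
+      # appended behind the destination queue made of f5 and f4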
+ f3 = Fiber.new(name: "f3") { } + f4 = Fiber.new(name: "f4") { } + f5 = Fiber.new(name: "f5") { } + + # source queue + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # destination queue + f5.schedlink = f4 + f4.schedlink = nil + q2 = Fiber::Queue.new(f5, f4, size: 2) + + # push in bulk + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f5) + q2.@tail.should be(f1) + q2.size.should eq(5) + + f5.schedlink.should be(f4) + f4.schedlink.should be(f3) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be(nil) + end + end + + describe "#pop" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + expect_raises(IndexError) { q.pop } + q.size.should eq(0) + end + end + + describe "#pop?" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop?.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop?.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop?.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + q.pop?.should be_nil + q.size.should eq(0) + end + end +end diff --git a/src/fiber.cr b/src/fiber.cr index 55745666c66d..2697b9b6283e 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -66,6 +66,9 @@ class Fiber # :nodoc: property previous : Fiber? + # :nodoc: + property schedlink : Fiber? + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) diff --git a/src/fiber/queue.cr b/src/fiber/queue.cr new file mode 100644 index 000000000000..9695ac8b6a83 --- /dev/null +++ b/src/fiber/queue.cr @@ -0,0 +1,83 @@ +# The queue is modeled after Go's `gQueue`, distributed under a BSD-like +# license: + +class Fiber + # :nodoc: + # + # Singly-linked list of `Fiber`. + # Last-in, first-out (LIFO) semantic. + # A fiber can only exist within a single `Queue` at any time. + # + # Unlike `Crystal::PointerLinkedList` doubly-linked list, this `Queue` is + # meant to maintain a queue of runnable fibers, or to quickly collect an + # arbitrary number of fibers. + # + # Thread unsafe! An external lock is required for concurrent accesses. + struct Queue + getter size : Int32 + + def initialize(@head : Fiber? = nil, @tail : Fiber? = nil, @size = 0) + end + + def push(fiber : Fiber) : Nil + fiber.schedlink = @head + @head = fiber + @tail = fiber if @tail.nil? 
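+      # (the queue was empty: the pushed fiber becomes both head and tail)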
+ @size += 1 + end + + def bulk_unshift(queue : Queue*) : Nil + return unless last = queue.value.@tail + last.schedlink = nil + + if tail = @tail + tail.schedlink = queue.value.@head + else + @head = queue.value.@head + end + @tail = queue.value.@tail + + @size += queue.value.size + end + + @[AlwaysInline] + def pop : Fiber + pop { raise IndexError.new } + end + + @[AlwaysInline] + def pop? : Fiber? + pop { nil } + end + + private def pop(&) + if fiber = @head + @head = fiber.schedlink + @tail = nil if @head.nil? + @size -= 1 + fiber.schedlink = nil + fiber + else + yield + end + end + + @[AlwaysInline] + def empty? : Bool + @head == nil + end + + def clear + @size = 0 + @head = @tail = nil + end + + def each(&) : Nil + cursor = @head + while cursor + yield cursor + cursor = cursor.schedlink + end + end + end +end From 761349490b2b3e052e24f00d52f8424a77586151 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 19:39:22 +0100 Subject: [PATCH 03/14] Add Runnables and GlobalQueue for schedulers to keep fibers --- .../execution_context/global_queue_spec.cr | 225 +++++++++++++++ spec/std/execution_context/runnables_spec.cr | 264 ++++++++++++++++++ spec/std/execution_context/spec_helper.cr | 21 ++ src/execution_context/global_queue.cr | 104 +++++++ src/execution_context/runnables.cr | 210 ++++++++++++++ 5 files changed, 824 insertions(+) create mode 100644 spec/std/execution_context/global_queue_spec.cr create mode 100644 spec/std/execution_context/runnables_spec.cr create mode 100644 spec/std/execution_context/spec_helper.cr create mode 100644 src/execution_context/global_queue.cr create mode 100644 src/execution_context/runnables.cr diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/execution_context/global_queue_spec.cr new file mode 100644 index 000000000000..838a31406c01 --- /dev/null +++ b/spec/std/execution_context/global_queue_spec.cr @@ -0,0 +1,225 @@ +require "./spec_helper" + +describe ExecutionContext::GlobalQueue do + it "#initialize" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.empty?.should be_true + end + + it "#unsafe_push and #unsafe_pop" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f1) + q.size.should eq(1) + + q.unsafe_push(f2) + q.unsafe_push(f3) + q.size.should eq(3) + + q.unsafe_pop?.should be(f3) + q.size.should eq(2) + + q.unsafe_pop?.should be(f2) + q.unsafe_pop?.should be(f1) + q.unsafe_pop?.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + describe "#unsafe_grab?" 
do + it "can't grab from empty queue" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = ExecutionContext::Runnables(6).new(q) + q.unsafe_grab?(runnables, 4).should be_nil + end + + it "grabs fibers" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.unsafe_push(f) } + + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + + # returned the last enqueued fiber + fiber.should be(fibers[9]) + + # enqueued the next 2 fibers + runnables.size.should eq(2) + runnables.get?.should be(fibers[8]) + runnables.get?.should be(fibers[7]) + + # the remaining fibers are still there: + 6.downto(0).each do |i| + q.unsafe_pop?.should be(fibers[i]) + end + end + + it "can't grab more than available" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + + it "clamps divisor to 1" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 0) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "one by one" do + fibers = StaticArray(ExecutionContext::FiberCounter, 763).new do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + n = 7 + increments = 15 + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "ONE-#{i}") do |thread| + slept = 0 + ready.done + + loop do + if fiber = queue.pop? + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + queue.push(fiber) if fc.increment < increments + slept = 0 + elsif slept < 100 + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + else + break + end + end + rescue exception + Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + fibers.each_with_index do |fc, i| + queue.push(fc.@fiber) + Thread.sleep(10.nanoseconds) if i % 10 == 9 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times + fibers.each { |fc| fc.counter.should eq(increments) } + end + + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "BULK-#{i}") do |thread| + slept = 0 + + r = ExecutionContext::Runnables(3).new(queue) + + batch = Fiber::Queue.new + size = 0 + + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! 
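+          # linear scan is O(n) but fine in a spec: it maps the dequeued
+          # fiber back to its test counter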
+ + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } + + ready.done + + loop do + if fiber = r.get? + execute.call(fiber) + slept = 0 + next + end + + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + q = Fiber::Queue.new + 5.times { |j| q.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/execution_context/runnables_spec.cr new file mode 100644 index 000000000000..6fa342675402 --- /dev/null +++ b/spec/std/execution_context/runnables_spec.cr @@ -0,0 +1,264 @@ +require "./spec_helper" + +describe ExecutionContext::Runnables do + it "#initialize" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(16).new(g) + r.capacity.should eq(16) + end + + describe "#push" do + it "enqueues the fiber in local queue" do + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # local dequeue + fibers.each { |f| r.get?.should be(f) } + r.get?.should be_nil + + # didn't push to global queue + g.pop?.should be_nil + end + + it "moves half the local queue to the global queue on overflow" do + fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # kept half of local queue + r.get?.should be(fibers[2]) + r.get?.should be(fibers[3]) + + # moved half of local queue + last push to global queue + g.pop?.should eq(fibers[0]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[4]) + end + + it "can always push up to capacity" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + + 4.times do + # local + 4.times { r.push(Fiber.new { }) } + 2.times { r.get? } + 2.times { r.push(Fiber.new { }) } + + # overflow (2+1 fibers are sent to global queue + 1 local) + 2.times { r.push(Fiber.new { }) } + + # clear + 3.times { r.get? 
} + end + + # on each iteration we pushed 2+1 fibers to the global queue + g.size.should eq(12) + + # grab fibers back from the global queue + fiber = g.unsafe_grab?(r, divisor: 1) + fiber.should_not be_nil + r.get?.should_not be_nil + r.get?.should be_nil + end + end + + describe "#bulk_push" do + it "fills the local queue" do + q = Fiber::Queue.new + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + fibers.reverse_each { |f| r.get?.should be(f) } + g.empty?.should be_true + end + + it "pushes the overflow to the global queue" do + q = Fiber::Queue.new + fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + # filled the local queue + r.get?.should eq(fibers[6]) + r.get?.should eq(fibers[5]) + r.get?.should be(fibers[4]) + r.get?.should be(fibers[3]) + + # moved the rest to the global queue + g.pop?.should eq(fibers[2]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[0]) + end + end + + describe "#get?" do + # TODO: need specific tests (though we already use it in the above tests?) + end + + describe "#steal_from" do + it "steals from another runnables" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + fibers.each { |f| r1.push(f) } + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole half of the runnable fibers + fiber.should be(fibers[2]) + r2.get?.should be(fibers[0]) + r2.get?.should be(fibers[1]) + r2.get?.should be_nil + + # left the other half + r1.get?.should be(fibers[3]) + r1.get?.should be(fibers[4]) + r1.get?.should be(fibers[5]) + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals the last fiber" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + lone = Fiber.new(name: "lone") { } + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + r1.push(lone) + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole the fiber & local queue is still empty + fiber.should be(lone) + r2.get?.should be_nil + + # left nothing in original queue + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals nothing" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = ExecutionContext::Runnables(16).new(g) + r2 = ExecutionContext::Runnables(16).new(g) + + fiber = r2.steal_from(r1) + fiber.should be_nil + r2.get?.should be_nil + r1.get?.should be_nil + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "stress test" do + n = 7 + increments = 7919 + + # less fibers than space in runnables (so threads can starve) + # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) + fibers = Array(ExecutionContext::FiberCounter).new(54) do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + 
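+      # Thread::WaitGroup (patch 01) is a one-shot rendezvous: each worker
+      # calls #done and the coordinator blocks in #wait until the counter
+      # drops to zero. `ready` holds off the enqueues below until all N
+      # threads are spinning; `shutdown` joins them at the end.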
ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do + ExecutionContext::Runnables(16).new(global_queue) + end + + n.times do |i| + Thread.new(name: "RUN-#{i}") do |thread| + runnables = all_runnables[i] + slept = 0 + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + runnables.push(fiber) if fc.increment < increments + } + + ready.done + + loop do + # dequeue from local queue + if fiber = runnables.get? + execute.call(fiber) + slept = 0 + next + end + + # steal from another queue + while (r = all_runnables.sample) == runnables + end + if fiber = runnables.steal_from(r) + execute.call(fiber) + slept = 0 + next + end + + # dequeue from global queue + if fiber = global_queue.grab?(runnables, n) + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches + 0.step(to: fibers.size - 1, by: 9) do |i| + q = Fiber::Queue.new + 9.times { |j| q.push(fibers[i + j].@fiber) } + global_queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 2 == 1 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/execution_context/spec_helper.cr new file mode 100644 index 000000000000..9a1dbb881cee --- /dev/null +++ b/spec/std/execution_context/spec_helper.cr @@ -0,0 +1,21 @@ +require "../spec_helper" +require "crystal/system/thread_wait_group" +require "execution_context/runnables" +require "execution_context/global_queue" + +module ExecutionContext + class FiberCounter + def initialize(@fiber : Fiber) + @counter = Atomic(Int32).new(0) + end + + # fetch and add + def increment + @counter.add(1, :relaxed) + 1 + end + + def counter + @counter.get(:relaxed) + end + end +end diff --git a/src/execution_context/global_queue.cr b/src/execution_context/global_queue.cr new file mode 100644 index 000000000000..22535ab01ed6 --- /dev/null +++ b/src/execution_context/global_queue.cr @@ -0,0 +1,104 @@ +# The queue is a port of Go's `globrunq*` functions, distributed under a +# BSD-like license: +# + +require "../fiber/queue" +require "./runnables" + +module ExecutionContext + # Global queue of runnable fibers. + # Unbounded. + # Shared by all schedulers in an execution context. + # + # Basically a `Fiber::Queue` wrapped in a `Thread::Mutex`, at the exception of + # the `#grab?` method that tries to grab 1/Nth of the queue at once. + class GlobalQueue + def initialize(@mutex : Thread::Mutex) + @queue = Fiber::Queue.new + end + + # Grabs the lock and enqueues a runnable fiber on the global runnable queue. + def push(fiber : Fiber) : Nil + @mutex.synchronize { unsafe_push(fiber) } + end + + # Enqueues a runnable fiber on the global runnable queue. Assumes the lock + # is currently held. + def unsafe_push(fiber : Fiber) : Nil + @queue.push(fiber) + end + + # Grabs the lock and puts a runnable fiber on the global runnable queue. + def bulk_push(queue : Fiber::Queue*) : Nil + @mutex.synchronize { unsafe_bulk_push(queue) } + end + + # Puts a runnable fiber on the global runnable queue. Assumes the lock is + # currently held. 
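+    #
+    # An illustrative sketch (hypothetical `global_queue` instance; the
+    # caller must already hold the lock, e.g. inside `@mutex.synchronize`):
+    #
+    #     queue = Fiber::Queue.new
+    #     queue.push(Fiber.new { })
+    #     global_queue.unsafe_bulk_push(pointerof(queue))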
+ def unsafe_bulk_push(queue : Fiber::Queue*) : Nil + @queue.bulk_unshift(queue) + end + + # Grabs the lock and dequeues one runnable fiber from the global runnable + # queue. + def pop? : Fiber? + @mutex.synchronize { unsafe_pop? } + end + + # Dequeues one runnable fiber from the global runnable queue. Assumes the + # lock is currently held. + def unsafe_pop? : Fiber? + @queue.pop? + end + + # Grabs the lock then tries to grab a batch of fibers from the global + # runnable queue. Returns the next runnable fiber or `nil` if the queue was + # empty. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def grab?(runnables : Runnables, divisor : Int32) : Fiber? + @mutex.synchronize { unsafe_grab?(runnables, divisor) } + end + + # Try to grab a batch of fibers from the global runnable queue. Returns the + # next runnable fiber or `nil` if the queue was empty. Assumes the lock is + # currently held. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber? + # ported from Go: globrunqget + return if @queue.empty? + + divisor = 1 if divisor < 1 + size = @queue.size + + n = { + size, # can't grab more than available + size // divisor + 1, # divide + try to take at least 1 fiber + runnables.capacity // 2, # refill half the destination queue + }.min + + fiber = @queue.pop? + + # OPTIMIZE: q = @queue.split(n - 1) then `runnables.push(pointerof(q))` (?) + (n - 1).times do + break unless f = @queue.pop? + runnables.push(f) + end + + fiber + end + + @[AlwaysInline] + def empty? : Bool + @queue.empty? + end + + @[AlwaysInline] + def size : Int32 + @queue.size + end + end +end diff --git a/src/execution_context/runnables.cr b/src/execution_context/runnables.cr new file mode 100644 index 000000000000..6be2fda446c0 --- /dev/null +++ b/src/execution_context/runnables.cr @@ -0,0 +1,210 @@ +# The queue is a port of Go's `runq*` functions, distributed under a BSD-like +# license: +# +# The queue derivates from the chase-lev lock-free queue with adaptations: +# +# - single ring buffer (per scheduler); +# - on overflow: bulk push half the ring to `GlobalQueue`; +# - on empty: bulk grab up to half the ring from `GlobalQueue`; +# - bulk push operation; + +require "../fiber/queue" +require "./global_queue" + +module ExecutionContext + # :nodoc: + # + # Local queue or runnable fibers for schedulers. + # Bounded. + # First-in, first-out semantics (FIFO). + # Single producer, multiple consumers thread safety. + # + # Private to an execution context scheduler, except for stealing methods that + # can be called from any thread in the execution context. + class Runnables(N) + def initialize(@global_queue : GlobalQueue) + # head is an index to the buffer where the next fiber to dequeue is. + # + # tail is an index to the buffer where the next fiber to enqueue will be + # (on the next push). + # + # head is always behind tail (not empty) or equal (empty) but never after + # tail (the queue would have a negative size => bug). + @head = Atomic(UInt32).new(0) + @tail = Atomic(UInt32).new(0) + @buffer = uninitialized Fiber[N] + end + + @[AlwaysInline] + def capacity : Int32 + N + end + + # Tries to push fiber on the local runnable queue. If the run queue is full, + # pushes fiber on the global queue, which will grab the global lock. + # + # Executed only by the owner. 
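+    #
+    # Fast path: reserve the next ring-buffer slot (indices wrap with `% N`)
+    # and publish the fiber with a release store on `tail`. Slow path: when
+    # the ring is full, `push_slow` moves half of it, plus the pushed fiber,
+    # to the global queue in a single bulk push.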
+ def push(fiber : Fiber) : Nil + # ported from Go: runqput + loop do + head = @head.get(:acquire) # sync with consumers + tail = @tail.get(:relaxed) + + if (tail &- head) < N + # put fiber to local queue + @buffer.to_unsafe[tail % N] = fiber + + # make the fiber available for consumption + @tail.set(tail &+ 1, :release) + return + end + + if push_slow(fiber, head, tail) + return + end + + # failed to advance head (another scheduler stole fibers), + # the queue isn't full, now the push above must succeed + end + end + + private def push_slow(fiber : Fiber, head : UInt32, tail : UInt32) : Bool + # ported from Go: runqputslow + n = (tail &- head) // 2 + raise "BUG: queue is not full" if n != N // 2 + + # first, try to grab half of the fibers from local queue + batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile + n.times do |i| + batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N] + end + _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return false unless success + + # append fiber to the batch + batch.to_unsafe[n] = fiber + + # link the fibers + n.times do |i| + batch.to_unsafe[i].schedlink = batch.to_unsafe[i &+ 1] + end + queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) + + # now put the batch on global queue (grabs the global lock) + @global_queue.bulk_push(pointerof(queue)) + + true + end + + # Tries to enqueue all the fibers in `queue` into the local queue. If the + # queue is full, the overflow will be pushed to the global queue; in that + # case this will temporarily acquire the global queue lock. + # + # Executed only by the owner. + def bulk_push(queue : Fiber::Queue*) : Nil + # ported from Go: runqputbatch + head = @head.get(:acquire) # sync with other consumers + tail = @tail.get(:relaxed) + + while !queue.value.empty? && (tail &- head) < N + fiber = queue.value.pop + @buffer.to_unsafe[tail % N] = fiber + tail &+= 1 + end + + # make the fibers available for consumption + @tail.set(tail, :release) + + # put any overflow on global queue + @global_queue.bulk_push(queue) unless queue.value.empty? + end + + # Dequeues the next runnable fiber from the local queue. + # + # Executed only by the owner. + # TODO: rename as `#shift?` + def get? : Fiber? + # ported from Go: runqget + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:relaxed) + return if tail == head + + fiber = @buffer.to_unsafe[head % N] + head, success = @head.compare_and_set(head, head &+ 1, :acquire_release, :acquire) + return fiber if success + end + end + + # Steals half the fibers from the local queue of `src` and puts them onto + # the local queue. Returns one of the stolen fibers, or `nil` on failure. + # + # Only executed from the owner (when the local queue is empty). + def steal_from(src : Runnables) : Fiber? + # ported from Go: runqsteal + + tail = @tail.get(:relaxed) + n = src.grab(@buffer.to_unsafe, tail) + return if n == 0 + + # 'dequeue' last fiber from @buffer + n &-= 1 + fiber = @buffer.to_unsafe[(tail &+ n) % N] + return fiber if n == 0 + + head = @head.get(:acquire) # sync with consumers + if tail &- head &+ n >= N + raise "BUG: local queue overflow" + end + + # make the fibers available for consumption + @tail.set(tail &+ n, :release) + + fiber + end + + # Grabs a batch of fibers from local queue into `buffer` of size N (normally + # the ring buffer of another `Runnables`) starting at `buffer_head`. Returns + # number of grabbed fibers. 
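+    # The grab takes the larger half: after `n -= n // 2` the batch size is
+    # `ceil(n / 2)`, so even a queue holding a single fiber can be stolen
+    # from.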
+ # + # Can be executed by any scheduler. + protected def grab(buffer : Fiber*, buffer_head : UInt32) : UInt32 + # ported from Go: runqgrab + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:acquire) # sync with the producer + + n = tail &- head + n -= n // 2 + return 0_u32 if n == 0 # queue is empty + + if n > N // 2 + # read inconsistent head and tail + head = @head.get(:acquire) + next + end + + n.times do |i| + fiber = @buffer.to_unsafe[(head &+ i) % N] + buffer[(buffer_head &+ i) % N] = fiber + end + + # try to mark the fiber as consumed + head, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return n if success + end + end + + @[AlwaysInline] + def empty? : Bool + @head.get(:relaxed) == @tail.get(:relaxed) + end + + @[AlwaysInline] + def size : UInt32 + @tail.get(:relaxed) &- @head.get(:relaxed) + end + end +end From 4bffd310883d46bc8f0c4f080eea71d82f62a417 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 18:42:34 +0100 Subject: [PATCH 04/14] Move to Fiber::ExecutionContext --- .../execution_context/global_queue_spec.cr | 36 ++++++------- .../execution_context/runnables_spec.cr | 54 +++++++++---------- .../execution_context/spec_helper.cr | 8 +-- .../execution_context/global_queue.cr | 4 +- .../execution_context/runnables.cr | 4 +- 5 files changed, 53 insertions(+), 53 deletions(-) rename spec/std/{ => fiber}/execution_context/global_queue_spec.cr (79%) rename spec/std/{ => fiber}/execution_context/runnables_spec.cr (77%) rename spec/std/{ => fiber}/execution_context/spec_helper.cr (66%) rename src/{ => fiber}/execution_context/global_queue.cr (98%) rename src/{ => fiber}/execution_context/runnables.cr (99%) diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr similarity index 79% rename from spec/std/execution_context/global_queue_spec.cr rename to spec/std/fiber/execution_context/global_queue_spec.cr index 838a31406c01..7f3624aedaa2 100644 --- a/spec/std/execution_context/global_queue_spec.cr +++ b/spec/std/fiber/execution_context/global_queue_spec.cr @@ -1,8 +1,8 @@ require "./spec_helper" -describe ExecutionContext::GlobalQueue do +describe Fiber::ExecutionContext::GlobalQueue do it "#initialize" do - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) q.empty?.should be_true end @@ -11,7 +11,7 @@ describe ExecutionContext::GlobalQueue do f2 = Fiber.new(name: "f2") { } f3 = Fiber.new(name: "f3") { } - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) q.unsafe_push(f1) q.size.should eq(1) @@ -31,17 +31,17 @@ describe ExecutionContext::GlobalQueue do describe "#unsafe_grab?" 
do it "can't grab from empty queue" do - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - runnables = ExecutionContext::Runnables(6).new(q) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = Fiber::ExecutionContext::Runnables(6).new(q) q.unsafe_grab?(runnables, 4).should be_nil end it "grabs fibers" do - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a fibers.each { |f| q.unsafe_push(f) } - runnables = ExecutionContext::Runnables(6).new(q) + runnables = Fiber::ExecutionContext::Runnables(6).new(q) fiber = q.unsafe_grab?(runnables, 4) # returned the last enqueued fiber @@ -60,11 +60,11 @@ describe ExecutionContext::GlobalQueue do it "can't grab more than available" do f = Fiber.new { } - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) q.unsafe_push(f) # dequeues the unique fiber - runnables = ExecutionContext::Runnables(6).new(q) + runnables = Fiber::ExecutionContext::Runnables(6).new(q) fiber = q.unsafe_grab?(runnables, 4) fiber.should be(f) @@ -74,11 +74,11 @@ describe ExecutionContext::GlobalQueue do it "clamps divisor to 1" do f = Fiber.new { } - q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) q.unsafe_push(f) # dequeues the unique fiber - runnables = ExecutionContext::Runnables(6).new(q) + runnables = Fiber::ExecutionContext::Runnables(6).new(q) fiber = q.unsafe_grab?(runnables, 0) fiber.should be(f) @@ -90,13 +90,13 @@ describe ExecutionContext::GlobalQueue do # interpreter doesn't support threads yet (#14287) pending_interpreted describe: "thread safety" do it "one by one" do - fibers = StaticArray(ExecutionContext::FiberCounter, 763).new do |i| - ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i| + Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) end n = 7 increments = 15 - queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) shutdown = Thread::WaitGroup.new(n) @@ -140,11 +140,11 @@ describe ExecutionContext::GlobalQueue do n = 7 increments = 15 - fibers = StaticArray(ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 - ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) end - queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) shutdown = Thread::WaitGroup.new(n) @@ -152,7 +152,7 @@ describe ExecutionContext::GlobalQueue do Thread.new(name: "BULK-#{i}") do |thread| slept = 0 - r = ExecutionContext::Runnables(3).new(queue) + r = Fiber::ExecutionContext::Runnables(3).new(queue) batch = Fiber::Queue.new size = 0 diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr similarity index 77% rename from spec/std/execution_context/runnables_spec.cr rename to spec/std/fiber/execution_context/runnables_spec.cr index 6fa342675402..f53b76bafe45 100644 --- 
a/spec/std/execution_context/runnables_spec.cr +++ b/spec/std/fiber/execution_context/runnables_spec.cr @@ -1,9 +1,9 @@ require "./spec_helper" -describe ExecutionContext::Runnables do +describe Fiber::ExecutionContext::Runnables do it "#initialize" do - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(16).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(16).new(g) r.capacity.should eq(16) end @@ -12,8 +12,8 @@ describe ExecutionContext::Runnables do fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a # local enqueue - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(4).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) fibers.each { |f| r.push(f) } # local dequeue @@ -28,8 +28,8 @@ describe ExecutionContext::Runnables do fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a # local enqueue + overflow - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(4).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) fibers.each { |f| r.push(f) } # kept half of local queue @@ -43,8 +43,8 @@ describe ExecutionContext::Runnables do end it "can always push up to capacity" do - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(4).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) 4.times do # local @@ -77,8 +77,8 @@ describe ExecutionContext::Runnables do fibers.each { |f| q.push(f) } # local enqueue - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(4).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) r.bulk_push(pointerof(q)) fibers.reverse_each { |f| r.get?.should be(f) } @@ -91,8 +91,8 @@ describe ExecutionContext::Runnables do fibers.each { |f| q.push(f) } # local enqueue + overflow - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r = ExecutionContext::Runnables(4).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) r.bulk_push(pointerof(q)) # filled the local queue @@ -114,15 +114,15 @@ describe ExecutionContext::Runnables do describe "#steal_from" do it "steals from another runnables" do - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a # fill the source queue - r1 = ExecutionContext::Runnables(16).new(g) + r1 = Fiber::ExecutionContext::Runnables(16).new(g) fibers.each { |f| r1.push(f) } # steal from source queue - r2 = ExecutionContext::Runnables(16).new(g) + r2 = Fiber::ExecutionContext::Runnables(16).new(g) fiber = r2.steal_from(r1) # stole half of the runnable fibers @@ -142,15 +142,15 @@ describe ExecutionContext::Runnables do end it "steals the last fiber" do - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) lone = Fiber.new(name: "lone") { } # fill the source queue - r1 = ExecutionContext::Runnables(16).new(g) + r1 = Fiber::ExecutionContext::Runnables(16).new(g) r1.push(lone) # steal from source queue - r2 = 
ExecutionContext::Runnables(16).new(g) + r2 = Fiber::ExecutionContext::Runnables(16).new(g) fiber = r2.steal_from(r1) # stole the fiber & local queue is still empty @@ -165,9 +165,9 @@ describe ExecutionContext::Runnables do end it "steals nothing" do - g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - r1 = ExecutionContext::Runnables(16).new(g) - r2 = ExecutionContext::Runnables(16).new(g) + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = Fiber::ExecutionContext::Runnables(16).new(g) + r2 = Fiber::ExecutionContext::Runnables(16).new(g) fiber = r2.steal_from(r1) fiber.should be_nil @@ -184,16 +184,16 @@ describe ExecutionContext::Runnables do # less fibers than space in runnables (so threads can starve) # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) - fibers = Array(ExecutionContext::FiberCounter).new(54) do |i| - ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + fibers = Array(Fiber::ExecutionContext::FiberCounter).new(54) do |i| + Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) end - global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) shutdown = Thread::WaitGroup.new(n) - all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do - ExecutionContext::Runnables(16).new(global_queue) + all_runnables = Array(Fiber::ExecutionContext::Runnables(16)).new(n) do + Fiber::ExecutionContext::Runnables(16).new(global_queue) end n.times do |i| diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/fiber/execution_context/spec_helper.cr similarity index 66% rename from spec/std/execution_context/spec_helper.cr rename to spec/std/fiber/execution_context/spec_helper.cr index 9a1dbb881cee..0eca123b37bc 100644 --- a/spec/std/execution_context/spec_helper.cr +++ b/spec/std/fiber/execution_context/spec_helper.cr @@ -1,9 +1,9 @@ -require "../spec_helper" +require "../../spec_helper" require "crystal/system/thread_wait_group" -require "execution_context/runnables" -require "execution_context/global_queue" +require "fiber/execution_context/runnables" +require "fiber/execution_context/global_queue" -module ExecutionContext +module Fiber::ExecutionContext class FiberCounter def initialize(@fiber : Fiber) @counter = Atomic(Int32).new(0) diff --git a/src/execution_context/global_queue.cr b/src/fiber/execution_context/global_queue.cr similarity index 98% rename from src/execution_context/global_queue.cr rename to src/fiber/execution_context/global_queue.cr index 22535ab01ed6..28e0d5dc8f75 100644 --- a/src/execution_context/global_queue.cr +++ b/src/fiber/execution_context/global_queue.cr @@ -2,10 +2,10 @@ # BSD-like license: # -require "../fiber/queue" +require "../queue" require "./runnables" -module ExecutionContext +module Fiber::ExecutionContext # Global queue of runnable fibers. # Unbounded. # Shared by all schedulers in an execution context. 
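A worked example of the `#unsafe_grab?` batch-size formula from patch 03,
using the numbers of the "grabs fibers" spec above (10 queued fibers,
`divisor: 4`, a `Runnables(6)` destination):

    size = 10
    n = {size, size // 4 + 1, 6 // 2}.min  # => {10, 3, 3}.min => 3

One fiber is returned to the caller and the remaining two are pushed onto
the runnables; the other seven stay in the global queue, which is exactly
what that spec asserts.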
diff --git a/src/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr similarity index 99% rename from src/execution_context/runnables.cr rename to src/fiber/execution_context/runnables.cr index 6be2fda446c0..fcdb5181306d 100644 --- a/src/execution_context/runnables.cr +++ b/src/fiber/execution_context/runnables.cr @@ -8,10 +8,10 @@ # - on empty: bulk grab up to half the ring from `GlobalQueue`; # - bulk push operation; -require "../fiber/queue" +require "../queue" require "./global_queue" -module ExecutionContext +module Fiber::ExecutionContext # :nodoc: # # Local queue or runnable fibers for schedulers. From 7c67d2751a818d50761ce255d4b0c179c60b3119 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 20 Jan 2025 18:45:04 +0100 Subject: [PATCH 05/14] Fix: mark GlobalQueue as :nodoc: --- src/fiber/execution_context/global_queue.cr | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/fiber/execution_context/global_queue.cr b/src/fiber/execution_context/global_queue.cr index 28e0d5dc8f75..89997537ecb7 100644 --- a/src/fiber/execution_context/global_queue.cr +++ b/src/fiber/execution_context/global_queue.cr @@ -6,6 +6,8 @@ require "../queue" require "./runnables" module Fiber::ExecutionContext + # :nodoc: + # # Global queue of runnable fibers. # Unbounded. # Shared by all schedulers in an execution context. From c829ecf979851f730b484a29c52296190e85a5aa Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 3 Feb 2025 10:43:34 +0100 Subject: [PATCH 06/14] Renamed Fiber#schedlink as #queue_next --- spec/std/fiber/queue_spec.cr | 58 ++++++++++++------------ src/fiber.cr | 2 +- src/fiber/execution_context/runnables.cr | 2 +- src/fiber/queue.cr | 12 ++--- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/spec/std/fiber/queue_spec.cr b/spec/std/fiber/queue_spec.cr index 39c67bc52bee..b3e59a20085e 100644 --- a/spec/std/fiber/queue_spec.cr +++ b/spec/std/fiber/queue_spec.cr @@ -14,8 +14,8 @@ describe Fiber::Queue do it "creates a filled queue" do f1 = Fiber.new(name: "f1") { } f2 = Fiber.new(name: "f2") { } - f1.schedlink = f2 - f2.schedlink = nil + f1.queue_next = f2 + f2.queue_next = nil q = Fiber::Queue.new(f2, f1, size: 2) q.@head.should be(f2) @@ -33,31 +33,31 @@ describe Fiber::Queue do f3 = Fiber.new(name: "f3") { } # simulate fibers previously added to other queues - f1.schedlink = f3 - f2.schedlink = f1 + f1.queue_next = f3 + f2.queue_next = f1 # push first fiber q.push(f1) q.@head.should be(f1) q.@tail.should be(f1) - f1.schedlink.should be_nil + f1.queue_next.should be_nil q.size.should eq(1) # push second fiber q.push(f2) q.@head.should be(f2) q.@tail.should be(f1) - f2.schedlink.should be(f1) - f1.schedlink.should be_nil + f2.queue_next.should be(f1) + f1.queue_next.should be_nil q.size.should eq(2) # push third fiber q.push(f3) q.@head.should be(f3) q.@tail.should be(f1) - f3.schedlink.should be(f2) - f2.schedlink.should be(f1) - f1.schedlink.should be_nil + f3.queue_next.should be(f2) + f2.queue_next.should be(f1) + f1.queue_next.should be_nil q.size.should eq(3) end end @@ -68,9 +68,9 @@ describe Fiber::Queue do f1 = Fiber.new(name: "f1") { } f2 = Fiber.new(name: "f2") { } f3 = Fiber.new(name: "f3") { } - f3.schedlink = f2 - f2.schedlink = f1 - f1.schedlink = nil + f3.queue_next = f2 + f2.queue_next = f1 + f1.queue_next = nil q1 = Fiber::Queue.new(f3, f1, size: 3) # push in bulk @@ -89,14 +89,14 @@ describe Fiber::Queue do f5 = Fiber.new(name: "f5") { } # source queue - f3.schedlink = f2 - f2.schedlink = f1 - f1.schedlink = nil + 
f3.queue_next = f2 + f2.queue_next = f1 + f1.queue_next = nil q1 = Fiber::Queue.new(f3, f1, size: 3) # destination queue - f5.schedlink = f4 - f4.schedlink = nil + f5.queue_next = f4 + f4.queue_next = nil q2 = Fiber::Queue.new(f5, f4, size: 2) # push in bulk @@ -105,11 +105,11 @@ describe Fiber::Queue do q2.@tail.should be(f1) q2.size.should eq(5) - f5.schedlink.should be(f4) - f4.schedlink.should be(f3) - f3.schedlink.should be(f2) - f2.schedlink.should be(f1) - f1.schedlink.should be(nil) + f5.queue_next.should be(f4) + f4.queue_next.should be(f3) + f3.queue_next.should be(f2) + f2.queue_next.should be(f1) + f1.queue_next.should be(nil) end end @@ -118,9 +118,9 @@ describe Fiber::Queue do f1 = Fiber.new(name: "f1") { } f2 = Fiber.new(name: "f2") { } f3 = Fiber.new(name: "f3") { } - f3.schedlink = f2 - f2.schedlink = f1 - f1.schedlink = nil + f3.queue_next = f2 + f2.queue_next = f1 + f1.queue_next = nil q = Fiber::Queue.new(f3, f1, size: 3) # removes third element @@ -152,9 +152,9 @@ describe Fiber::Queue do f1 = Fiber.new(name: "f1") { } f2 = Fiber.new(name: "f2") { } f3 = Fiber.new(name: "f3") { } - f3.schedlink = f2 - f2.schedlink = f1 - f1.schedlink = nil + f3.queue_next = f2 + f2.queue_next = f1 + f1.queue_next = nil q = Fiber::Queue.new(f3, f1, size: 3) # removes third element diff --git a/src/fiber.cr b/src/fiber.cr index 2697b9b6283e..1e35a4c38fac 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -67,7 +67,7 @@ class Fiber property previous : Fiber? # :nodoc: - property schedlink : Fiber? + property queue_next : Fiber? # :nodoc: def self.inactive(fiber : Fiber) diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr index fcdb5181306d..0fc00109be74 100644 --- a/src/fiber/execution_context/runnables.cr +++ b/src/fiber/execution_context/runnables.cr @@ -86,7 +86,7 @@ module Fiber::ExecutionContext # link the fibers n.times do |i| - batch.to_unsafe[i].schedlink = batch.to_unsafe[i &+ 1] + batch.to_unsafe[i].queue_next = batch.to_unsafe[i &+ 1] end queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) diff --git a/src/fiber/queue.cr b/src/fiber/queue.cr index 9695ac8b6a83..d76fab2fe054 100644 --- a/src/fiber/queue.cr +++ b/src/fiber/queue.cr @@ -20,7 +20,7 @@ class Fiber end def push(fiber : Fiber) : Nil - fiber.schedlink = @head + fiber.queue_next = @head @head = fiber @tail = fiber if @tail.nil? @size += 1 @@ -28,10 +28,10 @@ class Fiber def bulk_unshift(queue : Queue*) : Nil return unless last = queue.value.@tail - last.schedlink = nil + last.queue_next = nil if tail = @tail - tail.schedlink = queue.value.@head + tail.queue_next = queue.value.@head else @head = queue.value.@head end @@ -52,10 +52,10 @@ class Fiber private def pop(&) if fiber = @head - @head = fiber.schedlink + @head = fiber.queue_next @tail = nil if @head.nil? 
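         # (that was the last fiber: @head is nil again, so clear @tail too)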
@size -= 1 - fiber.schedlink = nil + fiber.queue_next = nil fiber else yield @@ -76,7 +76,7 @@ class Fiber cursor = @head while cursor yield cursor - cursor = cursor.schedlink + cursor = cursor.queue_next end end end From 9fc55503f880ab62756599ede16e0b6fa4443383 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 3 Feb 2025 10:44:21 +0100 Subject: [PATCH 07/14] Apply suggestion from @straight-shoota --- src/fiber/queue.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fiber/queue.cr b/src/fiber/queue.cr index d76fab2fe054..e5e66b73eab1 100644 --- a/src/fiber/queue.cr +++ b/src/fiber/queue.cr @@ -35,7 +35,7 @@ class Fiber else @head = queue.value.@head end - @tail = queue.value.@tail + @tail = last @size += queue.value.size end From a0cb6b3afb2195ffeb7c9ba9d6b7b73c05241826 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 3 Feb 2025 10:53:30 +0100 Subject: [PATCH 08/14] Fix: restrict N in --- src/fiber/execution_context/runnables.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr index 0fc00109be74..b50be401fc8c 100644 --- a/src/fiber/execution_context/runnables.cr +++ b/src/fiber/execution_context/runnables.cr @@ -141,7 +141,7 @@ module Fiber::ExecutionContext # the local queue. Returns one of the stolen fibers, or `nil` on failure. # # Only executed from the owner (when the local queue is empty). - def steal_from(src : Runnables) : Fiber? + def steal_from(src : Runnables(N)) : Fiber? # ported from Go: runqsteal tail = @tail.get(:relaxed) From f86d3b09a0ffa6074feb4af620431237b47d0cf9 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 3 Feb 2025 11:28:44 +0100 Subject: [PATCH 09/14] Rename Fiber::Queue as Fiber::List --- .../execution_context/global_queue_spec.cr | 8 +- .../fiber/execution_context/runnables_spec.cr | 18 +- spec/std/fiber/list_spec.cr | 183 ++++++++++++++++++ spec/std/fiber/queue_spec.cr | 183 ------------------ src/fiber.cr | 2 +- src/fiber/execution_context/global_queue.cr | 32 +-- src/fiber/execution_context/runnables.cr | 22 +-- src/fiber/{queue.cr => list.cr} | 40 ++-- 8 files changed, 247 insertions(+), 241 deletions(-) create mode 100644 spec/std/fiber/list_spec.cr delete mode 100644 spec/std/fiber/queue_spec.cr rename src/fiber/{queue.cr => list.cr} (50%) diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr index 7f3624aedaa2..09777c46f393 100644 --- a/spec/std/fiber/execution_context/global_queue_spec.cr +++ b/spec/std/fiber/execution_context/global_queue_spec.cr @@ -154,7 +154,7 @@ describe Fiber::ExecutionContext::GlobalQueue do r = Fiber::ExecutionContext::Runnables(3).new(queue) - batch = Fiber::Queue.new + batch = Fiber::List.new size = 0 reenqueue = -> { @@ -210,9 +210,9 @@ describe Fiber::ExecutionContext::GlobalQueue do # enqueue in batches of 5 0.step(to: fibers.size - 1, by: 5) do |i| - q = Fiber::Queue.new - 5.times { |j| q.push(fibers[i + j].@fiber) } - queue.bulk_push(pointerof(q)) + list = Fiber::List.new + 5.times { |j| list.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(list)) Thread.sleep(10.nanoseconds) if i % 4 == 3 end diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr index f53b76bafe45..ffbb43e90e00 100644 --- a/spec/std/fiber/execution_context/runnables_spec.cr +++ b/spec/std/fiber/execution_context/runnables_spec.cr @@ -72,28 +72,28 @@ describe 
Fiber::ExecutionContext::Runnables do describe "#bulk_push" do it "fills the local queue" do - q = Fiber::Queue.new + l = Fiber::List.new fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a - fibers.each { |f| q.push(f) } + fibers.each { |f| l.push(f) } # local enqueue g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) r = Fiber::ExecutionContext::Runnables(4).new(g) - r.bulk_push(pointerof(q)) + r.bulk_push(pointerof(l)) fibers.reverse_each { |f| r.get?.should be(f) } g.empty?.should be_true end it "pushes the overflow to the global queue" do - q = Fiber::Queue.new + l = Fiber::List.new fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a - fibers.each { |f| q.push(f) } + fibers.each { |f| l.push(f) } # local enqueue + overflow g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) r = Fiber::ExecutionContext::Runnables(4).new(g) - r.bulk_push(pointerof(q)) + r.bulk_push(pointerof(l)) # filled the local queue r.get?.should eq(fibers[6]) @@ -249,9 +249,9 @@ describe Fiber::ExecutionContext::Runnables do # enqueue in batches 0.step(to: fibers.size - 1, by: 9) do |i| - q = Fiber::Queue.new - 9.times { |j| q.push(fibers[i + j].@fiber) } - global_queue.bulk_push(pointerof(q)) + list = Fiber::List.new + 9.times { |j| list.push(fibers[i + j].@fiber) } + global_queue.bulk_push(pointerof(list)) Thread.sleep(10.nanoseconds) if i % 2 == 1 end diff --git a/spec/std/fiber/list_spec.cr b/spec/std/fiber/list_spec.cr new file mode 100644 index 000000000000..fb27b865fa57 --- /dev/null +++ b/spec/std/fiber/list_spec.cr @@ -0,0 +1,183 @@ +require "../spec_helper" +require "fiber/list" + +describe Fiber::List do + describe "#initialize" do + it "creates an empty queue" do + list = Fiber::List.new + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + list.empty?.should be_true + end + + it "creates a filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f1.list_next = f2 + f2.list_next = nil + + list = Fiber::List.new(f2, f1, size: 2) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + list.empty?.should be_false + end + end + + describe "#push" do + it "to head" do + list = Fiber::List.new + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + # simulate fibers previously added to other queues + f1.list_next = f3 + f2.list_next = f1 + + # push first fiber + list.push(f1) + list.@head.should be(f1) + list.@tail.should be(f1) + f1.list_next.should be_nil + list.size.should eq(1) + + # push second fiber + list.push(f2) + list.@head.should be(f2) + list.@tail.should be(f1) + f2.list_next.should be(f1) + f1.list_next.should be_nil + list.size.should eq(2) + + # push third fiber + list.push(f3) + list.@head.should be(f3) + list.@tail.should be(f1) + f3.list_next.should be(f2) + f2.list_next.should be(f1) + f1.list_next.should be_nil + list.size.should eq(3) + end + end + + describe "#bulk_unshift" do + it "to empty queue" do + # manually create a queue + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + q1 = Fiber::List.new(f3, f1, size: 3) + + # push in bulk + q2 = Fiber::List.new(nil, nil, size: 0) + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f3) + q2.@tail.should be(f1) + q2.size.should eq(3) + end + + it "to filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") 
{ } + f4 = Fiber.new(name: "f4") { } + f5 = Fiber.new(name: "f5") { } + + # source queue + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + q1 = Fiber::List.new(f3, f1, size: 3) + + # destination queue + f5.list_next = f4 + f4.list_next = nil + q2 = Fiber::List.new(f5, f4, size: 2) + + # push in bulk + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f5) + q2.@tail.should be(f1) + q2.size.should eq(5) + + f5.list_next.should be(f4) + f4.list_next.should be(f3) + f3.list_next.should be(f2) + f2.list_next.should be(f1) + f1.list_next.should be(nil) + end + end + + describe "#pop" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + list = Fiber::List.new(f3, f1, size: 3) + + # removes third element + list.pop.should be(f3) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + + # removes second element + list.pop.should be(f2) + list.@head.should be(f1) + list.@tail.should be(f1) + list.size.should eq(1) + + # removes first element + list.pop.should be(f1) + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + + # empty queue + expect_raises(IndexError) { list.pop } + list.size.should eq(0) + end + end + + describe "#pop?" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + list = Fiber::List.new(f3, f1, size: 3) + + # removes third element + list.pop?.should be(f3) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + + # removes second element + list.pop?.should be(f2) + list.@head.should be(f1) + list.@tail.should be(f1) + list.size.should eq(1) + + # removes first element + list.pop?.should be(f1) + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + + # empty queue + list.pop?.should be_nil + list.size.should eq(0) + end + end +end diff --git a/spec/std/fiber/queue_spec.cr b/spec/std/fiber/queue_spec.cr deleted file mode 100644 index b3e59a20085e..000000000000 --- a/spec/std/fiber/queue_spec.cr +++ /dev/null @@ -1,183 +0,0 @@ -require "../spec_helper" -require "fiber/queue" - -describe Fiber::Queue do - describe "#initialize" do - it "creates an empty queue" do - q = Fiber::Queue.new - q.@head.should be_nil - q.@tail.should be_nil - q.size.should eq(0) - q.empty?.should be_true - end - - it "creates a filled queue" do - f1 = Fiber.new(name: "f1") { } - f2 = Fiber.new(name: "f2") { } - f1.queue_next = f2 - f2.queue_next = nil - - q = Fiber::Queue.new(f2, f1, size: 2) - q.@head.should be(f2) - q.@tail.should be(f1) - q.size.should eq(2) - q.empty?.should be_false - end - end - - describe "#push" do - it "to head" do - q = Fiber::Queue.new - f1 = Fiber.new(name: "f1") { } - f2 = Fiber.new(name: "f2") { } - f3 = Fiber.new(name: "f3") { } - - # simulate fibers previously added to other queues - f1.queue_next = f3 - f2.queue_next = f1 - - # push first fiber - q.push(f1) - q.@head.should be(f1) - q.@tail.should be(f1) - f1.queue_next.should be_nil - q.size.should eq(1) - - # push second fiber - q.push(f2) - q.@head.should be(f2) - q.@tail.should be(f1) - f2.queue_next.should be(f1) - f1.queue_next.should be_nil - q.size.should eq(2) - - # push third fiber - q.push(f3) - q.@head.should be(f3) - q.@tail.should be(f1) - f3.queue_next.should be(f2) - f2.queue_next.should be(f1) - f1.queue_next.should be_nil - 
-      q.size.should eq(3)
-    end
-  end
-
-  describe "#bulk_unshift" do
-    it "to empty queue" do
-      # manually create a queue
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
-      f3.queue_next = f2
-      f2.queue_next = f1
-      f1.queue_next = nil
-      q1 = Fiber::Queue.new(f3, f1, size: 3)
-
-      # push in bulk
-      q2 = Fiber::Queue.new(nil, nil, size: 0)
-      q2.bulk_unshift(pointerof(q1))
-      q2.@head.should be(f3)
-      q2.@tail.should be(f1)
-      q2.size.should eq(3)
-    end
-
-    it "to filled queue" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
-      f4 = Fiber.new(name: "f4") { }
-      f5 = Fiber.new(name: "f5") { }
-
-      # source queue
-      f3.queue_next = f2
-      f2.queue_next = f1
-      f1.queue_next = nil
-      q1 = Fiber::Queue.new(f3, f1, size: 3)
-
-      # destination queue
-      f5.queue_next = f4
-      f4.queue_next = nil
-      q2 = Fiber::Queue.new(f5, f4, size: 2)
-
-      # push in bulk
-      q2.bulk_unshift(pointerof(q1))
-      q2.@head.should be(f5)
-      q2.@tail.should be(f1)
-      q2.size.should eq(5)
-
-      f5.queue_next.should be(f4)
-      f4.queue_next.should be(f3)
-      f3.queue_next.should be(f2)
-      f2.queue_next.should be(f1)
-      f1.queue_next.should be(nil)
-    end
-  end
-
-  describe "#pop" do
-    it "from head" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
-      f3.queue_next = f2
-      f2.queue_next = f1
-      f1.queue_next = nil
-      q = Fiber::Queue.new(f3, f1, size: 3)
-
-      # removes third element
-      q.pop.should be(f3)
-      q.@head.should be(f2)
-      q.@tail.should be(f1)
-      q.size.should eq(2)
-
-      # removes second element
-      q.pop.should be(f2)
-      q.@head.should be(f1)
-      q.@tail.should be(f1)
-      q.size.should eq(1)
-
-      # removes first element
-      q.pop.should be(f1)
-      q.@head.should be_nil
-      q.@tail.should be_nil
-      q.size.should eq(0)
-
-      # empty queue
-      expect_raises(IndexError) { q.pop }
-      q.size.should eq(0)
-    end
-  end
-
-  describe "#pop?" do
-    it "from head" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
-      f3.queue_next = f2
-      f2.queue_next = f1
-      f1.queue_next = nil
-      q = Fiber::Queue.new(f3, f1, size: 3)
-
-      # removes third element
-      q.pop?.should be(f3)
-      q.@head.should be(f2)
-      q.@tail.should be(f1)
-      q.size.should eq(2)
-
-      # removes second element
-      q.pop?.should be(f2)
-      q.@head.should be(f1)
-      q.@tail.should be(f1)
-      q.size.should eq(1)
-
-      # removes first element
-      q.pop?.should be(f1)
-      q.@head.should be_nil
-      q.@tail.should be_nil
-      q.size.should eq(0)
-
-      # empty queue
-      q.pop?.should be_nil
-      q.size.should eq(0)
-    end
-  end
-end
diff --git a/src/fiber.cr b/src/fiber.cr
index 1e35a4c38fac..55d32e66283e 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -67,7 +67,7 @@ class Fiber
   property previous : Fiber?
 
   # :nodoc:
-  property queue_next : Fiber?
+  property list_next : Fiber?
 
   # :nodoc:
   def self.inactive(fiber : Fiber)
diff --git a/src/fiber/execution_context/global_queue.cr b/src/fiber/execution_context/global_queue.cr
index 89997537ecb7..9462e2a26dad 100644
--- a/src/fiber/execution_context/global_queue.cr
+++ b/src/fiber/execution_context/global_queue.cr
@@ -2,7 +2,7 @@
 # BSD-like license:
 #
 
-require "../queue"
+require "../list"
 require "./runnables"
 
 module Fiber::ExecutionContext
@@ -12,11 +12,11 @@ module Fiber::ExecutionContext
   # Unbounded.
   # Shared by all schedulers in an execution context.
   #
-  # Basically a `Fiber::Queue` wrapped in a `Thread::Mutex`, at the exception of
+  # Basically a `Fiber::List` protected by a `Thread::Mutex`, except for
   # the `#grab?` method that tries to grab 1/Nth of the queue at once.
   class GlobalQueue
     def initialize(@mutex : Thread::Mutex)
-      @queue = Fiber::Queue.new
+      @list = Fiber::List.new
     end
 
     # Grabs the lock and enqueues a runnable fiber on the global runnable queue.
@@ -27,18 +27,18 @@ module Fiber::ExecutionContext
     # Enqueues a runnable fiber on the global runnable queue. Assumes the lock
     # is currently held.
     def unsafe_push(fiber : Fiber) : Nil
-      @queue.push(fiber)
+      @list.push(fiber)
     end
 
     # Grabs the lock and puts a runnable fiber on the global runnable queue.
-    def bulk_push(queue : Fiber::Queue*) : Nil
-      @mutex.synchronize { unsafe_bulk_push(queue) }
+    def bulk_push(list : Fiber::List*) : Nil
+      @mutex.synchronize { unsafe_bulk_push(list) }
     end
 
     # Puts a runnable fiber on the global runnable queue. Assumes the lock is
     # currently held.
-    def unsafe_bulk_push(queue : Fiber::Queue*) : Nil
-      @queue.bulk_unshift(queue)
+    def unsafe_bulk_push(list : Fiber::List*) : Nil
+      @list.bulk_unshift(list)
     end
 
     # Grabs the lock and dequeues one runnable fiber from the global runnable
@@ -50,7 +50,7 @@ module Fiber::ExecutionContext
     # Dequeues one runnable fiber from the global runnable queue. Assumes the
     # lock is currently held.
     def unsafe_pop? : Fiber?
-      @queue.pop?
+      @list.pop?
     end
 
     # Grabs the lock then tries to grab a batch of fibers from the global
@@ -71,10 +71,10 @@ module Fiber::ExecutionContext
     # execution context; it should be the number of threads.
     def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber?
       # ported from Go: globrunqget
-      return if @queue.empty?
+      return if @list.empty?
 
       divisor = 1 if divisor < 1
-      size = @queue.size
+      size = @list.size
 
       n = {
         size,                    # can't grab more than available
@@ -82,11 +82,11 @@ module Fiber::ExecutionContext
         runnables.capacity // 2, # refill half the destination queue
       }.min
 
-      fiber = @queue.pop?
+      fiber = @list.pop?
 
-      # OPTIMIZE: q = @queue.split(n - 1) then `runnables.push(pointerof(q))` (?)
+      # OPTIMIZE: list = @list.split(n - 1) then `runnables.push(pointerof(list))` (?)
       (n - 1).times do
-        break unless f = @queue.pop?
+        break unless f = @list.pop?
         runnables.push(f)
       end
 
@@ -95,12 +95,12 @@ module Fiber::ExecutionContext
 
     @[AlwaysInline]
     def empty? : Bool
-      @queue.empty?
+      @list.empty?
     end
 
     @[AlwaysInline]
     def size : Int32
-      @queue.size
+      @list.size
     end
   end
 end
diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr
index b50be401fc8c..23965cce5b67 100644
--- a/src/fiber/execution_context/runnables.cr
+++ b/src/fiber/execution_context/runnables.cr
@@ -8,7 +8,7 @@
 # - on empty: bulk grab up to half the ring from `GlobalQueue`;
 # - bulk push operation;
 
-require "../queue"
+require "../list"
 require "./global_queue"
 
 module Fiber::ExecutionContext
@@ -86,28 +86,28 @@ module Fiber::ExecutionContext
 
       # link the fibers
       n.times do |i|
-        batch.to_unsafe[i].queue_next = batch.to_unsafe[i &+ 1]
+        batch.to_unsafe[i].list_next = batch.to_unsafe[i &+ 1]
       end
-      queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32)
+      list = Fiber::List.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32)
 
       # now put the batch on global queue (grabs the global lock)
-      @global_queue.bulk_push(pointerof(queue))
+      @global_queue.bulk_push(pointerof(list))
 
       true
     end
 
-    # Tries to enqueue all the fibers in `queue` into the local queue. If the
-    # queue is full, the overflow will be pushed to the global queue; in that
-    # case this will temporarily acquire the global queue lock.
+    # Tries to enqueue all the fibers in *list* into the local queue. If the
+    # local queue is full, the overflow will be pushed to the global queue; in
+    # that case this will temporarily acquire the global queue lock.
     #
     # Executed only by the owner.
-    def bulk_push(queue : Fiber::Queue*) : Nil
+    def bulk_push(list : Fiber::List*) : Nil
       # ported from Go: runqputbatch
       head = @head.get(:acquire) # sync with other consumers
       tail = @tail.get(:relaxed)
 
-      while !queue.value.empty? && (tail &- head) < N
-        fiber = queue.value.pop
+      while !list.value.empty? && (tail &- head) < N
+        fiber = list.value.pop
         @buffer.to_unsafe[tail % N] = fiber
         tail &+= 1
       end
@@ -116,7 +116,7 @@ module Fiber::ExecutionContext
       @tail.set(tail, :release)
 
       # put any overflow on global queue
-      @global_queue.bulk_push(queue) unless queue.value.empty?
+      @global_queue.bulk_push(list) unless list.value.empty?
     end
 
     # Dequeues the next runnable fiber from the local queue.
diff --git a/src/fiber/queue.cr b/src/fiber/list.cr
similarity index 50%
rename from src/fiber/queue.cr
rename to src/fiber/list.cr
index e5e66b73eab1..4918d0c0b92f 100644
--- a/src/fiber/queue.cr
+++ b/src/fiber/list.cr
@@ -1,4 +1,4 @@
-# The queue is modeled after Go's `gQueue`, distributed under a BSD-like
+# The list is modeled after Go's `gQueue`, distributed under a BSD-like
 # license:
 
 class Fiber
@@ -6,45 +6,51 @@ class Fiber
   #
   # Singly-linked list of `Fiber`.
   # Last-in, first-out (LIFO) semantic.
-  # A fiber can only exist within a single `Queue` at any time.
+  # A fiber can only exist within a single `List` at any time.
   #
-  # Unlike `Crystal::PointerLinkedList` doubly-linked list, this `Queue` is
-  # meant to maintain a queue of runnable fibers, or to quickly collect an
-  # arbitrary number of fibers.
+  # This list is simpler than `Crystal::PointerLinkedList`, which is a doubly
+  # linked list. It's meant to maintain a queue of runnable fibers, or to
+  # quickly collect an arbitrary number of fibers; situations where we don't
+  # need arbitrary deletions from anywhere in the list.
   #
   # Thread unsafe! An external lock is required for concurrent accesses.
-  struct Queue
+  struct List
     getter size : Int32
 
     def initialize(@head : Fiber? = nil, @tail : Fiber? = nil, @size = 0)
     end
 
+    # Pushes *fiber* onto the head of the list.
     def push(fiber : Fiber) : Nil
-      fiber.queue_next = @head
+      fiber.list_next = @head
       @head = fiber
       @tail = fiber if @tail.nil?
       @size += 1
     end
 
-    def bulk_unshift(queue : Queue*) : Nil
-      return unless last = queue.value.@tail
-      last.queue_next = nil
+    # Appends all the fibers from *other* to the tail of the list.
+    def bulk_unshift(other : List*) : Nil
+      return unless last = other.value.@tail
+      last.list_next = nil
 
       if tail = @tail
-        tail.queue_next = queue.value.@head
+        tail.list_next = other.value.@head
       else
-        @head = queue.value.@head
+        @head = other.value.@head
       end
 
       @tail = last
-      @size += queue.value.size
+      @size += other.value.size
     end
 
+    # Removes a fiber from the head of the list. Raises `IndexError` when
+    # empty.
     @[AlwaysInline]
     def pop : Fiber
       pop { raise IndexError.new }
     end
 
+    # Removes a fiber from the head of the list. Returns `nil` when empty.
    @[AlwaysInline]
     def pop? : Fiber?
       pop { nil }
@@ -52,10 +58,10 @@ class Fiber
 
     private def pop(&)
       if fiber = @head
-        @head = fiber.queue_next
+        @head = fiber.list_next
         @tail = nil if @head.nil?
         @size -= 1
-        fiber.queue_next = nil
+        fiber.list_next = nil
         fiber
       else
         yield
@@ -67,7 +73,7 @@ class Fiber
       @head == nil
     end
 
-    def clear
+    def clear : Nil
       @size = 0
       @head = @tail = nil
     end
@@ -76,7 +82,7 @@ class Fiber
       cursor = @head
       while cursor
         yield cursor
-        cursor = cursor.queue_next
+        cursor = cursor.list_next
       end
     end
   end
 end

From 0e7fa5355a15ac2c31ff50358fa147e3d40c363c Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 3 Feb 2025 11:29:08 +0100
Subject: [PATCH 10/14] Rename Runnables#get? as #shift?

---
 .../execution_context/global_queue_spec.cr    |  6 +--
 .../fiber/execution_context/runnables_spec.cr | 52 +++++++++----------
 src/fiber/execution_context/runnables.cr      |  3 +-
 3 files changed, 30 insertions(+), 31 deletions(-)

diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr
index 09777c46f393..42a1482ce239 100644
--- a/spec/std/fiber/execution_context/global_queue_spec.cr
+++ b/spec/std/fiber/execution_context/global_queue_spec.cr
@@ -49,8 +49,8 @@ describe Fiber::ExecutionContext::GlobalQueue do
 
       # enqueued the next 2 fibers
       runnables.size.should eq(2)
-      runnables.get?.should be(fibers[8])
-      runnables.get?.should be(fibers[7])
+      runnables.shift?.should be(fibers[8])
+      runnables.shift?.should be(fibers[7])
 
       # the remaining fibers are still there:
       6.downto(0).each do |i|
@@ -179,7 +179,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
           ready.done
 
           loop do
-            if fiber = r.get?
+            if fiber = r.shift?
               execute.call(fiber)
               slept = 0
               next
diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr
index ffbb43e90e00..0fbdfcbb3e34 100644
--- a/spec/std/fiber/execution_context/runnables_spec.cr
+++ b/spec/std/fiber/execution_context/runnables_spec.cr
@@ -17,8 +17,8 @@ describe Fiber::ExecutionContext::Runnables do
       fibers.each { |f| r.push(f) }
 
       # local dequeue
-      fibers.each { |f| r.get?.should be(f) }
-      r.get?.should be_nil
+      fibers.each { |f| r.shift?.should be(f) }
+      r.shift?.should be_nil
 
       # didn't push to global queue
       g.pop?.should be_nil
@@ -33,8 +33,8 @@ describe Fiber::ExecutionContext::Runnables do
       fibers.each { |f| r.push(f) }
 
       # kept half of local queue
-      r.get?.should be(fibers[2])
-      r.get?.should be(fibers[3])
+      r.shift?.should be(fibers[2])
+      r.shift?.should be(fibers[3])
 
       # moved half of local queue + last push to global queue
       g.pop?.should eq(fibers[0])
@@ -49,14 +49,14 @@ describe Fiber::ExecutionContext::Runnables do
       4.times do
         # local
         4.times { r.push(Fiber.new { }) }
-        2.times { r.get? }
+        2.times { r.shift? }
         2.times { r.push(Fiber.new { }) }
 
         # overflow (2+1 fibers are sent to global queue + 1 local)
         2.times { r.push(Fiber.new { }) }
 
         # clear
-        3.times { r.get? }
+        3.times { r.shift? }
       end
 
       # on each iteration we pushed 2+1 fibers to the global queue
@@ -65,8 +65,8 @@ describe Fiber::ExecutionContext::Runnables do
       # grab fibers back from the global queue
       fiber = g.unsafe_grab?(r, divisor: 1)
       fiber.should_not be_nil
-      r.get?.should_not be_nil
-      r.get?.should be_nil
+      r.shift?.should_not be_nil
+      r.shift?.should be_nil
     end
   end
 
@@ -81,7 +81,7 @@ describe Fiber::ExecutionContext::Runnables do
       r = Fiber::ExecutionContext::Runnables(4).new(g)
       r.bulk_push(pointerof(l))
 
-      fibers.reverse_each { |f| r.get?.should be(f) }
+      fibers.reverse_each { |f| r.shift?.should be(f) }
       g.empty?.should be_true
     end
 
@@ -96,10 +96,10 @@ describe Fiber::ExecutionContext::Runnables do
       r.bulk_push(pointerof(l))
 
       # filled the local queue
-      r.get?.should eq(fibers[6])
-      r.get?.should eq(fibers[5])
-      r.get?.should be(fibers[4])
-      r.get?.should be(fibers[3])
+      r.shift?.should eq(fibers[6])
+      r.shift?.should eq(fibers[5])
+      r.shift?.should be(fibers[4])
+      r.shift?.should be(fibers[3])
 
       # moved the rest to the global queue
       g.pop?.should eq(fibers[2])
@@ -108,7 +108,7 @@ describe Fiber::ExecutionContext::Runnables do
     end
   end
 
-  describe "#get?" do
+  describe "#shift?" do
     # TODO: need specific tests (though we already use it in the above tests?)
   end
 
@@ -127,15 +127,15 @@ describe Fiber::ExecutionContext::Runnables do
 
       # stole half of the runnable fibers
       fiber.should be(fibers[2])
-      r2.get?.should be(fibers[0])
-      r2.get?.should be(fibers[1])
-      r2.get?.should be_nil
+      r2.shift?.should be(fibers[0])
+      r2.shift?.should be(fibers[1])
+      r2.shift?.should be_nil
 
       # left the other half
-      r1.get?.should be(fibers[3])
-      r1.get?.should be(fibers[4])
-      r1.get?.should be(fibers[5])
-      r1.get?.should be_nil
+      r1.shift?.should be(fibers[3])
+      r1.shift?.should be(fibers[4])
+      r1.shift?.should be(fibers[5])
+      r1.shift?.should be_nil
 
       # global queue is left untouched
       g.empty?.should be_true
@@ -155,10 +155,10 @@ describe Fiber::ExecutionContext::Runnables do
 
       # stole the fiber & local queue is still empty
       fiber.should be(lone)
-      r2.get?.should be_nil
+      r2.shift?.should be_nil
 
       # left nothing in original queue
-      r1.get?.should be_nil
+      r1.shift?.should be_nil
 
       # global queue is left untouched
       g.empty?.should be_true
@@ -171,8 +171,8 @@ describe Fiber::ExecutionContext::Runnables do
       fiber = r2.steal_from(r1)
       fiber.should be_nil
 
-      r2.get?.should be_nil
-      r1.get?.should be_nil
+      r2.shift?.should be_nil
+      r1.shift?.should be_nil
     end
   end
 
@@ -210,7 +210,7 @@ describe Fiber::ExecutionContext::Runnables do
 
         loop do
           # dequeue from local queue
-          if fiber = runnables.get?
+          if fiber = runnables.shift?
             execute.call(fiber)
             slept = 0
             next
diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr
index 23965cce5b67..7162b1a659fe 100644
--- a/src/fiber/execution_context/runnables.cr
+++ b/src/fiber/execution_context/runnables.cr
@@ -122,8 +122,7 @@ module Fiber::ExecutionContext
     # Dequeues the next runnable fiber from the local queue.
     #
     # Executed only by the owner.
-    # TODO: rename as `#shift?`
-    def get? : Fiber?
+    def shift? : Fiber?
       # ported from Go: runqget
 
       head = @head.get(:acquire) # sync with other consumers

From 84c3b363c0f5f51be78812d70ee05debac4bb843 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 3 Feb 2025 11:33:20 +0100
Subject: [PATCH 11/14] Runnables: normalize thread safety comment

---
 src/fiber/execution_context/runnables.cr | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr
index 7162b1a659fe..64488966ec5f 100644
--- a/src/fiber/execution_context/runnables.cr
+++ b/src/fiber/execution_context/runnables.cr
@@ -139,7 +139,7 @@ module Fiber::ExecutionContext
     # Steals half the fibers from the local queue of `src` and puts them onto
     # the local queue. Returns one of the stolen fibers, or `nil` on failure.
     #
-    # Only executed from the owner (when the local queue is empty).
+    # Executed only by the owner (when the local queue is empty).
     def steal_from(src : Runnables(N)) : Fiber?
       # ported from Go: runqsteal

From 6af826875fe15440ad175aa93343e890dff7b9f1 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 10 Feb 2025 17:49:52 +0100
Subject: [PATCH 12/14] Move shadow space reservation to x86_64 `makecontext`
 (#15434)

---
 src/fiber.cr                          | 18 +++---------------
 src/fiber/context/x86_64-microsoft.cr |  5 +++--
 2 files changed, 6 insertions(+), 17 deletions(-)

diff --git a/src/fiber.cr b/src/fiber.cr
index 55d32e66283e..6892fa322ab9 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -102,21 +102,9 @@ class Fiber
 
     fiber_main = ->(f : Fiber) { f.run }
 
-    # FIXME: This line shouldn't be necessary (#7975)
-    stack_ptr = nil
-    {% if flag?(:win32) %}
-      # align stack bottom to 16 bytes
-      @stack_bottom = Pointer(Void).new(@stack_bottom.address & ~0x0f_u64)
-
-      # It's the caller's responsibility to allocate 32 bytes of "shadow space" on the stack right
-      # before calling the function (regardless of the actual number of parameters used)
-
-      stack_ptr = @stack_bottom - sizeof(Void*) * 6
-    {% else %}
-      # point to first addressable pointer on the stack (@stack_bottom points past
-      # the stack because the stack grows down):
-      stack_ptr = @stack_bottom - sizeof(Void*)
-    {% end %}
+    # point to first addressable pointer on the stack (@stack_bottom points past
+    # the stack because the stack grows down):
+    stack_ptr = @stack_bottom - sizeof(Void*)
 
     # align the stack pointer to 16 bytes:
     stack_ptr = Pointer(Void*).new(stack_ptr.address & ~0x0f_u64)
diff --git a/src/fiber/context/x86_64-microsoft.cr b/src/fiber/context/x86_64-microsoft.cr
index 08576fc348aa..a1b9fa281074 100644
--- a/src/fiber/context/x86_64-microsoft.cr
+++ b/src/fiber/context/x86_64-microsoft.cr
@@ -6,14 +6,15 @@ class Fiber
     # A great explanation on stack contexts for win32:
     # https://web.archive.org/web/20220527113808/https://cfsamson.gitbook.io/green-threads-explained-in-200-lines-of-rust/supporting-windows
 
-    # 8 registers + 3 qwords for NT_TIB + 1 parameter + 10 128bit XMM registers
-    @context.stack_top = (stack_ptr - (12 + 10*2)).as(Void*)
+    # 4 shadow space + (8 registers + 3 qwords for NT_TIB + 1 parameter) + 10 128bit XMM registers
+    @context.stack_top = (stack_ptr - (4 + 12 + 10*2)).as(Void*)
     @context.resumable = 1
 
     # actual stack top, not including guard pages and reserved pages
     LibC.GetNativeSystemInfo(out system_info)
     stack_top = @stack_bottom - system_info.dwPageSize
 
+    stack_ptr -= 4 # shadow space (or home space) before return address
     stack_ptr[0] = fiber_main.pointer # %rbx: Initial `resume` will `ret` to this address
     stack_ptr[-1] = self.as(Void*)    # %rcx: puts `self` as first argument for `fiber_main`

From 44d16d300ddc7dedc3a879453006a22d8fea72c6 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 10 Feb 2025 17:51:42 +0100
Subject: [PATCH 13/14] Initialize Fiber with an explicit stack (#15409)

---
 src/compiler/crystal/interpreter/context.cr |  4 +--
 src/crystal/scheduler.cr                    |  2 +-
 src/crystal/system/unix/signal.cr           |  4 +--
 src/fiber.cr                                | 36 +++++++++++----------
 src/fiber/context/aarch64-microsoft.cr      |  8 ++---
 src/fiber/context/x86_64-microsoft.cr       |  8 ++---
 src/fiber/stack.cr                          | 17 ++++++++++
 src/fiber/stack_pool.cr                     | 19 +++++------
 src/gc/boehm.cr                             |  2 +-
 9 files changed, 60 insertions(+), 40 deletions(-)
 create mode 100644 src/fiber/stack.cr

diff --git a/src/compiler/crystal/interpreter/context.cr b/src/compiler/crystal/interpreter/context.cr
index 987781c4aefb..06a67c456370 100644
--- a/src/compiler/crystal/interpreter/context.cr
+++ b/src/compiler/crystal/interpreter/context.cr
@@ -106,10 +106,10 @@ class Crystal::Repl::Context
   # Once the block returns, the stack is returned to the pool.
   # The stack is not cleared after or before it's used.
   def checkout_stack(& : UInt8* -> _)
-    stack, _ = @stack_pool.checkout
+    stack = @stack_pool.checkout
 
     begin
-      yield stack.as(UInt8*)
+      yield stack.pointer.as(UInt8*)
     ensure
       @stack_pool.release(stack)
     end
diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr
index efee6b3c06f1..51494fa2944b 100644
--- a/src/crystal/scheduler.cr
+++ b/src/crystal/scheduler.cr
@@ -124,7 +124,7 @@ class Crystal::Scheduler
     {% elsif flag?(:interpreted) %}
       # No need to change the stack bottom!
     {% else %}
-      GC.set_stackbottom(fiber.@stack_bottom)
+      GC.set_stackbottom(fiber.@stack.bottom)
     {% end %}
 
     current, @thread.current_fiber = @thread.current_fiber, fiber
diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr
index 12804ea00267..6c992478db5f 100644
--- a/src/crystal/system/unix/signal.cr
+++ b/src/crystal/system/unix/signal.cr
@@ -183,8 +183,8 @@ module Crystal::System::Signal
     is_stack_overflow =
       begin
-        stack_top = Pointer(Void).new(::Fiber.current.@stack.address - 4096)
-        stack_bottom = ::Fiber.current.@stack_bottom
+        stack_top = ::Fiber.current.@stack.pointer - 4096
+        stack_bottom = ::Fiber.current.@stack.bottom
         stack_top <= addr < stack_bottom
       rescue e
         Crystal::System.print_error "Error while trying to determine if a stack overflow has occurred. Probable memory corruption\n"
diff --git a/src/fiber.cr b/src/fiber.cr
index 6892fa322ab9..3aff3c025647 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -1,5 +1,6 @@
 require "crystal/system/thread_linked_list"
 require "./fiber/context"
+require "./fiber/stack"
 
 # :nodoc:
 @[NoInline]
@@ -47,12 +48,11 @@ class Fiber
   protected class_getter(fibers) { Thread::LinkedList(Fiber).new }
 
   @context : Context
-  @stack : Void*
+  @stack : Stack
   @resume_event : Crystal::EventLoop::Event?
   @timeout_event : Crystal::EventLoop::Event?
   # :nodoc:
   property timeout_select_action : Channel::TimeoutAction?
-  protected property stack_bottom : Void*
 
   # The name of the fiber, used as internal reference.
   property name : String?
@@ -91,31 +90,30 @@ class Fiber
   # When the fiber is executed, it runs *proc* in its context.
   #
   # *name* is an optional and used only as an internal reference.
-  def initialize(@name : String? = nil, &@proc : ->)
-    @context = Context.new
-    @stack, @stack_bottom =
+  def self.new(name : String? = nil, &proc : ->)
+    stack =
       {% if flag?(:interpreted) %}
-        {Pointer(Void).null, Pointer(Void).null}
+        # the interpreter is managing the stacks
+        Stack.new(Pointer(Void).null, Pointer(Void).null)
       {% else %}
        Crystal::Scheduler.stack_pool.checkout
       {% end %}
+    new(name, stack, &proc)
+  end
 
-    fiber_main = ->(f : Fiber) { f.run }
-
-    # point to first addressable pointer on the stack (@stack_bottom points past
-    # the stack because the stack grows down):
-    stack_ptr = @stack_bottom - sizeof(Void*)
-
-    # align the stack pointer to 16 bytes:
-    stack_ptr = Pointer(Void*).new(stack_ptr.address & ~0x0f_u64)
+  # :nodoc:
+  def initialize(@name : String?, @stack : Stack, &@proc : ->)
+    @context = Context.new
+    fiber_main = ->(f : Fiber) { f.run }
+    stack_ptr = @stack.first_addressable_pointer
     makecontext(stack_ptr, fiber_main)
 
     Fiber.fibers.push(self)
   end
 
   # :nodoc:
-  def initialize(@stack : Void*, thread)
+  def initialize(stack : Void*, thread)
     @proc = Proc(Void).new { }
     # TODO: should creating a new context for the main fiber also be platform specific?
@@ -127,7 +126,10 @@ class Fiber
       {% else %}
         Context.new(_fiber_get_stack_top)
       {% end %}
-    thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom
+
+    thread.gc_thread_handler, stack_bottom = GC.current_thread_stack_bottom
+    @stack = Stack.new(stack, stack_bottom)
+
     @name = "main"
     {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %}
     Fiber.fibers.push(self)
@@ -317,7 +319,7 @@ class Fiber
   # :nodoc:
   def push_gc_roots : Nil
     # Push the used section of the stack
-    GC.push_stack @context.stack_top, @stack_bottom
+    GC.push_stack @context.stack_top, @stack.bottom
   end
 
   {% if flag?(:preview_mt) %}
diff --git a/src/fiber/context/aarch64-microsoft.cr b/src/fiber/context/aarch64-microsoft.cr
index b2fa76580418..b9e86dfbc6cf 100644
--- a/src/fiber/context/aarch64-microsoft.cr
+++ b/src/fiber/context/aarch64-microsoft.cr
@@ -13,16 +13,16 @@ class Fiber
 
     # actual stack top, not including guard pages and reserved pages
     LibC.GetNativeSystemInfo(out system_info)
-    stack_top = @stack_bottom - system_info.dwPageSize
+    stack_top = @stack.bottom - system_info.dwPageSize
 
     stack_ptr[-4] = self.as(Void*)       # x0 (r0): puts `self` as first argument for `fiber_main`
     stack_ptr[-16] = fiber_main.pointer  # x30 (lr): initial `resume` will `ret` to this address
 
     # The following three values are stored in the Thread Information Block (NT_TIB)
     # and are used by Windows to track the current stack limits
-    stack_ptr[-3] = @stack        # [x18, #0x1478]: Win32 DeallocationStack
-    stack_ptr[-2] = stack_top     # [x18, #16]: Stack Limit
-    stack_ptr[-1] = @stack_bottom # [x18, #8]: Stack Base
+    stack_ptr[-3] = @stack.pointer # [x18, #0x1478]: Win32 DeallocationStack
+    stack_ptr[-2] = stack_top      # [x18, #16]: Stack Limit
+    stack_ptr[-1] = @stack.bottom  # [x18, #8]: Stack Base
   end
 
   # :nodoc:
diff --git a/src/fiber/context/x86_64-microsoft.cr b/src/fiber/context/x86_64-microsoft.cr
index a1b9fa281074..3a405e24d1f9 100644
--- a/src/fiber/context/x86_64-microsoft.cr
+++ b/src/fiber/context/x86_64-microsoft.cr
@@ -12,7 +12,7 @@ class Fiber
 
     # actual stack top, not including guard pages and reserved pages
     LibC.GetNativeSystemInfo(out system_info)
-    stack_top = @stack_bottom - system_info.dwPageSize
+    stack_top = @stack.bottom - system_info.dwPageSize
 
     stack_ptr -= 4 # shadow space (or home space) before return address
     stack_ptr[0] = fiber_main.pointer # %rbx: Initial `resume` will `ret` to this address
     stack_ptr[-1] = self.as(Void*)    # %rcx: puts `self` as first argument for `fiber_main`
 
     # The following three values are stored in the Thread Information Block (NT_TIB)
     # and are used by Windows to track the current stack limits
-    stack_ptr[-2] = @stack        # %gs:0x1478: Win32 DeallocationStack
-    stack_ptr[-3] = stack_top     # %gs:0x10: Stack Limit
-    stack_ptr[-4] = @stack_bottom # %gs:0x08: Stack Base
+    stack_ptr[-2] = @stack.pointer # %gs:0x1478: Win32 DeallocationStack
+    stack_ptr[-3] = stack_top      # %gs:0x10: Stack Limit
+    stack_ptr[-4] = @stack.bottom  # %gs:0x08: Stack Base
   end
 
   # :nodoc:
diff --git a/src/fiber/stack.cr b/src/fiber/stack.cr
new file mode 100644
index 000000000000..9666b506db0c
--- /dev/null
+++ b/src/fiber/stack.cr
@@ -0,0 +1,17 @@
+class Fiber
+  # :nodoc:
+  struct Stack
+    getter pointer : Void*
+    getter bottom : Void*
+    getter? reusable : Bool
+
+    def initialize(@pointer, @bottom, *, @reusable = false)
+    end
+
+    def first_addressable_pointer : Void**
+      ptr = @bottom                             # stacks grow down
+      ptr -= sizeof(Void*)                      # point to first addressable pointer
+      Pointer(Void*).new(ptr.address & ~15_u64) # align to 16 bytes
+    end
+  end
+end
diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr
index 8f809335f46c..04954de40a94 100644
--- a/src/fiber/stack_pool.cr
+++ b/src/fiber/stack_pool.cr
@@ -12,12 +12,12 @@ class Fiber
     # Interpreter stacks grow upwards (pushing values increases the stack
     # pointer value) rather than downwards, so *protect* must be false.
     def initialize(@protect : Bool = true)
-      @deque = Deque(Void*).new
+      @deque = Deque(Stack).new
     end
 
     def finalize
       @deque.each do |stack|
-        Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
+        Crystal::System::Fiber.free_stack(stack.pointer, STACK_SIZE)
       end
     end
 
@@ -26,7 +26,7 @@ class Fiber
     def collect(count = lazy_size // 2) : Nil
       count.times do
        if stack = @deque.shift?
-          Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
+          Crystal::System::Fiber.free_stack(stack.pointer, STACK_SIZE)
         else
           return
         end
@@ -41,18 +41,19 @@ class Fiber
     end
 
     # Removes a stack from the bottom of the pool, or allocates a new one.
-    def checkout : {Void*, Void*}
+    def checkout : Stack
       if stack = @deque.pop?
-        Crystal::System::Fiber.reset_stack(stack, STACK_SIZE, @protect)
+        Crystal::System::Fiber.reset_stack(stack.pointer, STACK_SIZE, @protect)
+        stack
       else
-        stack = Crystal::System::Fiber.allocate_stack(STACK_SIZE, @protect)
+        pointer = Crystal::System::Fiber.allocate_stack(STACK_SIZE, @protect)
+        Stack.new(pointer, pointer + STACK_SIZE, reusable: true)
       end
-      {stack, stack + STACK_SIZE}
     end
 
     # Appends a stack to the bottom of the pool.
-    def release(stack) : Nil
-      @deque.push(stack)
+    def release(stack : Stack) : Nil
+      @deque.push(stack) if stack.reusable?
     end
 
     # Returns the approximated size of the pool. It may be equal or slightly
diff --git a/src/gc/boehm.cr b/src/gc/boehm.cr
index 327b3d50409f..3a7a63d68153 100644
--- a/src/gc/boehm.cr
+++ b/src/gc/boehm.cr
@@ -223,7 +223,7 @@ module GC
   {% if flag?(:preview_mt) %}
     Thread.unsafe_each do |thread|
       if fiber = thread.current_fiber?
-        GC.set_stackbottom(thread.gc_thread_handler, fiber.@stack_bottom)
+        GC.set_stackbottom(thread.gc_thread_handler, fiber.@stack.bottom)
       end
     end
   {% end %}

From edcb13f74af0adde997e64d3355c631d09e3f4f0 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Tue, 4 Feb 2025 18:30:21 +0100
Subject: [PATCH 14/14] Fix: use fake fiber stacks in queue specs (consumes
 less virtual memory)

---
 .../execution_context/global_queue_spec.cr    | 20 +++++-----
 .../fiber/execution_context/runnables_spec.cr | 22 +++++------
 .../fiber/execution_context/spec_helper.cr    |  1 +
 spec/std/fiber/list_spec.cr                   | 39 ++++++++++---------
 spec/support/fibers.cr                        | 10 +++++
 5 files changed, 52 insertions(+), 40 deletions(-)

diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr
index 42a1482ce239..17b746c7dc86 100644
--- a/spec/std/fiber/execution_context/global_queue_spec.cr
+++ b/spec/std/fiber/execution_context/global_queue_spec.cr
@@ -7,9 +7,9 @@ describe Fiber::ExecutionContext::GlobalQueue do
   end
 
   it "#unsafe_push and #unsafe_pop" do
-    f1 = Fiber.new(name: "f1") { }
-    f2 = Fiber.new(name: "f2") { }
-    f3 = Fiber.new(name: "f3") { }
+    f1 = new_fake_fiber("f1")
+    f2 = new_fake_fiber("f2")
+    f3 = new_fake_fiber("f3")
 
     q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
     q.unsafe_push(f1)
@@ -38,7 +38,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
     it "grabs fibers" do
       q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
-      fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 10.times.map { |i| new_fake_fiber("f#{i}") }.to_a
       fibers.each { |f| q.unsafe_push(f) }
 
       runnables = Fiber::ExecutionContext::Runnables(6).new(q)
@@ -59,7 +59,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
     end
 
     it "can't grab more than available" do
-      f = Fiber.new { }
+      f = new_fake_fiber
       q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
       q.unsafe_push(f)
 
@@ -73,7 +73,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
     end
 
     it "clamps divisor to 1" do
-      f = Fiber.new { }
+      f = new_fake_fiber
       q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
       q.unsafe_push(f)
 
@@ -91,7 +91,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
   pending_interpreted describe: "thread safety" do
     it "one by one" do
       fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i|
-        Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
+        Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
       end
 
       n = 7
@@ -101,7 +101,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
       shutdown = Thread::WaitGroup.new(n)
 
       n.times do |i|
-        Thread.new(name: "ONE-#{i}") do |thread|
+        Thread.new("ONE-#{i}") do |thread|
           slept = 0
           ready.done
 
@@ -141,7 +141,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
       increments = 15
 
       fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
-        Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
+        Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
       end
 
       queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
@@ -149,7 +149,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
       shutdown = Thread::WaitGroup.new(n)
 
       n.times do |i|
-        Thread.new(name: "BULK-#{i}") do |thread|
+        Thread.new("BULK-#{i}") do |thread|
          slept = 0
 
           r = Fiber::ExecutionContext::Runnables(3).new(queue)
diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr
index 0fbdfcbb3e34..4c4a227e374f 100644
--- a/spec/std/fiber/execution_context/runnables_spec.cr
+++ b/spec/std/fiber/execution_context/runnables_spec.cr
@@ -9,7 +9,7 @@ describe Fiber::ExecutionContext::Runnables do
 
   describe "#push" do
     it "enqueues the fiber in local queue" do
-      fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a
 
       # local enqueue
       g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
@@ -25,7 +25,7 @@ describe Fiber::ExecutionContext::Runnables do
     end
 
     it "moves half the local queue to the global queue on overflow" do
-      fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 5.times.map { |i| new_fake_fiber("f#{i}") }.to_a
 
       # local enqueue + overflow
       g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
@@ -48,12 +48,12 @@ describe Fiber::ExecutionContext::Runnables do
 
       4.times do
         # local
-        4.times { r.push(Fiber.new { }) }
+        4.times { r.push(new_fake_fiber) }
         2.times { r.shift? }
-        2.times { r.push(Fiber.new { }) }
+        2.times { r.push(new_fake_fiber) }
 
         # overflow (2+1 fibers are sent to global queue + 1 local)
-        2.times { r.push(Fiber.new { }) }
+        2.times { r.push(new_fake_fiber) }
 
         # clear
         3.times { r.shift? }
@@ -73,7 +73,7 @@ describe Fiber::ExecutionContext::Runnables do
   describe "#bulk_push" do
     it "fills the local queue" do
       l = Fiber::List.new
-      fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a
       fibers.each { |f| l.push(f) }
 
       # local enqueue
@@ -87,7 +87,7 @@ describe Fiber::ExecutionContext::Runnables do
 
     it "pushes the overflow to the global queue" do
       l = Fiber::List.new
-      fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 7.times.map { |i| new_fake_fiber("f#{i}") }.to_a
       fibers.each { |f| l.push(f) }
 
       # local enqueue + overflow
@@ -115,7 +115,7 @@ describe Fiber::ExecutionContext::Runnables do
   describe "#steal_from" do
     it "steals from another runnables" do
       g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
-      fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
+      fibers = 6.times.map { |i| new_fake_fiber("f#{i}") }.to_a
 
       # fill the source queue
       r1 = Fiber::ExecutionContext::Runnables(16).new(g)
@@ -143,7 +143,7 @@ describe Fiber::ExecutionContext::Runnables do
 
     it "steals the last fiber" do
       g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
-      lone = Fiber.new(name: "lone") { }
+      lone = new_fake_fiber("lone")
 
       # fill the source queue
       r1 = Fiber::ExecutionContext::Runnables(16).new(g)
@@ -185,7 +185,7 @@ describe Fiber::ExecutionContext::Runnables do
       # less fibers than space in runnables (so threads can starve)
       # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below)
       fibers = Array(Fiber::ExecutionContext::FiberCounter).new(54) do |i|
-        Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
+        Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
       end
 
       global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
@@ -197,7 +197,7 @@ describe Fiber::ExecutionContext::Runnables do
       end
 
       n.times do |i|
-        Thread.new(name: "RUN-#{i}") do |thread|
+        Thread.new("RUN-#{i}") do |thread|
           runnables = all_runnables[i]
           slept = 0
diff --git a/spec/std/fiber/execution_context/spec_helper.cr b/spec/std/fiber/execution_context/spec_helper.cr
index 0eca123b37bc..465005bbe5b9 100644
--- a/spec/std/fiber/execution_context/spec_helper.cr
+++ b/spec/std/fiber/execution_context/spec_helper.cr
@@ -1,4 +1,5 @@
 require "../../spec_helper"
+require "../../../support/fibers"
 require "crystal/system/thread_wait_group"
 require "fiber/execution_context/runnables"
 require "fiber/execution_context/global_queue"
diff --git a/spec/std/fiber/list_spec.cr b/spec/std/fiber/list_spec.cr
index fb27b865fa57..e46a5117ae46 100644
--- a/spec/std/fiber/list_spec.cr
+++ b/spec/std/fiber/list_spec.cr
@@ -1,4 +1,5 @@
 require "../spec_helper"
+require "../../support/fibers"
 require "fiber/list"
 
 describe Fiber::List do
@@ -12,8 +13,8 @@ describe Fiber::List do
     end
 
     it "creates a filled queue" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
       f1.list_next = f2
       f2.list_next = nil
 
@@ -28,9 +29,9 @@ describe Fiber::List do
   describe "#push" do
     it "to head" do
       list = Fiber::List.new
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
+      f3 = new_fake_fiber("f3")
 
       # simulate fibers previously added to other queues
       f1.list_next = f3
@@ -65,9 +66,9 @@ describe Fiber::List do
   describe "#bulk_unshift" do
     it "to empty queue" do
       # manually create a queue
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
+      f3 = new_fake_fiber("f3")
       f3.list_next = f2
       f2.list_next = f1
       f1.list_next = nil
@@ -82,11 +83,11 @@ describe Fiber::List do
     end
 
     it "to filled queue" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
-      f4 = Fiber.new(name: "f4") { }
-      f5 = Fiber.new(name: "f5") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
+      f3 = new_fake_fiber("f3")
+      f4 = new_fake_fiber("f4")
+      f5 = new_fake_fiber("f5")
 
       # source queue
       f3.list_next = f2
@@ -116,9 +117,9 @@ describe Fiber::List do
 
   describe "#pop" do
     it "from head" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
+      f3 = new_fake_fiber("f3")
       f3.list_next = f2
       f2.list_next = f1
       f1.list_next = nil
@@ -150,9 +151,9 @@ describe Fiber::List do
 
   describe "#pop?" do
     it "from head" do
-      f1 = Fiber.new(name: "f1") { }
-      f2 = Fiber.new(name: "f2") { }
-      f3 = Fiber.new(name: "f3") { }
+      f1 = new_fake_fiber("f1")
+      f2 = new_fake_fiber("f2")
+      f3 = new_fake_fiber("f3")
       f3.list_next = f2
       f2.list_next = f1
       f1.list_next = nil
diff --git a/spec/support/fibers.cr b/spec/support/fibers.cr
index 1b095a4422d6..424987677803 100644
--- a/spec/support/fibers.cr
+++ b/spec/support/fibers.cr
@@ -14,3 +14,13 @@ def wait_until_finished(f : Fiber, timeout = 5.seconds)
     raise "Fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout
   end
 end
+
+# Fake stack for `makecontext` to have somewhere to write in #initialize. We
+# don't actually run the fiber. The worst case is Windows with ~300 bytes (with
+# shadow space and alignment taken into account). We allocate more to be safe.
+FAKE_FIBER_STACK = GC.malloc(512)
+
+def new_fake_fiber(name = nil)
+  stack = Fiber::Stack.new(FAKE_FIBER_STACK, FAKE_FIBER_STACK + 512)
+  Fiber.new(name, stack) { }
+end