diff --git a/lib/async/pool/controller.rb b/lib/async/pool/controller.rb index 47f75d5..0ced584 100644 --- a/lib/async/pool/controller.rb +++ b/lib/async/pool/controller.rb @@ -154,20 +154,18 @@ def release(resource) retire(resource) unless processed end + private def drain + # Enumerate all existing resources and retire them: + while resource = acquire_existing_resource + retire(resource) + end + end + # Close all resources in the pool. def close - @available.clear - - while pair = @resources.shift - resource, usage = pair - - if usage > 0 - Console.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"} - end - - resource.close - end + self.drain + @available.clear @gardener&.stop end @@ -224,6 +222,8 @@ def retire(resource) def start_gardener return if @gardener + @gardener = true + Async(transient: true, annotation: "#{self.class} Gardener") do |task| @gardener = task @@ -319,7 +319,7 @@ def available_resource resource = nil @guard.acquire do - resource = get_resource + resource = acquire_or_create_resource end return resource @@ -330,7 +330,24 @@ def available_resource private - def get_resource + # Acquire an existing resource with zero usage. + # If there are resources that are in use, wait until they are released. + def acquire_existing_resource + while @resources.any? + @resources.each do |resource, usage| + if usage == 0 + return resource + end + end + + @notification.wait + end + + # Only when the pool has been completely drained, return nil: + return nil + end + + def acquire_or_create_resource while resource = @available.last if usage = @resources[resource] and usage < resource.concurrency if resource.viable? diff --git a/test/async/pool/controller.rb b/test/async/pool/controller.rb index 7dbfab4..6dc6698 100644 --- a/test/async/pool/controller.rb +++ b/test/async/pool/controller.rb @@ -273,12 +273,26 @@ expect(pool).not.to be(:active?) end - it "warns if closing while a resource is acquired" do - pool.acquire + it "waits for connection to be released" do + events = [] - expect(Console).to receive(:warn).and_return(nil) + events << :acquire + resource = pool.acquire + child = Async do |task| + task.yield + + events << :release + pool.release(resource) + end + + events << :close pool.close + events << :closed + + child.wait + + expect(events).to be == [:acquire, :close, :release, :closed] end end