From eaba946a1816a0e04de300417f490ab081a830ef Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Thu, 1 Oct 2015 16:27:38 -0400 Subject: [PATCH] Spike: Better multithreading in dispatcher. --- lib/volt/tasks/dispatcher.rb | 129 ++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 64 deletions(-) diff --git a/lib/volt/tasks/dispatcher.rb b/lib/volt/tasks/dispatcher.rb index c95adff3..93b2d05f 100644 --- a/lib/volt/tasks/dispatcher.rb +++ b/lib/volt/tasks/dispatcher.rb @@ -11,6 +11,32 @@ class Dispatcher # When we pass the dispatcher over DRb, don't send a copy, just proxy. include DRb::DRbUndumped + class Responder + def initialize(channel, message) + @start_time = Time.now.to_f + @channel = channel + @message = message + end + + def update(time, value, reason) + callback_id, class_name, method_name, meta_data, *args = @message + result, cookies = *value + + if reason + # Convert the reason into a string so it can be serialized. + reason_str = "#{reason.class.to_s}: #{reason.to_s}" + @channel.send_message('response', callback_id, nil, reason_str, cookies) + else + reply = EJSON.stringify(['response', callback_id, result, nil, cookies]) + @channel.send_string_message(reply) + end + + run_time = ((Time.now.to_f - @start_time) * 1000).round(3) + Volt.logger.log_dispatch(class_name, method_name, run_time, args, reason) + end + end + private_constant :Responder + attr_reader :volt_app def initialize(volt_app) @@ -28,6 +54,11 @@ def initialize(volt_app) end @worker_timeout = Volt.config.worker_timeout || 60 + + # Setting timers on tasks requires two threads: one for the task and other + # to run the timer. For efficiency we'll run all our timers on one thread. + # But we'll handle all the tasks on the main worker pool. + @monitor = Concurrent::TimerSet.new(executor: @worker_pool) end # Mark the last time of the component modification for caching in sprockets @@ -49,22 +80,34 @@ def self.component_last_modified_time # Dispatch takes an incoming Task from the client and runs it on the # server, returning the result to the client. - # Tasks returning a promise will wait to return. def dispatch(channel, message) - # Dispatch the task in the worker pool. Pas in the meta data - @worker_pool.post do - begin - dispatch_in_thread(channel, message) - rescue => e - err = "Worker Thread Exception for #{message}\n" - err += e.inspect - err += e.backtrace.join("\n") if e.respond_to?(:backtrace) + # Create a responder to handle the result, whatever it is. + responder = Responder.new(channel, message) - Volt.logger.error(err) - end + # Start the task on the worker pool. + task = Concurrent::Future.new(executor: @worker_pool, args: [@volt_app, self, channel, message]) do |app, dispatcher, chan, msg| + dispatch_in_thread(app, dispatcher, chan, msg) + end + task.add_observer(responder) + task.execute + + # Set a timer to respond if the task times out. + # Also, how do we guarantee that the timer doesn't get GCed? + # need to get klass.__timeout from message + Concurrent::ScheduledTask.execute(@worker_timeout, + timer_set: @monitor, + executor: @worker_pool, + args: [task, @worker_timeout, message]) do |t, timeout, msg| + # Does nothing if the task is already complete. + t.fail(Timeout::Error.new("Task Timed Out after #{timeout} seconds: #{msg}")) end end + def close_channel(channel) + QueryTasks.new(@volt_app, channel).close! + end + + private # Check if it is safe to use this method def safe_method?(klass, method_name) @@ -86,78 +129,36 @@ def safe_method?(klass, method_name) false end - def close_channel(channel) - QueryTasks.new(@volt_app, channel).close! - end - - private - # Do the actual dispatching, should be running inside of a worker thread at # this point. - def dispatch_in_thread(channel, message) + def dispatch_in_thread(app, dispatcher, channel, message) callback_id, class_name, method_name, meta_data, *args = message method_name = method_name.to_sym # Get the class klass = Object.send(:const_get, class_name) - promise = Promise.new cookies = nil - start_time = Time.now.to_f - # Check that we are calling on a Task class and a method provide at # Task or above in the ancestor chain. (so no :send or anything) if safe_method?(klass, method_name) - promise.resolve(nil) # Init and send the method - promise = promise.then do - result = nil - Timeout.timeout(klass.__timeout || @worker_timeout) do - Thread.current['meta'] = meta_data - begin - klass_inst = klass.new(@volt_app, channel, self) - result = klass_inst.send(method_name, *args) - cookies = klass_inst.fetch_cookies - ensure - Thread.current['meta'] = nil - end - end - - result + result = nil + Thread.current['meta'] = meta_data + begin + klass_inst = klass.new(app, channel, dispatcher) + result = klass_inst.send(method_name, *args) + cookies = klass_inst.fetch_cookies + ensure + Thread.current['meta'] = nil end + return result, cookies else - # Unsafe method - promise.reject(RuntimeError.new("unsafe method: #{method_name}")) - end - - # Called after task runs or fails - finish = proc do |error| - if error.is_a?(Timeout::Error) - # re-raise with a message - error = Timeout::Error.new("Task Timed Out after #{@worker_timeout} seconds: #{message}") - end - - run_time = ((Time.now.to_f - start_time) * 1000).round(3) - Volt.logger.log_dispatch(class_name, method_name, run_time, args, error) - end - - # Run the promise and pass the return value/error back to the client - promise.then do |result| - reply = EJSON.stringify(['response', callback_id, result, nil, cookies]) - channel.send_string_message(reply) - - finish.call - end.fail do |error| - finish.call(error) - # Convert the error into a string so it can be serialized. - error_str = "#{error.class.to_s}: #{error.to_s}" - - channel.send_message('response', callback_id, nil, error_str, cookies) + raise RuntimeError.new("unsafe method: #{method_name}") end - end end end