Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPIKE: Better multithreading in dispatcher. #313

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 65 additions & 64 deletions lib/volt/tasks/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,32 @@ class Dispatcher
# When we pass the dispatcher over DRb, don't send a copy, just proxy.
include DRb::DRbUndumped

class Responder
def initialize(channel, message)
@start_time = Time.now.to_f
@channel = channel
@message = message
end

def update(time, value, reason)
callback_id, class_name, method_name, meta_data, *args = @message
result, cookies = *value

if reason
# Convert the reason into a string so it can be serialized.
reason_str = "#{reason.class.to_s}: #{reason.to_s}"
@channel.send_message('response', callback_id, nil, reason_str, cookies)
else
reply = EJSON.stringify(['response', callback_id, result, nil, cookies])
@channel.send_string_message(reply)
end

run_time = ((Time.now.to_f - @start_time) * 1000).round(3)
Volt.logger.log_dispatch(class_name, method_name, run_time, args, reason)
end
end
private_constant :Responder

attr_reader :volt_app

def initialize(volt_app)
Expand All @@ -28,6 +54,11 @@ def initialize(volt_app)
end

@worker_timeout = Volt.config.worker_timeout || 60

# Setting timers on tasks requires two threads: one for the task and other
# to run the timer. For efficiency we'll run all our timers on one thread.
# But we'll handle all the tasks on the main worker pool.
@monitor = Concurrent::TimerSet.new(executor: @worker_pool)
end

# Mark the last time of the component modification for caching in sprockets
Expand All @@ -49,22 +80,34 @@ def self.component_last_modified_time

# Dispatch takes an incoming Task from the client and runs it on the
# server, returning the result to the client.
# Tasks returning a promise will wait to return.
def dispatch(channel, message)
# Dispatch the task in the worker pool. Pas in the meta data
@worker_pool.post do
begin
dispatch_in_thread(channel, message)
rescue => e
err = "Worker Thread Exception for #{message}\n"
err += e.inspect
err += e.backtrace.join("\n") if e.respond_to?(:backtrace)
# Create a responder to handle the result, whatever it is.
responder = Responder.new(channel, message)

Volt.logger.error(err)
end
# Start the task on the worker pool.
task = Concurrent::Future.new(executor: @worker_pool, args: [@volt_app, self, channel, message]) do |app, dispatcher, chan, msg|
dispatch_in_thread(app, dispatcher, chan, msg)
end
task.add_observer(responder)
task.execute

# Set a timer to respond if the task times out.
# Also, how do we guarantee that the timer doesn't get GCed?
# need to get klass.__timeout from message
Concurrent::ScheduledTask.execute(@worker_timeout,
timer_set: @monitor,
executor: @worker_pool,
args: [task, @worker_timeout, message]) do |t, timeout, msg|
# Does nothing if the task is already complete.
t.fail(Timeout::Error.new("Task Timed Out after #{timeout} seconds: #{msg}"))
end
end

def close_channel(channel)
QueryTasks.new(@volt_app, channel).close!
end

private

# Check if it is safe to use this method
def safe_method?(klass, method_name)
Expand All @@ -86,78 +129,36 @@ def safe_method?(klass, method_name)
false
end

def close_channel(channel)
QueryTasks.new(@volt_app, channel).close!
end

private

# Do the actual dispatching, should be running inside of a worker thread at
# this point.
def dispatch_in_thread(channel, message)
def dispatch_in_thread(app, dispatcher, channel, message)
callback_id, class_name, method_name, meta_data, *args = message
method_name = method_name.to_sym

# Get the class
klass = Object.send(:const_get, class_name)

promise = Promise.new
cookies = nil

start_time = Time.now.to_f

# Check that we are calling on a Task class and a method provide at
# Task or above in the ancestor chain. (so no :send or anything)
if safe_method?(klass, method_name)
promise.resolve(nil)

# Init and send the method
promise = promise.then do
result = nil
Timeout.timeout(klass.__timeout || @worker_timeout) do
Thread.current['meta'] = meta_data
begin
klass_inst = klass.new(@volt_app, channel, self)
result = klass_inst.send(method_name, *args)
cookies = klass_inst.fetch_cookies
ensure
Thread.current['meta'] = nil
end
end

result
result = nil
Thread.current['meta'] = meta_data
begin
klass_inst = klass.new(app, channel, dispatcher)
result = klass_inst.send(method_name, *args)
cookies = klass_inst.fetch_cookies
ensure
Thread.current['meta'] = nil
end

return result, cookies
else
# Unsafe method
promise.reject(RuntimeError.new("unsafe method: #{method_name}"))
end

# Called after task runs or fails
finish = proc do |error|
if error.is_a?(Timeout::Error)
# re-raise with a message
error = Timeout::Error.new("Task Timed Out after #{@worker_timeout} seconds: #{message}")
end

run_time = ((Time.now.to_f - start_time) * 1000).round(3)
Volt.logger.log_dispatch(class_name, method_name, run_time, args, error)
end

# Run the promise and pass the return value/error back to the client
promise.then do |result|
reply = EJSON.stringify(['response', callback_id, result, nil, cookies])
channel.send_string_message(reply)

finish.call
end.fail do |error|
finish.call(error)
# Convert the error into a string so it can be serialized.
error_str = "#{error.class.to_s}: #{error.to_s}"

channel.send_message('response', callback_id, nil, error_str, cookies)
raise RuntimeError.new("unsafe method: #{method_name}")
end

end
end
end