Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split ORM from Logic #4

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 8 additions & 92 deletions lib/sidekiq/active_record/manager_worker.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
module Sidekiq
module ActiveRecord
module ManagerWorker
extend Sidekiq::Worker

DEFAULT_IDENTIFIER_KEY = :id
DEFAULT_BATCH_SIZE = 1000
extend Sidekiq::Orm::ManagerWorker

def self.included(base)
base.extend(ClassMethods)
base.class_attribute :sidekiq_manager_options_hash
end

module ClassMethods
include Sidekiq::Orm::ManagerWorker::ClassMethods
# For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker)
# Specify the TaskWorker with the `sidekiq_delegate_task_to` method.
#
Expand Down Expand Up @@ -42,99 +40,17 @@ module ClassMethods
# is equivalent to doing:
# User.active.each {|user| UserTaskWorker.perform(user.id) }
#
def perform_query_async(models_query, options = {})
set_runtime_options(options)
models = models_query.select(selected_attributes)
models.find_in_batches(batch_size: batch_size) do |models_batch|
model_attributes = models_batch.map { |model| model_attributes(model) }
Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes)
end
end

# @required
# The task worker to delegate to.
# @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker
def sidekiq_delegate_task_to(worker_klass)
case worker_klass
when String, Symbol
worker_klass.to_s.split('_').map(&:capitalize).join.constantize
else
worker_klass
end
get_sidekiq_manager_options[:worker_class] = worker_klass
end

# Allows customization for this type of ManagerWorker.
# Legal options:
#
# :worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
# :identifier_key - the model identifier column. Default 'id'
# :additional_keys - additional model keys
# :batch_size - Specifies the size of the batch. Default to 1000.
def sidekiq_manager_options(opts = {})
self.sidekiq_manager_options_hash = get_sidekiq_manager_options.merge((opts || {}))
end

# private

def default_worker_manager_options
{
identifier_key: DEFAULT_IDENTIFIER_KEY,
additional_keys: [],
batch_size: DEFAULT_BATCH_SIZE
}
end

# returns the model attributes array:
# [model_id, attr1, attr2, ...]
def model_attributes(model)
additional_attributes = additional_keys.map { |key| model.send(key) }
id_attribute = model.send(identifier_key)
additional_attributes.unshift(id_attribute)
end

def selected_attributes
attrs = [identifier_key, additional_keys]
attrs << DEFAULT_IDENTIFIER_KEY unless default_identifier? # :id must be included
attrs
def query_setup(models_query)
models_query.select(selected_attributes)
end

def worker_class
fail NotImplementedError.new('`worker_class` was not specified') unless manager_options[:worker_class].present?
manager_options[:worker_class]
end

def default_identifier?
identifier_key == DEFAULT_IDENTIFIER_KEY
end

def identifier_key
manager_options[:identifier_key]
end

def additional_keys
manager_options[:additional_keys]
end

def batch_size
manager_options[:batch_size]
end

def manager_options
get_sidekiq_manager_options.merge(runtime_options)
end

def get_sidekiq_manager_options
self.sidekiq_manager_options_hash ||= default_worker_manager_options
end

def runtime_options
@sidekiq_manager_runtime_options || {}
def run_query(models_query)
models_query.find_in_batches(batch_size: batch_size) do |models_batch|
yield models_batch
end
end

def set_runtime_options(options={})
@sidekiq_manager_runtime_options = options.delete_if { |_, v| v.to_s.strip == '' }
end
end
end
end
Expand Down
70 changes: 2 additions & 68 deletions lib/sidekiq/active_record/task_worker.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
module Sidekiq
module ActiveRecord
module TaskWorker
extend Sidekiq::Worker
extend Sidekiq::Orm::TaskWorker

def self.included(base)
base.extend(ClassMethods)
base.class_attribute :sidekiq_task_options_hash
end

module ClassMethods
include Sidekiq::Orm::TaskWorker::ClassMethods
# @example:
# class UserMailerTaskWorker
# include Sidekiq::ActiveRecord::TaskWorker
Expand Down Expand Up @@ -37,77 +38,10 @@ module ClassMethods
#
# UserMailerTaskWorker.perform(user.id, :new_email)
#
def perform(identifier, *args)
model = fetch_model(identifier)
return not_found_model(identifier) unless model.present?

if model_valid?(model)
perform_on_model(model, *args)
else
invalid_model(model)
end
end

def sidekiq_task_model(model_klass)
if model_klass.is_a?(String) || model_klass.is_a?(Symbol)
model_klass = model_klass.to_s.split('_').map(&:capitalize).join.constantize
else
model_klass
end
get_sidekiq_task_options[:model_class] = model_klass
end

def perform_on_model(model)
model
end

# recheck the if one of the items is still valid
def model_valid?(_model)
true
end

# Hook to handel an invalid model
def invalid_model(_model)
end

# Hook to handel not found model
def not_found_model(_identifier)
end

# private

def fetch_model(identifier)
model_class.find_by(identifier_key => identifier)
end

def model_class
klass = get_sidekiq_task_options[:model_class]
fail NotImplementedError.new('`model_class` was not specified') unless klass.present?
klass
end

def identifier_key
get_sidekiq_task_options[:identifier_key]
end

#
# Allows customization for this type of TaskWorker.
# Legal options:
#
# :identifier_key - the model identifier column. Default 'id'
def sidekiq_task_options(opts = {})
self.sidekiq_task_options_hash = get_sidekiq_task_options.merge((opts || {}).symbolize_keys!)
end

def get_sidekiq_task_options
self.sidekiq_task_options_hash ||= default_worker_task_options
end

def default_worker_task_options
{
identifier_key: :id
}
end
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions lib/sidekiq/activerecord.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@


module Sidekiq

module Orm
extend ActiveSupport::Autoload

autoload :TaskWorker
autoload :ManagerWorker
end

module ActiveRecord
extend ActiveSupport::Autoload

autoload :TaskWorker
autoload :ManagerWorker
end

end
151 changes: 151 additions & 0 deletions lib/sidekiq/orm/manager_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
module Sidekiq
module Orm
module ManagerWorker
extend Sidekiq::Worker

DEFAULT_IDENTIFIER_KEY = :id
DEFAULT_BATCH_SIZE = 1000

def self.included(base)
base.extend(ClassMethods)
base.class_attribute :sidekiq_manager_options_hash
end

module ClassMethods
# For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker)
# Specify the TaskWorker with the `sidekiq_delegate_task_to` method.
#
# @param models_query ActiveRecord::Relation
# @param options Hash
# :worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to`
# :identifier_key - the model identifier column. Default 'id'
# :additional_keys - additional model keys
# :batch_size - Specifies the size of the batch. Default to 1000.
#
# @example:
# class UserTaskWorker
# include Sidekiq::ActiveRecord::TaskWorker
# end
#
# class UserSyncer
# include Sidekiq::ActiveRecord::ManagerWorker
#
# sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
# sidekiq_manager_options :batch_size => 500,
# :identifier_key => :user_token,
# :additional_keys => [:status]
# end
#
# UserSyncer.perform_query_async(User.active, :batch_size => 300)
#
#
# is equivalent to doing:
# User.active.each {|user| UserTaskWorker.perform(user.id) }
#
def perform_query_async(models_query, options = {})
set_runtime_options(options)
models_query = query_setup(models_query)
run_query(models_query) do |models_batch|
model_attributes = models_batch.map { |model| model_attributes(model) }
Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes)
end
end

# @required
# The task worker to delegate to.
# @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker
def sidekiq_delegate_task_to(worker_klass)
case worker_klass
when String, Symbol
worker_klass.to_s.split('_').map(&:capitalize).join.constantize
else
worker_klass
end
get_sidekiq_manager_options[:worker_class] = worker_klass
end

# Allows customization for this type of ManagerWorker.
# Legal options:
#
# :worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
# :identifier_key - the model identifier column. Default 'id'
# :additional_keys - additional model keys
# :batch_size - Specifies the size of the batch. Default to 1000.
def sidekiq_manager_options(opts = {})
self.sidekiq_manager_options_hash = get_sidekiq_manager_options.merge((opts || {}))
end


def query_setup(_models_query)
#override this in each ORM gem
end

def run_query(_models_query)
#override this in each ORM gem
end


# private

def default_worker_manager_options
{
identifier_key: DEFAULT_IDENTIFIER_KEY,
additional_keys: [],
batch_size: DEFAULT_BATCH_SIZE
}
end

# returns the model attributes array:
# [model_id, attr1, attr2, ...]
def model_attributes(model)
additional_attributes = additional_keys.map { |key| model.send(key) }
id_attribute = model.send(identifier_key)
additional_attributes.unshift(id_attribute)
end

def selected_attributes
attrs = [identifier_key, additional_keys]
attrs << DEFAULT_IDENTIFIER_KEY unless default_identifier? # :id must be included
attrs
end

def worker_class
fail NotImplementedError.new('`worker_class` was not specified') unless manager_options[:worker_class].present?
manager_options[:worker_class]
end

def default_identifier?
identifier_key == DEFAULT_IDENTIFIER_KEY
end

def identifier_key
manager_options[:identifier_key]
end

def additional_keys
manager_options[:additional_keys]
end

def batch_size
manager_options[:batch_size]
end

def manager_options
get_sidekiq_manager_options.merge(runtime_options)
end

def get_sidekiq_manager_options
self.sidekiq_manager_options_hash ||= default_worker_manager_options
end

def runtime_options
@sidekiq_manager_runtime_options || {}
end

def set_runtime_options(options={})
@sidekiq_manager_runtime_options = options.delete_if { |_, v| v.to_s.strip == '' }
end
end
end
end
end
Loading