Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split ORM Logic to core [NEW] #15

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 4 additions & 138 deletions lib/sidekiq/active_record/manager_worker.rb
Original file line number Diff line number Diff line change
@@ -1,154 +1,20 @@
module Sidekiq
module ActiveRecord
class ManagerWorker
include Sidekiq::Worker

DEFAULT_IDENTIFIER_KEY = :id
DEFAULT_BATCH_SIZE = 1000

def perform(options = {})
default_query = self.class.get_default_models_query
self.class.perform_query_async(default_query, options)
end

class ManagerWorker < Sidekiq::Orm::ManagerWorker

class << self

# For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker)
# Specify the TaskWorker with the `sidekiq_delegate_task_to` method.
#
# @param models_query ActiveRecord::Relation
# @param options Hash
# :worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to`
# :identifier_key - the model identifier column. Default 'id'
# :additional_keys - additional model keys
# :batch_size - Specifies the size of the batch. Default to 1000.
#
# @example:
# class UserTaskWorker
# def perform(user_id)
# # user task logic
# end
# end
#
# class UserSyncer
# include Sidekiq::ActiveRecord::ManagerWorker
#
# sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
# sidekiq_manager_options :batch_size => 500,
# :identifier_key => :user_token,
# :additional_keys => [:status]
# end
#
# UserSyncer.perform_query_async(User.active, :batch_size => 300)
#
#
# is equivalent to doing:
# User.active.each {|user| UserTaskWorker.perform(user.id) }
#
def perform_query_async(models_query, options = {})
set_runtime_options(options)
models = prepare_models_query(models_query)
models.find_in_batches(batch_size: batch_size) do |models_batch|
model_attributes = models_batch.map { |model| model_attributes(model) }
Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes)
def find_in_batches(models)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seuros WDYT about this?

also, I looked into Mongoid's, for possible solutions - this is basic pagination...

see:
https://github.com/mongoid/mongoid/issues/1334

models.find_in_batches(batch_size: batch_size) do |batch|
yield batch
end
end

# @required
# The task worker to delegate to.
# @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker
def sidekiq_delegate_task_to(worker_klass)
case worker_klass
when String, Symbol
worker_klass.to_s.split('_').map(&:capitalize).join.constantize
else
worker_klass
end
get_sidekiq_manager_options[:worker_class] = worker_klass
end

# Allows customization for this type of ManagerWorker.
# Legal options:
#
# :worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
# :identifier_key - the model identifier column. Default 'id'
# :additional_keys - additional model keys
# :batch_size - Specifies the size of the batch. Default to 1000.
def sidekiq_manager_options(opts = {})
@sidekiq_manager_options_hash = get_sidekiq_manager_options.merge((opts || {}))
end

# The default of query to run, when the workers runs perform
# example
# class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker
# sidekiq_delegate_task_to UserTaskWorker
# default_models_query -> { User.active }
# end
#
# UserManagerWorker.perform_async(:batch_size => 300)
def default_models_query(query)
@query = query
end

def get_default_models_query
@query.call() if @query.present?
end

def default_worker_manager_options
{
identifier_key: DEFAULT_IDENTIFIER_KEY,
additional_keys: [],
batch_size: DEFAULT_BATCH_SIZE
}
end

# returns the model attributes array:
# [model_id, attr1, attr2, ...]
def model_attributes(model)
additional_attributes = additional_keys.map { |key| model.send(key) }
id_attribute = model.send(identifier_key)
additional_attributes.unshift(id_attribute)
end

def prepare_models_query(models_query)
selected_attributes = [models_query.primary_key.to_sym, identifier_key, additional_keys].uniq
models_query.select(selected_attributes)
end

def worker_class
fail NotImplementedError.new('`worker_class` was not specified') unless manager_options[:worker_class].present?
manager_options[:worker_class]
end

def identifier_key
manager_options[:identifier_key]
end

def additional_keys
manager_options[:additional_keys]
end

def batch_size
manager_options[:batch_size]
end

def manager_options
get_sidekiq_manager_options.merge(runtime_options)
end

def get_sidekiq_manager_options
@sidekiq_manager_options_hash ||= default_worker_manager_options
end

def runtime_options
@sidekiq_manager_runtime_options || {}
end

def set_runtime_options(options={})
@sidekiq_manager_runtime_options = options.delete_if { |_, v| v.to_s.strip == '' }
end

end

end
Expand Down
149 changes: 4 additions & 145 deletions lib/sidekiq/active_record/task_worker.rb
Original file line number Diff line number Diff line change
@@ -1,158 +1,17 @@
module Sidekiq
module ActiveRecord
class TaskWorker
include Sidekiq::Worker

attr_reader :task_model

# @example:
# class UserMailerTaskWorker < Sidekiq::ActiveRecord::TaskWorker
#
# sidekiq_task_model :user_model # or UserModel
# sidekiq_task_options :identifier_key => :token
#
# def perform_on_model
# UserMailer.deliver_registration_confirmation(user, email_type)
# end
#
# def not_found_model(token)
# Log.error "User not found for token:#{token}"
# end
#
# def should_perform_on_model?
# user.active?
# end
#
# def did_not_perform_on_model
# Log.error "User #{user.token} is inactive"
# end
#
# end
#
#
# UserMailerTaskWorker.perform(user.id, :new_email)
#
def perform(identifier, *args)
@task_model = fetch_model(identifier, *args)
return not_found_model(identifier, *args) unless @task_model.present?

if should_perform_on_model?
perform_on_model(*args)
else
did_not_perform_on_model
end
end

def perform_on_model(*args)
task_model
end

# Hook that can block perform_on_model from being triggered,
# e.g in cases when the model is no longer valid
def should_perform_on_model?
true
end

# Hook to handel a model that was not performed
def did_not_perform_on_model
task_model
end

# Hook to handel not found model
def not_found_model(identifier, *args)
identifier
end
class TaskWorker < Sidekiq::Orm::TaskWorker

def fetch_model(identifier, *args)
self.class.model_class.find_by(self.class.identifier_key => identifier)
end


class << self

def sidekiq_task_model(model_klass)
return if model_klass.blank?

setup_task_model_alias(model_klass)

get_sidekiq_task_options[:model_class] = active_record_class(model_klass)
end

def model_class
klass = get_sidekiq_task_options[:model_class]
fail NotImplementedError.new('`sidekiq_task_model` was not specified') unless klass.present?
klass
end

def identifier_key
get_sidekiq_task_options[:identifier_key]
end

#
# Allows customization for this type of TaskWorker.
# Legal options:
#
# :identifier_key - the model identifier column. Default 'id'
def sidekiq_task_options(opts = {})
@sidekiq_task_options_hash = get_sidekiq_task_options.merge((opts).symbolize_keys!)
end


private

# aliases task_model with the name of the model
#
# example:
# sidekiq_task_model: AdminUser # or :admin_user
#
# then the worker will have access to `admin_user`, which is an alias to `task_model`
#
# def perform_on_admin_user
# admin_user == task_model
# end
#
# it will add the following method aliases to the hooks:
#
# def not_found_admin_user; end
# def should_perform_on_admin_user?; end
# def did_not_perform_on_admin_user; end
#
def setup_task_model_alias(model_klass_name)
if model_klass_name.is_a?(Class)
model_klass_name = model_klass_name.name.underscore
end
{
:task_model => model_klass_name,
:fetch_model => "fetch_#{model_klass_name}",
:not_found_model => "not_found_#{model_klass_name}",
:should_perform_on_model? => "should_perform_on_#{model_klass_name}?",
:did_not_perform_on_model => "did_not_perform_on_#{model_klass_name}",
:perform_on_model => "perform_on_#{model_klass_name}"
}.each do |old_name, new_name|
self.class_exec do
alias_method new_name.to_sym, old_name
end
end
end

def get_sidekiq_task_options
@sidekiq_task_options_hash ||= default_worker_task_options
end

def default_worker_task_options
{
identifier_key: :id
}
end
protected

def active_record_class(model_klass)
begin
model_klass = model_klass.to_s.classify.constantize
raise unless model_klass <= ::ActiveRecord::Base
rescue
fail ArgumentError.new '`sidekiq_task_model` must be an ActiveRecord model'
end
model_klass
def task_model_base_class
::ActiveRecord::Base
end

end
Expand Down
9 changes: 9 additions & 0 deletions lib/sidekiq/activerecord.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
require 'sidekiq/active_record/version'


module Sidekiq
module Orm
extend ActiveSupport::Autoload

autoload :TaskWorker
autoload :ManagerWorker
end
end

module Sidekiq
module ActiveRecord
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we nest ActiveRecord into Orm ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mperham , What do you think ? Should we nest all Orm modules under Orm to not bloat the Sidekiq Namespace ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please don't use the Sidekiq module directly. You should use your own base module: SidekiqOrm or similar.

extend ActiveSupport::Autoload
Expand Down
Loading