Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using thread pool to transform records #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/database_transform.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ module DatabaseTransform
autoload :SchemaTableRecordMapping
autoload :SchemaTable

autoload :ThreadPool

require 'database_transform/railtie' if defined?(Rails)
end
10 changes: 10 additions & 0 deletions lib/database_transform/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,17 @@ def self.transform_table(source_table, args = {}, &proc)
transform.instance_eval(&proc) if proc
end

delegate :concurrency, to: :class
class << self
# Store the block if a block is given, otherwise the stored block will be returned.
def concurrency(number = nil)
if number
@concurrency = number
else
@concurrency
end
end

private

def prepare_models(source_table, destination_table)
Expand Down
8 changes: 7 additions & 1 deletion lib/database_transform/schema_table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ def transform!(schema)
default_scope = @default_scope || @source.method(:all)
collection = @source.instance_exec(&default_scope)

pool_size = schema && schema.concurrency ? schema.concurrency : 1
pool = DatabaseTransform::ThreadPool.new(pool_size)
in_batches(collection) do |group|
group.each { |record| transform_record!(schema, record) }
pool.schedule do
group.each { |record| transform_record!(schema, record) }
end
end

pool.wait
end

# Transform a collection of records in batches. This method uses `find_in_batches` to split the large collection and
Expand Down
46 changes: 46 additions & 0 deletions lib/database_transform/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
class DatabaseTransform::ThreadPool
# Special job to stop the thread.
STOP = :stop

def initialize(size)
@size = size
@jobs = Queue.new

@workers = (1..@size).map do
thread = Thread.new do
loop do
job = @jobs.pop
break if job == STOP

execute_job(job)
end
end

thread.abort_on_exception = true
thread
end
end

def schedule(&job)
@jobs << job
end

# Wait for all jobs to finish.
def wait
@size.times do
@jobs << STOP
end

@workers.each(&:join)
end

private

def execute_job(job)
if @around_job_proc
@around_job_proc.call(job)
else
job.call
end
end
end
16 changes: 16 additions & 0 deletions spec/database_transform/schema_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ class DummySchema3 < DatabaseTransform::Schema; end
end
end

describe 'configurations' do
class DummySchemaWithConfiguration < DatabaseTransform::Schema
concurrency 4

transform_table :source, to: 'destination' do
end
end
let(:schema) { DummySchemaWithConfiguration.new }

describe '.concurrency' do
subject { schema.concurrency }

it { is_expected.to eq(4) }
end
end

describe '.transform_table' do
it 'defines models for symbol source tables' do
expect(DummySchema::Source.const_defined?(:Source)).to be_truthy
Expand Down
1 change: 1 addition & 0 deletions spec/support/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ActiveRecord::Base.configurations['dummy_schema_production'] = dummy_connection
ActiveRecord::Base.configurations['dummy_schema4'] = dummy_connection
ActiveRecord::Base.configurations['dummy_cyclic_dependency_schema'] = dummy_connection
ActiveRecord::Base.configurations['dummy_schema_with_configuration'] = dummy_connection

ActiveRecord::Base.establish_connection(:default)
puts "SQLite #{ActiveRecord::Base.connection.execute('SELECT sqlite_version()')}\n"
Expand Down