diff --git a/lib/database_transform.rb b/lib/database_transform.rb index 35eac2c..8c11121 100644 --- a/lib/database_transform.rb +++ b/lib/database_transform.rb @@ -13,5 +13,7 @@ module DatabaseTransform autoload :SchemaTableRecordMapping autoload :SchemaTable + autoload :ThreadPool + require 'database_transform/railtie' if defined?(Rails) end diff --git a/lib/database_transform/schema.rb b/lib/database_transform/schema.rb index 0a43fe2..c4fc408 100644 --- a/lib/database_transform/schema.rb +++ b/lib/database_transform/schema.rb @@ -23,7 +23,17 @@ def self.transform_table(source_table, args = {}, &proc) transform.instance_eval(&proc) if proc end + delegate :concurrency, to: :class class << self + # Store the concurrency level if a number is given, otherwise return the stored concurrency level. + def concurrency(number = nil) + if number + @concurrency = number + else + @concurrency + end + end + private def prepare_models(source_table, destination_table) diff --git a/lib/database_transform/schema_table.rb b/lib/database_transform/schema_table.rb index 9e9f0d4..18a89b7 100644 --- a/lib/database_transform/schema_table.rb +++ b/lib/database_transform/schema_table.rb @@ -136,9 +136,15 @@ def transform!(schema) default_scope = @default_scope || @source.method(:all) collection = @source.instance_exec(&default_scope) + pool_size = schema && schema.concurrency ? schema.concurrency : 1 + pool = DatabaseTransform::ThreadPool.new(pool_size) in_batches(collection) do |group| - group.each { |record| transform_record!(schema, record) } + pool.schedule do + group.each { |record| transform_record!(schema, record) } + end end + + pool.wait end # Transform a collection of records in batches. 
module DatabaseTransform
  # A small fixed-size pool of worker threads fed from a shared FIFO job queue.
  #
  # Jobs are blocks handed to #schedule. Every worker repeatedly pops a job off
  # the queue and runs it until it pops the STOP sentinel that #wait enqueues
  # (one sentinel per worker). An exception escaping a job aborts the process
  # via Thread#abort_on_exception rather than being swallowed.
  #
  # Typical use:
  #
  #   pool = DatabaseTransform::ThreadPool.new(4)
  #   batches.each { |batch| pool.schedule { process(batch) } }
  #   pool.wait
  class ThreadPool
    # Sentinel pushed onto the queue to tell exactly one worker to exit.
    STOP = :stop

    # Starts +size+ worker threads that immediately begin waiting for jobs.
    #
    # @param size [Integer] number of worker threads; must be at least 1.
    # @raise [ArgumentError] if +size+ is not an Integer >= 1. A pool with no
    #   workers would accept jobs and then silently never run them.
    def initialize(size)
      unless size.is_a?(Integer) && size >= 1
        raise ArgumentError, "thread pool size must be a positive Integer, got #{size.inspect}"
      end

      @size = size
      @jobs = Queue.new
      @stopped = false
      @workers = Array.new(@size) { spawn_worker }
    end

    # Enqueues a block to be run by the next free worker.
    #
    # @yield the job to run on a worker thread.
    # @return [Queue] the internal job queue (the result of the push).
    # @raise [ArgumentError] if no block is given.
    # @raise [RuntimeError] if called after #wait, because the workers have
    #   been told to stop and the job would never be executed.
    def schedule(&job)
      raise ArgumentError, 'no block given' unless job
      raise 'cannot schedule a job after #wait has been called' if @stopped

      @jobs << job
    end

    # Blocks until every job scheduled so far has finished and all workers have
    # exited. Calling it again is a no-op that returns once the (already
    # finished) workers are joined.
    #
    # @return [Array<Thread>] the worker threads.
    def wait
      unless @stopped
        @stopped = true
        # The queue is FIFO, so each worker only sees its sentinel after all
        # previously scheduled jobs have been taken.
        @size.times { @jobs << STOP }
      end

      @workers.each(&:join)
    end

    private

    # Creates one worker thread running the pop-and-execute loop.
    def spawn_worker
      Thread.new do
        # Set from inside the thread so the flag is in place before any job
        # can possibly run.
        Thread.current.abort_on_exception = true

        # A plain `until` is used instead of Kernel#loop: `loop` rescues
        # StopIteration, which would let a job raising it kill the worker
        # silently instead of surfacing the error.
        until STOP.equal?(job = @jobs.pop)
          execute_job(job)
        end
      end
    end

    # Runs a single job, routing it through @around_job_proc when one is set.
    # NOTE(review): nothing in this class assigns @around_job_proc; presumably
    # it is a hook for wrapping jobs (e.g. per-thread setup) -- confirm.
    def execute_job(job)
      if @around_job_proc
        @around_job_proc.call(job)
      else
        job.call
      end
    end
  end
end
ActiveRecord::Base.configurations['dummy_schema4'] = dummy_connection ActiveRecord::Base.configurations['dummy_cyclic_dependency_schema'] = dummy_connection + ActiveRecord::Base.configurations['dummy_schema_with_configuration'] = dummy_connection ActiveRecord::Base.establish_connection(:default) puts "SQLite #{ActiveRecord::Base.connection.execute('SELECT sqlite_version()')}\n"