Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
KirIgor committed May 17, 2024
1 parent 9b600aa commit 72692ab
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 49 deletions.
77 changes: 30 additions & 47 deletions lib/umbrellio_utils/database.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

# rubocop:disable Metrics/ModuleLength
module UmbrellioUtils
module Database
extend self
Expand All @@ -26,9 +25,9 @@ def each_record(dataset, **options, &block)
primary_key = primary_key_from(**options)

with_temp_table(dataset, **options) do |ids|
if primary_key.is_a?(Array)
if primary_key.size > 1
where_expr = Sequel.|(*ids.map { |id| complex_key_expr(primary_key, id) })
dataset.model.where(where_expr).each(&block)
dataset.model.where(where_expr).reverse(row(primary_key)).each(&block)
else
dataset.model.where(primary_key => ids).reverse(primary_key).each(&block)
end
Expand All @@ -45,7 +44,7 @@ def with_temp_table(dataset, page_size: 1_000, sleep: nil, **options)

loop do
DB.transaction do
pk_set = pop_pk_batch(primary_key, temp_table_name, page_size)
pk_set = pop_next_pk_batch(temp_table_name, primary_key, page_size)
yield(pk_set) if pk_set.any?
end

Expand All @@ -63,64 +62,39 @@ def clear_lamian_logs!
Lamian.logger.send(:logdevs).each { |x| x.truncate(0) && x.rewind }
end

def create_temp_table(dataset, primary_key:)
def create_temp_table(dataset, **options)
time = Time.current
temp_table_name = "temp_#{dataset.model.table_name}_#{time.to_i}_#{time.nsec}".to_sym

DB.drop_table?(temp_table_name)
if primary_key.is_a?(Array)
create_complex_key_temp_table(temp_table_name, dataset, primary_key)
else
create_simple_key_temp_table(temp_table_name, dataset, primary_key)
end

temp_table_name
end

private

def create_simple_key_temp_table(temp_table_name, dataset, primary_key)
model = dataset.model
type = model.db_schema[primary_key][:db_type]

DB.create_table(temp_table_name, unlogged: true) do
column primary_key, type, primary_key: true
end

insert_ds = dataset.select(Sequel[model.table_name][primary_key])
DB[temp_table_name].disable_insert_returning.insert(insert_ds)
end

def create_complex_key_temp_table(temp_table_name, dataset, primary_key)
model = dataset.model
temp_table_name = "temp_#{model.table_name}_#{time.to_i}_#{time.nsec}".to_sym
primary_key = primary_key_from(**options)

DB.create_table(temp_table_name, unlogged: true) do
primary_key(:temp_table_id)

primary_key.each do |field|
type = model.db_schema[field][:db_type]
column field, type
end

primary_key primary_key
end

insert_ds = dataset.select(
Sequel.function(:row_number).over, *primary_key.map { |f| Sequel[model.table_name][f] }
)
insert_ds = dataset.select(*qualified_pk(model.table_name, primary_key))
DB[temp_table_name].disable_insert_returning.insert(insert_ds)

temp_table_name
end

def pop_pk_batch(primary_key, temp_table_name, batch_size)
pk_column = primary_key.is_a?(Array) ? :temp_table_id : primary_key
pk_expr = DB[temp_table_name].select(pk_column).reverse(pk_column).limit(batch_size)
deleted_items = DB[temp_table_name].where(pk_column => pk_expr).returning.delete
deleted_items.map do |item|
next complex_key_expr(primary_key, item) if primary_key.is_a?(Array)
item[primary_key]
end
private

def row(primary_key)
Sequel.function(:row, *primary_key)
end

def primary_key_from(**options)
options.fetch(:primary_key, :id)
Array(options.fetch(:primary_key, :id))
end

def qualified_pk(table_name, primary_key)
primary_key.map { |f| Sequel[table_name][f] }
end

def sleep_interval_from(sleep)
Expand All @@ -137,6 +111,15 @@ def sleep_interval_from(sleep)
def complex_key_expr(primary_key, record)
primary_key.to_h { |field| [field, record[field]] }
end

def pop_next_pk_batch(temp_table_name, primary_key, batch_size)
row = row(primary_key)
pk_expr = DB[temp_table_name].select(*primary_key).reverse(row).limit(batch_size)
deleted_items = DB[temp_table_name].where(row => pk_expr).returning.delete
deleted_items.map do |item|
next item if primary_key.size > 1
item[primary_key.first]
end
end
end
end
# rubocop:enable Metrics/ModuleLength
4 changes: 2 additions & 2 deletions spec/umbrellio_utils/database_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
Array.new(10) { |index| Hash[geo: "Europe #{index + 1}", nick: "user#{index + 1}"] }
end

let(:nicks) { complex_users_data.pluck(:nick) }
let(:reversed_nicks) { complex_users_data.pluck(:nick).reverse }

subject(:result_nicks) do
users = []
Expand All @@ -76,7 +76,7 @@
end

it "yields all records" do
expect(result_nicks).to match_array(nicks)
expect(result_nicks).to match_array(reversed_nicks)
expect(sleep_calls).to eq([])
end
end
Expand Down

0 comments on commit 72692ab

Please sign in to comment.