Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduced default shards #320

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions lib/octopus/association_shard_tracking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ module QueryOnCurrentShard

METHODS.each do |m|
define_method m.to_sym do |*args, &block|
if self.respond_to?(:proxy_association) && proxy_association
proxy_association.owner.run_on_shard { super(*args, &block) }
if defined?(@association) && @association
@association.owner.run_on_shard { super(*args, &block) }
else
super(*args, &block)
end
Expand Down
8 changes: 6 additions & 2 deletions lib/octopus/load_balancing/round_robin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ def initialize(slaves_list)
end

# Returns the next available slave in the pool
def next
@slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
def next(index = nil)
if index
@slaves_list[index]
else
@slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
end
end
end
end
Expand Down
84 changes: 60 additions & 24 deletions lib/octopus/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@ def initialize_shards(config)
@shards = HashWithIndifferentAccess.new
@shards_slave_groups = HashWithIndifferentAccess.new
@slave_groups = HashWithIndifferentAccess.new
@shard_servers = HashWithIndifferentAccess.new
@groups = {}
@adapters = Set.new
@config = ActiveRecord::Base.connection_pool_without_octopus.spec.config

@default_shard = config['defaults'].try(:[], 'shard')
fail 'default shard shoule be set' if @default_shard.blank?

unless config.nil?
@entire_sharded = config['entire_sharded']
@shards_config = config[Octopus.rails_env]
end

@shards_config ||= []
default_slave_group_name = config['defaults'].try(:[], 'slave_group')
if @shards_config.is_a?(Hash)
@default_slave_groups = @shards_config.keys.inject(HashWithIndifferentAccess.new) { |h, k| h[k] = default_slave_group_name; h }
else
@default_slave_groups = {}
end

@shards_config.each do |key, value|
if value.is_a?(String)
Expand All @@ -35,6 +45,7 @@ def initialize_shards(config)
value.merge!(:octopus_shard => key)
initialize_adapter(value['adapter'])
@shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
@shard_servers[key.to_sym] = [@shards[key.to_sym]]

slave_group_configs = value.select do |_k, v|
structurally_slave_group? v
Expand All @@ -45,8 +56,8 @@ def initialize_shards(config)
slave_group_configs.each do |slave_group_name, slave_configs|
slaves = HashWithIndifferentAccess.new
slave_configs.each do |slave_name, slave_config|
@shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection")
slaves[slave_name.to_sym] = slave_name.to_sym
slaves[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection")
@shard_servers[key.to_sym] << slaves[slave_name.to_sym]
end
slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves)
end
Expand All @@ -72,8 +83,6 @@ def initialize_shards(config)
end
end
end

@shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus
end

def initialize_replication(config)
Expand All @@ -98,17 +107,20 @@ def current_model=(model)
end

def current_shard
Thread.current['octopus.current_shard'] ||= :master
Thread.current['octopus.current_shard'] ||= @default_shard
end

def current_shard=(shard_symbol)
self.current_slave_group = nil
self.current_slave = nil

if shard_symbol.is_a?(Array)
shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
elsif shard_symbol.is_a?(Hash)
hash = shard_symbol
shard_symbol = hash[:shard]
shard_symbol = hash[:shard]
slave_group_symbol = hash[:slave_group]
slave_symbol = hash[:slave]

if shard_symbol.nil? && slave_group_symbol.nil?
fail 'Neither shard or slave group must be specified'
Expand All @@ -119,12 +131,20 @@ def current_shard=(shard_symbol)
end

if slave_group_symbol.present?
if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) ||
(@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?)
if (slave_group_symbol != :master) && ((@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) ||
(@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?))
fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}"
end
self.current_slave_group = slave_group_symbol
end

if slave_symbol.present?
unless @shards_slave_groups[shard_symbol].try(:[], slave_group_symbol).try(:has_slave?, slave_symbol)
fail "Nonexistent Slave Name: #{slave_symbol} in slave group: #{slave_group_symbol}"
end

self.current_slave = slave_symbol
end
else
fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
end
Expand All @@ -146,13 +166,21 @@ def current_group=(group_symbol)
end

def current_slave_group
Thread.current['octopus.current_slave_group']
Thread.current['octopus.current_slave_group'] || @default_slave_groups[current_shard]
end

def current_slave_group=(slave_group_symbol)
Thread.current['octopus.current_slave_group'] = slave_group_symbol
end

def current_slave
Thread.current['octopus.current_slave']
end

def current_slave=(slave_symbol)
Thread.current['octopus.current_slave'] = slave_symbol
end

def block
Thread.current['octopus.block']
end
Expand Down Expand Up @@ -202,14 +230,14 @@ def shards_for_group(group)
# reconnect, but in Rails 3.1 the flag prevents this.
def safe_connection(connection_pool)
connection_pool.automatic_reconnect ||= true
if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled
if !connection_pool.connected? && @shards[@default_shard].connection.query_cache_enabled
connection_pool.connection.enable_query_cache!
end
connection_pool.connection
end

def select_connection
safe_connection(@shards[shard_name])
def select_connection(connection = nil)
safe_connection(connection || @shards[shard_name])
end

def shard_name
Expand All @@ -235,21 +263,21 @@ def send_queries_to_multiple_shards(shards, &block)
end

def clean_connection_proxy
self.current_shard = :master
self.current_shard = @default_shard
self.current_model = nil
self.current_group = nil
self.block = nil
end

def check_schema_migrations(shard)
OctopusModel.using(shard).connection.table_exists?(
ActiveRecord::Migrator.schema_migrations_table_name,
ActiveRecord::Migrator.schema_migrations_table_name,
) || OctopusModel.using(shard).connection.initialize_schema_migrations_table
end

def transaction(options = {}, &block)
if !sharded && current_model_replicated?
run_queries_on_shard(:master) do
run_queries_on_shard(@default_shard) do
select_connection.transaction(options, &block)
end
else
Expand All @@ -266,8 +294,10 @@ def method_missing(method, *args, &block)
elsif should_send_queries_to_shard_slave_group?(method)
send_queries_to_shard_slave_group(method, *args, &block)
elsif should_send_queries_to_slave_group?(method)
raise NotImplementedError.new
send_queries_to_slave_group(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
raise NotImplementedError.new
send_queries_to_selected_slave(method, *args, &block)
else
select_connection.send(method, *args, &block)
Expand Down Expand Up @@ -312,7 +342,11 @@ def should_send_queries_to_shard_slave_group?(method)
end

def send_queries_to_shard_slave_group(method, *args, &block)
send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block)
if current_slave_group == :master
send_queries_to_slave(@shards[current_shard], method, *args, &block)
else
send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block)
end
end

def should_send_queries_to_slave_group?(method)
Expand All @@ -325,14 +359,18 @@ def send_queries_to_slave_group(method, *args, &block)

protected

def shard_servers
@shard_servers[current_shard]
end

# Ensure that a single failing slave doesn't take down the entire application
def with_each_healthy_shard
@shards.each do |shard_name, v|
shard_servers.each do |v|
begin
yield(v)
rescue => e
if Octopus.robust_environment?
Octopus.logger.error "Error on shard #{shard_name}: #{e.message}"
Octopus.logger.error "Error on shard #{v.spec.config['host']}, database #{v.spec.config['database']}: #{e.message}"
else
raise
end
Expand All @@ -349,7 +387,7 @@ def with_each_healthy_shard
end

ar_pools.each do |pool|
next if pool == @shards[:master] # Already handled this
next if pool == @shards[@default_shard] # Already handled this

begin
yield(pool)
Expand Down Expand Up @@ -413,7 +451,7 @@ def send_queries_to_selected_slave(method, *args, &block)
if current_model.replicated || fully_replicated?
selected_slave = @slaves_load_balancer.next
else
selected_slave = :master
selected_slave = @default_shard
end

send_queries_to_slave(selected_slave, method, *args, &block)
Expand All @@ -439,15 +477,13 @@ def slaves_grouped?
# Temporarily switch `current_shard` to the next slave in a slave group and send queries to it
# while preserving `current_shard`
def send_queries_to_balancer(balancer, method, *args, &block)
send_queries_to_slave(balancer.next, method, *args, &block)
send_queries_to_slave(balancer.next(current_slave), method, *args, &block)
end

# Temporarily switch `current_shard` to the specified slave and send queries to it
# while preserving `current_shard`
def send_queries_to_slave(slave, method, *args, &block)
using_shard(slave) do
select_connection.send(method, *args, &block)
end
select_connection(slave).send(method, *args, &block)
end

# Temporarily block cleaning connection proxy and run the block
Expand Down
27 changes: 21 additions & 6 deletions lib/octopus/slave_group.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
module Octopus
class SlaveGroup
def initialize(slaves)
slaves = HashWithIndifferentAccess.new(slaves)
slaves_list = slaves.values
@load_balancer = Octopus::LoadBalancing::RoundRobin.new(slaves_list)
@name_index_map = HashWithIndifferentAccess.new
@slaves_list, index = [], 0

slaves.each do |name, db_connection_pool|
@slaves_list << db_connection_pool
@name_index_map[name] = index
index += 1
end

@load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list)
end

def has_slave?(slave_name)
@name_index_map.has_key?(slave_name)
end

def slaves
@slaves_list
end

def next
@load_balancer.next
def next(slave_name = nil)
@load_balancer.next(@name_index_map[slave_name])
end
end
end
end
6 changes: 6 additions & 0 deletions spec/octopus/association_shard_tracking_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@
expect(@brazil_client.comments.count).to eq(2)
end

it 'group + count' do
expect(@brazil_client.comments.group(:id).count.length).to eq(1)
_cmt = @brazil_client.comments.create(:name => 'Builded Comment')
expect(@brazil_client.comments.group(:id).count.length).to eq(2)
end

it 'size' do
expect(@brazil_client.comments.size).to eq(1)
_cmt = @brazil_client.comments.create(:name => 'Builded Comment')
Expand Down