test: add a case for node recovery in a broken cluster
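This merges the separate test_a_replica_is_down and test_a_primary_is_down cases into a single test_client_patience flow: it kills a replica, then a primary, and finally revives the dead nodes, asserting after each step that the client still serves the pre-seeded keys. A node is now "killed" by pausing its Docker Compose service (matched by published port via `docker compose ps --format json`) rather than by SHUTDOWN NOSAVE, so it can later be brought back with `docker compose unpause`. The dedicated ruby helper service is dropped from compose.valkey.yaml, and the test_cluster_broken entry moves earlier in the CI matrix.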
supercaracal committed Sep 28, 2024
1 parent bf83b04 commit adaf4fd
Showing 3 changed files with 46 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -28,6 +28,7 @@ jobs:
matrix:
include:
- {redis: '7.2', ruby: '3.3'}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '7.2', ruby: '3.3', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
@@ -39,7 +40,6 @@ jobs:
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {task: test_cluster_down}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
19 changes: 0 additions & 19 deletions compose.valkey.yaml
@@ -86,22 +86,3 @@ services:
condition: service_healthy
node9:
condition: service_healthy
ruby:
image: "ruby:${RUBY_VERSION:-3}"
restart: always
working_dir: /client
volumes:
- .:/client
command:
- ruby
- "-e"
- 'Signal.trap(:INT, "EXIT"); Signal.trap(:TERM, "EXIT"); loop { sleep 1 }'
environment:
REDIS_HOST: node1
cap_drop:
- ALL
healthcheck:
test: ["CMD", "ruby", "-e", "'puts 1'"]
interval: "5s"
timeout: "3s"
retries: 3
89 changes: 45 additions & 44 deletions test/test_against_cluster_broken.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'json'
require 'testing_helper'

class TestAgainstClusterBroken < TestingWrapper
@@ -37,20 +38,36 @@ def teardown
"ClusterDownError: #{@cluster_down_error_count} = "
end

def test_a_replica_is_down
sacrifice = @controller.select_sacrifice_of_replica
do_test_a_node_is_down(sacrifice, number_of_keys: NUMBER_OF_KEYS)
def test_client_patience
prepare_test_data(number_of_keys: NUMBER_OF_KEYS)

# a replica
kill_a_node(@controller.select_sacrifice_of_replica)
wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
do_assertions(number_of_keys: NUMBER_OF_KEYS)
refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))

# a primary
kill_a_node(@controller.select_sacrifice_of_primary)
wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
do_assertions(number_of_keys: NUMBER_OF_KEYS)
refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))
end

def test_a_primary_is_down
sacrifice = @controller.select_sacrifice_of_primary
do_test_a_node_is_down(sacrifice, number_of_keys: NUMBER_OF_KEYS)
# recovery
revive_dead_nodes
wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
do_assertions(number_of_keys: NUMBER_OF_KEYS)
refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))
end

private

def prepare_test_data(number_of_keys:)
number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
wait_for_replication
end

def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -59,61 +76,45 @@ def wait_for_replication
end
end

def do_test_a_node_is_down(sacrifice, number_of_keys:)
prepare_test_data(number_of_keys: number_of_keys)

kill_a_node(sacrifice, kill_attempts: MAX_ATTEMPTS)
wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)

assert_equal('PONG', @client.call('PING'), 'Case: PING')
do_assertions_without_pipelining(number_of_keys: number_of_keys)
do_assertions_with_pipelining(number_of_keys: number_of_keys)
end
def wait_for_cluster_to_be_ready(wait_attempts:)
loop do
raise MaxRetryExceeded if wait_attempts <= 0

def prepare_test_data(number_of_keys:)
number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
wait_for_replication
wait_attempts -= 1
break if @client.call('PING') == 'PONG'
rescue ::RedisClient::Cluster::NodeMightBeDown
@cluster_down_error_count += 1
ensure
sleep WAIT_SEC
end
end

def kill_a_node(sacrifice, kill_attempts:)
def kill_a_node(sacrifice)
refute_nil(sacrifice, "#{sacrifice.config.host}:#{sacrifice.config.port}")

loop do
raise MaxRetryExceeded if kill_attempts <= 0
`docker compose ps --format json`.lines.map { |line| JSON.parse(line) }.each do |service|
published_ports = service.fetch('Publishers').map { |e| e.fetch('PublishedPort') }.uniq
next unless published_ports.include?(sacrifice.config.port)

kill_attempts -= 1
sacrifice.call('SHUTDOWN', 'NOSAVE')
rescue ::RedisClient::CommandError => e
raise unless e.message.include?('Errors trying to SHUTDOWN')
rescue ::RedisClient::ConnectionError
service_name = service.fetch('Service')
system("docker compose --progress quiet pause #{service_name}", exception: true)
break
ensure
sleep WAIT_SEC
end

assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
end

def wait_for_cluster_to_be_ready(wait_attempts:)
loop do
raise MaxRetryExceeded if wait_attempts <= 0

wait_attempts -= 1
break if @client.call('PING') == 'PONG'
rescue ::RedisClient::Cluster::NodeMightBeDown
@cluster_down_error_count += 1
ensure
sleep WAIT_SEC
def revive_dead_nodes
`docker compose ps --format json --status paused`.lines.map { |line| JSON.parse(line) }.each do |service|
service_name = service.fetch('Service')
system("docker compose --progress quiet unpause #{service_name}", exception: true)
end
end

def do_assertions_without_pipelining(number_of_keys:)
def do_assertions(number_of_keys:)
number_of_keys.times { |i| assert_equal(i.to_s, @client.call('GET', "pre-#{i}"), "Case: pre-#{i}: GET") }
number_of_keys.times { |i| assert_equal('OK', @client.call('SET', "post-#{i}", i), "Case: post-#{i}: SET") }
end

def do_assertions_with_pipelining(number_of_keys:)
want = Array.new(number_of_keys, &:to_s)
got = @client.pipelined { |pi| number_of_keys.times { |i| pi.call('GET', "pre-pipelined-#{i}") } }
assert_equal(want, got, 'Case: pre-pipelined: GET')
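The new kill_a_node and revive_dead_nodes helpers identify Compose services by parsing `docker compose ps --format json`, which on Compose v2 emits one JSON object per line. A minimal sketch of that parsing, with a made-up example line (the port, service name, and extra fields are illustrative; the test relies only on Service and Publishers/PublishedPort):

    require 'json'

    # Illustrative line as emitted by `docker compose ps --format json`
    # (one JSON object per line on Compose v2); values are made up.
    line = '{"Service":"node1","State":"running","Publishers":[{"URL":"0.0.0.0","TargetPort":6379,"PublishedPort":16379,"Protocol":"tcp"}]}'

    service = JSON.parse(line)
    ports = service.fetch('Publishers').map { |e| e.fetch('PublishedPort') }.uniq
    puts "#{service.fetch('Service')} publishes #{ports.inspect}" # => node1 publishes [16379]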
