From adaf4fd8e7e32b98011d45773fb76739638f7de1 Mon Sep 17 00:00:00 2001
From: Taishi Kasuga
Date: Sat, 28 Sep 2024 21:20:18 +0900
Subject: [PATCH] test: add a case for node recovery in a broken cluster

Merge the replica-down and primary-down cases into a single
test_client_patience case and extend it to cover recovery. Instead of
killing a node with SHUTDOWN NOSAVE, pause the Docker Compose service
that publishes the sacrifice's port, so the node can be unpaused later
and the client asserted to keep working against the revived cluster.
Drop the now-unused ruby helper service from compose.valkey.yaml and
move the test_cluster_broken job up in the CI matrix.
---
 .github/workflows/test.yaml         |  2 +-
 compose.valkey.yaml                 | 19 ------
 test/test_against_cluster_broken.rb | 89 +++++++++++++++--------
 3 files changed, 46 insertions(+), 64 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index d24d439..e52f089 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -28,6 +28,7 @@ jobs:
       matrix:
         include:
           - {redis: '7.2', ruby: '3.3'}
+          - {task: test_cluster_broken, restart: 'no', startup: '6'}
           - {redis: '7.2', ruby: '3.3', compose: compose.ssl.yaml}
           - {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
           - {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
@@ -39,7 +40,6 @@
           - {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
           - {ruby: 'jruby'}
           - {ruby: 'truffleruby'}
-          - {task: test_cluster_broken, restart: 'no', startup: '6'}
           - {task: test_cluster_down}
           - {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
           - {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
diff --git a/compose.valkey.yaml b/compose.valkey.yaml
index 5a5a3d7..07b812e 100644
--- a/compose.valkey.yaml
+++ b/compose.valkey.yaml
@@ -86,22 +86,3 @@ services:
         condition: service_healthy
       node9:
         condition: service_healthy
-  ruby:
-    image: "ruby:${RUBY_VERSION:-3}"
-    restart: always
-    working_dir: /client
-    volumes:
-      - .:/client
-    command:
-      - ruby
-      - "-e"
-      - 'Signal.trap(:INT, "EXIT"); Signal.trap(:TERM, "EXIT"); loop { sleep 1 }'
-    environment:
-      REDIS_HOST: node1
-    cap_drop:
-      - ALL
-    healthcheck:
-      test: ["CMD", "ruby", "-e", "'puts 1'"]
-      interval: "5s"
-      timeout: "3s"
-      retries: 3
diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb
index 05ee480..7410102 100644
--- a/test/test_against_cluster_broken.rb
+++ b/test/test_against_cluster_broken.rb
@@ -1,5 +1,6 @@
 # frozen_string_literal: true
 
+require 'json'
 require 'testing_helper'
 
 class TestAgainstClusterBroken < TestingWrapper
@@ -37,20 +38,36 @@ def teardown
           "ClusterDownError: #{@cluster_down_error_count} = "
   end
 
-  def test_a_replica_is_down
-    sacrifice = @controller.select_sacrifice_of_replica
-    do_test_a_node_is_down(sacrifice, number_of_keys: NUMBER_OF_KEYS)
+  def test_client_patience
+    prepare_test_data(number_of_keys: NUMBER_OF_KEYS)
+
+    # a replica
+    kill_a_node(@controller.select_sacrifice_of_replica)
+    wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
+    do_assertions(number_of_keys: NUMBER_OF_KEYS)
+    refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))
+
+    # a primary
+    kill_a_node(@controller.select_sacrifice_of_primary)
+    wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
+    do_assertions(number_of_keys: NUMBER_OF_KEYS)
     refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))
-  end
 
-  def test_a_primary_is_down
-    sacrifice = @controller.select_sacrifice_of_primary
-    do_test_a_node_is_down(sacrifice, number_of_keys: NUMBER_OF_KEYS)
+    # recovery
+    revive_dead_nodes
+    wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
+    do_assertions(number_of_keys: NUMBER_OF_KEYS)
     refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command))
   end
 
   private
 
+  def prepare_test_data(number_of_keys:)
+    number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
+    number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
+    wait_for_replication
+  end
+
   def wait_for_replication
     client_side_timeout = TEST_TIMEOUT_SEC + 1.0
     server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -59,61 +76,45 @@ def wait_for_replication
     end
   end
 
-  def do_test_a_node_is_down(sacrifice, number_of_keys:)
-    prepare_test_data(number_of_keys: number_of_keys)
-
-    kill_a_node(sacrifice, kill_attempts: MAX_ATTEMPTS)
-    wait_for_cluster_to_be_ready(wait_attempts: MAX_ATTEMPTS)
-
-    assert_equal('PONG', @client.call('PING'), 'Case: PING')
-    do_assertions_without_pipelining(number_of_keys: number_of_keys)
-    do_assertions_with_pipelining(number_of_keys: number_of_keys)
-  end
+  def wait_for_cluster_to_be_ready(wait_attempts:)
+    loop do
+      raise MaxRetryExceeded if wait_attempts <= 0
 
-  def prepare_test_data(number_of_keys:)
-    number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
-    number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
-    wait_for_replication
+      wait_attempts -= 1
+      break if @client.call('PING') == 'PONG'
+    rescue ::RedisClient::Cluster::NodeMightBeDown
+      @cluster_down_error_count += 1
+    ensure
+      sleep WAIT_SEC
+    end
   end
 
-  def kill_a_node(sacrifice, kill_attempts:)
+  def kill_a_node(sacrifice)
     refute_nil(sacrifice, "#{sacrifice.config.host}:#{sacrifice.config.port}")
 
-    loop do
-      raise MaxRetryExceeded if kill_attempts <= 0
+    `docker compose ps --format json`.lines.map { |line| JSON.parse(line) }.each do |service|
+      published_ports = service.fetch('Publishers').map { |e| e.fetch('PublishedPort') }.uniq
+      next unless published_ports.include?(sacrifice.config.port)
 
-      kill_attempts -= 1
-      sacrifice.call('SHUTDOWN', 'NOSAVE')
-    rescue ::RedisClient::CommandError => e
-      raise unless e.message.include?('Errors trying to SHUTDOWN')
-    rescue ::RedisClient::ConnectionError
+      service_name = service.fetch('Service')
+      system("docker compose --progress quiet pause #{service_name}", exception: true)
       break
-    ensure
-      sleep WAIT_SEC
     end
 
     assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
   end
 
-  def wait_for_cluster_to_be_ready(wait_attempts:)
-    loop do
-      raise MaxRetryExceeded if wait_attempts <= 0
-
-      wait_attempts -= 1
-      break if @client.call('PING') == 'PONG'
-    rescue ::RedisClient::Cluster::NodeMightBeDown
-      @cluster_down_error_count += 1
-    ensure
-      sleep WAIT_SEC
+  def revive_dead_nodes
+    `docker compose ps --format json --status paused`.lines.map { |line| JSON.parse(line) }.each do |service|
+      service_name = service.fetch('Service')
+      system("docker compose --progress quiet unpause #{service_name}", exception: true)
     end
   end
 
-  def do_assertions_without_pipelining(number_of_keys:)
+  def do_assertions(number_of_keys:)
     number_of_keys.times { |i| assert_equal(i.to_s, @client.call('GET', "pre-#{i}"), "Case: pre-#{i}: GET") }
     number_of_keys.times { |i| assert_equal('OK', @client.call('SET', "post-#{i}", i), "Case: post-#{i}: SET") }
-  end
 
-  def do_assertions_with_pipelining(number_of_keys:)
     want = Array.new(number_of_keys, &:to_s)
     got = @client.pipelined { |pi| number_of_keys.times { |i| pi.call('GET', "pre-pipelined-#{i}") } }
     assert_equal(want, got, 'Case: pre-pipelined: GET')
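
P.S. The pause/unpause mechanics can be exercised on their own before running
the suite. Below is a minimal standalone sketch, assuming Docker Compose v2,
whose `ps --format json` prints one JSON object per line with `Service` and
`Publishers` keys (the same shape kill_a_node consumes above); TARGET_PORT is
a hypothetical stand-in for a cluster node's published port:

  require 'json'

  TARGET_PORT = 6379 # hypothetical: any port published by a cluster node

  # Find the compose service publishing the target port, mirroring the
  # lookup in kill_a_node.
  victim = `docker compose ps --format json`.lines
           .map { |line| JSON.parse(line) }
           .find { |svc| svc.fetch('Publishers').any? { |e| e.fetch('PublishedPort') == TARGET_PORT } }

  unless victim.nil?
    name = victim.fetch('Service')
    # Freeze the node's processes; existing connections hang rather than
    # being refused, which is what the test's patience logic has to absorb.
    system("docker compose --progress quiet pause #{name}", exception: true)
    sleep 3 # observe client behavior against the paused node here
    # Bring the node back, as revive_dead_nodes does for all paused services.
    system("docker compose --progress quiet unpause #{name}", exception: true)
  end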