From b81c5325b0f406c1daacfda11efd7c5727006f4f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 24 Jan 2025 15:43:54 -0800 Subject: [PATCH] Made newTerm operation idempotent instead of returning AlreadyFenced error --- common/error_codes.go | 2 -- coordinator/impl/shard_controller.go | 2 +- server/follower_controller.go | 10 ---------- server/follower_controller_test.go | 20 ++++++++++++++++---- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/common/error_codes.go b/common/error_codes.go index b4205c3f..8d99363e 100644 --- a/common/error_codes.go +++ b/common/error_codes.go @@ -33,7 +33,6 @@ const ( CodeNamespaceNotFound codes.Code = 110 CodeNotificationsNotEnabled codes.Code = 111 CodeFollowerAlreadyPresent codes.Code = 112 - CodeFollowerAlreadyFenced codes.Code = 113 ) var ( @@ -50,5 +49,4 @@ var ( ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found") ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace") ErrorFollowerAlreadyPresent = status.Error(CodeFollowerAlreadyPresent, "oxia: follower is already present") - ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced") ) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 3e919079..36961db9 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -498,7 +498,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model. func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, node model.ServerAddress, res chan error) { fr, err := s.newTerm(ctx, node) - if err != nil && status.Code(err) != common.CodeFollowerAlreadyFenced { + if err != nil { res <- err return } diff --git a/server/follower_controller.go b/server/follower_controller.go index 4c75d773..7f0b2688 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -267,16 +267,6 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm slog.Int64("new-term", req.Term), ) return nil, common.ErrorInvalidTerm - } else if req.Term == fc.term && fc.status != proto.ServingStatus_FENCED { - // It's OK to receive a duplicate Fence request, for the same term, as long as we haven't moved - // out of the Fenced state for that term - fc.log.Warn( - "Failed to fence with same term in invalid state", - slog.Int64("follower-term", fc.term), - slog.Int64("new-term", req.Term), - slog.Any("status", fc.status), - ) - return nil, common.ErrorFollowerAlreadyFenced } if fc.db == nil { diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index cb19dba0..792faf10 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -368,8 +368,7 @@ func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) { stream := newMockServerReplicateStream() go func() { - // cancelled due to fc.Close() below - assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) + assert.NoError(t, fc.Replicate(stream)) }() stream.AddRequest(createAddRequest(t, 1, 0, map[string]string{"a": "0", "b": "1"}, 10)) @@ -384,8 +383,21 @@ func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) { }, 10*time.Second, 10*time.Millisecond) r, err := fc.NewTerm(&proto.NewTermRequest{Term: 1}) - assert.Nil(t, r) - assert.Equal(t, common.CodeFollowerAlreadyFenced, status.Code(err)) + assert.NoError(t, err) + assert.NotNil(t, r) + assert.EqualValues(t, r1.Offset, r.HeadEntryId.Offset) + assert.EqualValues(t, 1, r.HeadEntryId.Term) + + stream.AddRequest(createAddRequest(t, 1, 1, map[string]string{"a": "1", "b": "2"}, 11)) + + // Wait for acks + r2 := stream.GetResponse() + + assert.EqualValues(t, 1, r2.Offset) + + assert.Eventually(t, func() bool { + return fc.CommitOffset() == 1 + }, 10*time.Second, 10*time.Millisecond) assert.NoError(t, fc.Close()) assert.NoError(t, kvFactory.Close())