Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: fix leader did not retire after delete the key (#11604) #11732

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions dm/pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
e.l.Debug("begin to campaign", zap.Stringer("current member", e.info))

err2 := elec.Campaign(ctx2, e.infoStr)
failpoint.Inject("mockCapaignSucceedButReturnErr", func() {
if err2 == nil {
err2 = errors.New("mock campaign succeed but return error")
time.Sleep(time.Second)
}
})
if err2 != nil {
// because inner commit may return undetermined error, we try to delete the election key manually
deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID())
Expand All @@ -282,6 +288,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
var (
leaderKey string
leaderInfo *CampaignerInfo
revision int64
)
eleObserveCh := elec.Observe(ctx2)

Expand All @@ -300,6 +307,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value)))
leaderKey = string(resp.Kvs[0].Key)
leaderInfo, err = getCampaignerInfo(resp.Kvs[0].Value)
revision = resp.Header.Revision
if err != nil {
// this should never happened
e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(err))
Expand Down Expand Up @@ -330,7 +338,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio

e.l.Info("become leader", zap.Stringer("current member", e.info))
e.notifyLeader(ctx, leaderInfo) // become the leader now
e.watchLeader(ctx, session, leaderKey, elec)
e.watchLeader(ctx, session, leaderKey, elec, revision)
e.l.Info("retire from leader", zap.Stringer("current member", e.info))
e.notifyLeader(ctx, nil) // need to re-campaign
oldLeaderID = ""
Expand Down Expand Up @@ -359,8 +367,8 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo)
}
}

func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) {
e.l.Debug("watch leader key", zap.String("key", key))
func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election, revision int64) {
e.l.Debug("watch leader key", zap.String("key", key), zap.Int64("revision", revision), zap.Stringer("current member", e.info))

e.campaignMu.Lock()
e.resignCh = make(chan struct{})
Expand All @@ -374,7 +382,7 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session

wCtx, cancel := context.WithCancel(ctx)
defer cancel()
wch := e.cli.Watch(wCtx, key)
wch := e.cli.Watch(wCtx, key, clientv3.WithRev(revision+1))

for {
if e.evictLeader.Load() {
Expand Down
79 changes: 79 additions & 0 deletions dm/pkg/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,82 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) {
c.Assert(err, IsNil)
wg.Wait()
}

func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) {
var (
timeout = 5 * time.Second
sessionTTL = 60
key = "unit-test/election-succeed-but-return-error"
ID1 = "member1"
ID2 = "member2"
addr1 = "127.0.0.1:1"
addr2 = "127.0.0.1:2"
)
cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil)
c.Assert(err, IsNil)
defer cli.Close()

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()

e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1, addr1, t.notifyBlockTime)
c.Assert(err, IsNil)
defer e1.Close()

// e1 should become the leader
select {
case leader := <-e1.LeaderNotify():
c.Assert(leader.ID, Equals, ID1)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}
c.Assert(e1.IsLeader(), IsTrue)
_, leaderID, leaderAddr, err := e1.LeaderInfo(ctx1)
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e1.ID())
c.Assert(leaderAddr, Equals, addr1)

// start e2
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2, addr2, t.notifyBlockTime)
c.Assert(err, IsNil)
defer e2.Close()
select {
case leader := <-e2.leaderCh:
c.Assert(leader.ID, Equals, ID1)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}
// but the leader should still be e1
_, leaderID, leaderAddr, err = e2.LeaderInfo(ctx2)
c.Assert(err, IsNil)
c.Assert(leaderID, Equals, e1.ID())
c.Assert(leaderAddr, Equals, addr1)
c.Assert(e2.IsLeader(), IsFalse)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr")

e1.Close() // stop the campaign for e1
c.Assert(e1.IsLeader(), IsFalse)

// e2 should become the leader
select {
case leader := <-e2.LeaderNotify():
c.Assert(leader.ID, Equals, ID2)
case <-time.After(timeout):
c.Fatal("leader campaign timeout")
}

// the leader retired after deleted the key
select {
case err2 := <-e2.ErrorNotify():
c.Fatalf("delete the leader key should not get an error, %v", err2)
case leader := <-e2.LeaderNotify():
c.Assert(leader, IsNil)
case <-time.After(timeout):
c.Fatal("leader retire timeout")
}
}
Loading