diff --git a/dm/pkg/election/election.go b/dm/pkg/election/election.go index 1106ea78726..c8b3b7607b6 100644 --- a/dm/pkg/election/election.go +++ b/dm/pkg/election/election.go @@ -264,6 +264,12 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Debug("begin to campaign", zap.Stringer("current member", e.info)) err2 := elec.Campaign(ctx2, e.infoStr) + failpoint.Inject("mockCapaignSucceedButReturnErr", func() { + if err2 == nil { + err2 = errors.New("mock campaign succeed but return error") + time.Sleep(time.Second) + } + }) if err2 != nil { // because inner commit may return undetermined error, we try to delete the election key manually deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID()) @@ -282,6 +288,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio var ( leaderKey string leaderInfo *CampaignerInfo + revision int64 ) eleObserveCh := elec.Observe(ctx2) @@ -300,6 +307,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value))) leaderKey = string(resp.Kvs[0].Key) leaderInfo, err = getCampaignerInfo(resp.Kvs[0].Value) + revision = resp.Header.Revision if err != nil { // this should never happened e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(err)) @@ -330,7 +338,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Info("become leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, leaderInfo) // become the leader now - e.watchLeader(ctx, session, leaderKey, elec) + e.watchLeader(ctx, session, leaderKey, elec, revision) e.l.Info("retire from leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, nil) // need to re-campaign oldLeaderID = "" @@ -359,8 +367,8 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo) } } -func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) { - e.l.Debug("watch leader key", zap.String("key", key)) +func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election, revision int64) { + e.l.Debug("watch leader key", zap.String("key", key), zap.Int64("revision", revision), zap.Stringer("current member", e.info)) e.campaignMu.Lock() e.resignCh = make(chan struct{}) @@ -374,7 +382,7 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session wCtx, cancel := context.WithCancel(ctx) defer cancel() - wch := e.cli.Watch(wCtx, key) + wch := e.cli.Watch(wCtx, key, clientv3.WithRev(revision+1)) for { if e.evictLeader.Load() { diff --git a/dm/pkg/election/election_test.go b/dm/pkg/election/election_test.go index 7280795faa7..d828829d5f4 100644 --- a/dm/pkg/election/election_test.go +++ b/dm/pkg/election/election_test.go @@ -412,3 +412,82 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) { c.Assert(err, IsNil) wg.Wait() } + +func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { + var ( + timeout = 5 * time.Second + sessionTTL = 60 + key = "unit-test/election-succeed-but-return-error" + ID1 = "member1" + ID2 = "member2" + addr1 = "127.0.0.1:1" + addr2 = "127.0.0.1:2" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) + c.Assert(err, IsNil) + defer cli.Close() + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + + e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1, addr1, t.notifyBlockTime) + c.Assert(err, IsNil) + defer e1.Close() + + // e1 should become the leader + select { + case leader := <-e1.LeaderNotify(): + c.Assert(leader.ID, Equals, ID1) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + c.Assert(e1.IsLeader(), IsTrue) + _, leaderID, leaderAddr, err := e1.LeaderInfo(ctx1) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(leaderAddr, Equals, addr1) + + // start e2 + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2, addr2, t.notifyBlockTime) + c.Assert(err, IsNil) + defer e2.Close() + select { + case leader := <-e2.leaderCh: + c.Assert(leader.ID, Equals, ID1) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + // but the leader should still be e1 + _, leaderID, leaderAddr, err = e2.LeaderInfo(ctx2) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(leaderAddr, Equals, addr1) + c.Assert(e2.IsLeader(), IsFalse) + + c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr") + + e1.Close() // stop the campaign for e1 + c.Assert(e1.IsLeader(), IsFalse) + + // e2 should become the leader + select { + case leader := <-e2.LeaderNotify(): + c.Assert(leader.ID, Equals, ID2) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + + // the leader retired after deleted the key + select { + case err2 := <-e2.ErrorNotify(): + c.Fatalf("delete the leader key should not get an error, %v", err2) + case leader := <-e2.LeaderNotify(): + c.Assert(leader, IsNil) + case <-time.After(timeout): + c.Fatal("leader retire timeout") + } +}