From 636baa62f993cb85f8223c02467a04ade265451f Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 18 Sep 2024 20:51:15 +0800 Subject: [PATCH 1/5] dm: fix leader did not retire after delete the key --- dm/pkg/election/election.go | 16 ++++-- dm/pkg/election/election_test.go | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 4 deletions(-) mode change 100644 => 100755 dm/pkg/election/election.go diff --git a/dm/pkg/election/election.go b/dm/pkg/election/election.go old mode 100644 new mode 100755 index 1106ea78726..482905c768c --- a/dm/pkg/election/election.go +++ b/dm/pkg/election/election.go @@ -264,6 +264,12 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Debug("begin to campaign", zap.Stringer("current member", e.info)) err2 := elec.Campaign(ctx2, e.infoStr) + failpoint.Inject("mockCapaignSucceedButReturnErr", func(val failpoint.Value) { + if err2 == nil { + err2 = errors.New("mock campaign succeed but return error") + time.Sleep(time.Second * time.Duration(val.(int))) + } + }) if err2 != nil { // because inner commit may return undetermined error, we try to delete the election key manually deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID()) @@ -282,6 +288,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio var ( leaderKey string leaderInfo *CampaignerInfo + revision int64 ) eleObserveCh := elec.Observe(ctx2) @@ -300,6 +307,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value))) leaderKey = string(resp.Kvs[0].Key) leaderInfo, err = getCampaignerInfo(resp.Kvs[0].Value) + revision = resp.Header.Revision if err != nil { // this should never happened e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(err)) @@ -330,7 +338,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Info("become leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, leaderInfo) // become the leader now - e.watchLeader(ctx, session, leaderKey, elec) + e.watchLeader(ctx, session, leaderKey, elec, revision) e.l.Info("retire from leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, nil) // need to re-campaign oldLeaderID = "" @@ -359,8 +367,8 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo) } } -func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) { - e.l.Debug("watch leader key", zap.String("key", key)) +func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election, revision int64) { + e.l.Debug("watch leader key", zap.String("key", key), zap.Int64("revision", revision), zap.Stringer("current member", e.info)) e.campaignMu.Lock() e.resignCh = make(chan struct{}) @@ -374,7 +382,7 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session wCtx, cancel := context.WithCancel(ctx) defer cancel() - wch := e.cli.Watch(wCtx, key) + wch := e.cli.Watch(wCtx, key, clientv3.WithRev(revision+1)) for { if e.evictLeader.Load() { diff --git a/dm/pkg/election/election_test.go b/dm/pkg/election/election_test.go index 7280795faa7..77157276ca9 100644 --- a/dm/pkg/election/election_test.go +++ b/dm/pkg/election/election_test.go @@ -412,3 +412,91 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) { c.Assert(err, IsNil) wg.Wait() } + +func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { + var ( + timeout = 5 * time.Second + sessionTTL = 60 + key = "unit-test/election-succeed-but-return-error" + ID1 = "member1" + ID2 = "member2" + addr1 = "127.0.0.1:1" + addr2 = "127.0.0.1:2" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) + c.Assert(err, IsNil) + defer cli.Close() + ctx0, cancel0 := context.WithCancel(context.Background()) + defer cancel0() + _, err = cli.Delete(ctx0, key, clientv3.WithPrefix()) + c.Assert(err, IsNil) + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + + e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1, addr1, t.notifyBlockTime) + c.Assert(err, IsNil) + defer e1.Close() + + // e1 should become the leader + select { + case leader := <-e1.LeaderNotify(): + c.Assert(leader.ID, Equals, ID1) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + c.Assert(e1.IsLeader(), IsTrue) + _, leaderID, leaderAddr, err := e1.LeaderInfo(ctx1) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(leaderAddr, Equals, addr1) + + c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return(1)`), IsNil) + defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr") + + // start e2 + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2, addr2, t.notifyBlockTime) + c.Assert(err, IsNil) + defer e2.Close() + select { + case leader := <-e2.leaderCh: + c.Assert(leader.ID, Equals, ID1) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + // but the leader should still be e1 + _, leaderID, leaderAddr, err = e2.LeaderInfo(ctx2) + c.Assert(err, IsNil) + c.Assert(leaderID, Equals, e1.ID()) + c.Assert(leaderAddr, Equals, addr1) + c.Assert(e2.IsLeader(), IsFalse) + + e1.Close() // stop the campaign for e1 + c.Assert(e1.IsLeader(), IsFalse) + + ctx3, cancel3 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel3() + deleted, err := e2.ClearSessionIfNeeded(ctx3, ID1) + c.Assert(err, IsNil) + c.Assert(deleted, IsFalse) + + // e2 should become the leader + select { + case leader := <-e2.LeaderNotify(): + c.Assert(leader.ID, Equals, ID2) + case <-time.After(timeout): + c.Fatal("leader campaign timeout") + } + + // the leader retired after deleted the key + select { + case err2 := <-e2.ErrorNotify(): + c.Fatalf("delete the leader key should not get an error, %v", err2) + case leader := <-e2.LeaderNotify(): + c.Assert(leader, IsNil) + case <-time.After(timeout): + c.Fatal("leader retire timeout") + } +} From c964fbe3e22b5c732b7835e806ad27df17272c8b Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 18 Sep 2024 21:30:38 +0800 Subject: [PATCH 2/5] fix --- dm/pkg/election/election.go | 4 ++-- dm/pkg/election/election_test.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dm/pkg/election/election.go b/dm/pkg/election/election.go index 482905c768c..c8b3b7607b6 100755 --- a/dm/pkg/election/election.go +++ b/dm/pkg/election/election.go @@ -264,10 +264,10 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio e.l.Debug("begin to campaign", zap.Stringer("current member", e.info)) err2 := elec.Campaign(ctx2, e.infoStr) - failpoint.Inject("mockCapaignSucceedButReturnErr", func(val failpoint.Value) { + failpoint.Inject("mockCapaignSucceedButReturnErr", func() { if err2 == nil { err2 = errors.New("mock campaign succeed but return error") - time.Sleep(time.Second * time.Duration(val.(int))) + time.Sleep(time.Second) } }) if err2 != nil { diff --git a/dm/pkg/election/election_test.go b/dm/pkg/election/election_test.go index 77157276ca9..5dbd664a51b 100644 --- a/dm/pkg/election/election_test.go +++ b/dm/pkg/election/election_test.go @@ -451,7 +451,8 @@ func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { c.Assert(leaderID, Equals, e1.ID()) c.Assert(leaderAddr, Equals, addr1) - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return(1)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil) + //nolint:errcheck defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr") // start e2 From 11167c2a3c9ef7e50f2b2d8991bdfff9d44f0e08 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 19 Sep 2024 14:28:43 +0800 Subject: [PATCH 3/5] update --- dm/pkg/election/election.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 dm/pkg/election/election.go diff --git a/dm/pkg/election/election.go b/dm/pkg/election/election.go old mode 100755 new mode 100644 From 6d63abb9672cadf2fcdd66365661e8bdbbd5cd95 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 19 Sep 2024 23:25:36 +0800 Subject: [PATCH 4/5] address comment --- dm/pkg/election/election_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dm/pkg/election/election_test.go b/dm/pkg/election/election_test.go index 5dbd664a51b..483096c1303 100644 --- a/dm/pkg/election/election_test.go +++ b/dm/pkg/election/election_test.go @@ -426,10 +426,6 @@ func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) c.Assert(err, IsNil) defer cli.Close() - ctx0, cancel0 := context.WithCancel(context.Background()) - defer cancel0() - _, err = cli.Delete(ctx0, key, clientv3.WithPrefix()) - c.Assert(err, IsNil) ctx1, cancel1 := context.WithCancel(context.Background()) defer cancel1() From 88d0d0cdc57b43caff51a508b1b956247c4afcc9 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 23 Sep 2024 15:23:18 +0800 Subject: [PATCH 5/5] address comment --- dm/pkg/election/election_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/dm/pkg/election/election_test.go b/dm/pkg/election/election_test.go index 483096c1303..d828829d5f4 100644 --- a/dm/pkg/election/election_test.go +++ b/dm/pkg/election/election_test.go @@ -447,10 +447,6 @@ func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { c.Assert(leaderID, Equals, e1.ID()) c.Assert(leaderAddr, Equals, addr1) - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil) - //nolint:errcheck - defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr") - // start e2 ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() @@ -470,15 +466,13 @@ func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) { c.Assert(leaderAddr, Equals, addr1) c.Assert(e2.IsLeader(), IsFalse) + c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr") + e1.Close() // stop the campaign for e1 c.Assert(e1.IsLeader(), IsFalse) - ctx3, cancel3 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel3() - deleted, err := e2.ClearSessionIfNeeded(ctx3, ID1) - c.Assert(err, IsNil) - c.Assert(deleted, IsFalse) - // e2 should become the leader select { case leader := <-e2.LeaderNotify():