diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 96098793626..9b8233c019a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -744,21 +744,6 @@ func (s *EtcdServer) run() { // asynchronously accept toApply packets, dispatch progress in-order sched := schedule.NewFIFOScheduler(lg) - var ( - smu sync.RWMutex - syncC <-chan time.Time - ) - setSyncC := func(ch <-chan time.Time) { - smu.Lock() - syncC = ch - smu.Unlock() - } - getSyncC := func() (ch <-chan time.Time) { - smu.RLock() - ch = syncC - smu.RUnlock() - return - } rh := &raftReadyHandler{ getLead: func() (lead uint64) { return s.getLead() }, updateLead: func(lead uint64) { s.setLead(lead) }, @@ -770,7 +755,6 @@ func (s *EtcdServer) run() { if s.compactor != nil { s.compactor.Pause() } - setSyncC(nil) } else { if newLeader { t := time.Now() @@ -778,7 +762,6 @@ func (s *EtcdServer) run() { s.leadElectedTime = t s.leadTimeMu.Unlock() } - setSyncC(s.SyncTicker.C) if s.compactor != nil { s.compactor.Resume() } @@ -845,10 +828,6 @@ func (s *EtcdServer) run() { lg.Warn("server error", zap.Error(err)) lg.Warn("data-dir used by this member must be removed") return - case <-getSyncC(): - if s.v2store.HasTTLKeys() { - s.sync(s.Cfg.ReqTimeout()) - } case <-s.stop: return } @@ -1689,25 +1668,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me } } -// sync proposes a SYNC request and is non-blocking. -// This makes no guarantee that the request will be proposed or performed. -// The request will be canceled after the given timeout. -func (s *EtcdServer) sync(timeout time.Duration) { - req := pb.Request{ - Method: "SYNC", - ID: s.reqIDGen.Next(), - Time: time.Now().UnixNano(), - } - data := pbutil.MustMarshal(&req) - // There is no promise that node has leader when do SYNC request, - // so it uses goroutine to propose. - ctx, cancel := context.WithTimeout(s.ctx, timeout) - s.GoAttach(func() { - s.r.Propose(ctx, data) - cancel() - }) -} - // publishV3 registers server information into the cluster using v3 request. The // information is the JSON representation of this server's member struct, updated // with the static clientURLs of the server. diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index b967af0da31..84893e5e8e2 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -478,143 +478,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { } } -// TestSync tests sync 1. is nonblocking 2. proposes SYNC request. -func TestSync(t *testing.T) { - n := newNodeRecorder() - ctx, cancel := context.WithCancel(context.Background()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - ctx: ctx, - cancel: cancel, - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - // check that sync is non-blocking - done := make(chan struct{}, 1) - go func() { - srv.sync(10 * time.Second) - done <- struct{}{} - }() - - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("sync should be non-blocking but did not return after 1s!") - } - - action, _ := n.Wait(1) - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { - t.Fatalf("unmarshal request error: %v", err) - } - if r.Method != "SYNC" { - t.Errorf("method = %s, want SYNC", r.Method) - } -} - -// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request -// after timeout -func TestSyncTimeout(t *testing.T) { - n := newProposalBlockerRecorder() - ctx, cancel := context.WithCancel(context.Background()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - ctx: ctx, - cancel: cancel, - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - // check that sync is non-blocking - done := make(chan struct{}, 1) - go func() { - srv.sync(0) - done <- struct{}{} - }() - - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("sync should be non-blocking but did not return after 1s!") - } - - w := []testutil.Action{{Name: "Propose blocked"}} - if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) { - t.Errorf("action = %v, want %v", g, w) - } -} - -// TODO: TestNoSyncWhenNoLeader - -// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks -func TestSyncTrigger(t *testing.T) { - n := newReadyNode() - st := make(chan time.Time, 1) - tk := &time.Ticker{C: st} - r := newRaftNode(raftNodeConfig{ - lg: zaptest.NewLogger(t), - Node: n, - raftStorage: raft.NewMemoryStorage(), - transport: newNopTransporter(), - storage: mockstorage.NewStorageRecorder(""), - }) - - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: mockstore.NewNop(), - SyncTicker: tk, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - - // trigger the server to become a leader and accept sync requests - go func() { - srv.start() - n.readyc <- raft.Ready{ - SoftState: &raft.SoftState{ - RaftState: raft.StateLeader, - }, - } - // trigger a sync request - st <- time.Time{} - }() - - action, _ := n.Wait(1) - go srv.Stop() - - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var req pb.Request - if err := req.Unmarshal(data); err != nil { - t.Fatalf("error unmarshalling data: %v", err) - } - if req.Method != "SYNC" { - t.Fatalf("unexpected proposed request: %#v", req.Method) - } - - // wait on stop message - <-n.Chan() -} - // TestSnapshot should snapshot the store and cut the persistent func TestSnapshot(t *testing.T) { revertFunc := verify.DisableVerifications() @@ -1302,20 +1165,6 @@ func (n *nodeRecorder) ForgetLeader(ctx context.Context) error { return nil } -type nodeProposalBlockerRecorder struct { - nodeRecorder -} - -func newProposalBlockerRecorder() *nodeProposalBlockerRecorder { - return &nodeProposalBlockerRecorder{*newNodeRecorderStream()} -} - -func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error { - <-ctx.Done() - n.Record(testutil.Action{Name: "Propose blocked"}) - return nil -} - // readyNode is a nodeRecorder with a user-writeable ready channel type readyNode struct { nodeRecorder