Skip to content

Commit

Permalink
Remove syncing the v2 store TTLs
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 23, 2023
1 parent f04478f commit c72ff1e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 191 deletions.
40 changes: 0 additions & 40 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,21 +744,6 @@ func (s *EtcdServer) run() {
// asynchronously accept toApply packets, dispatch progress in-order
sched := schedule.NewFIFOScheduler(lg)

var (
smu sync.RWMutex
syncC <-chan time.Time
)
setSyncC := func(ch <-chan time.Time) {
smu.Lock()
syncC = ch
smu.Unlock()
}
getSyncC := func() (ch <-chan time.Time) {
smu.RLock()
ch = syncC
smu.RUnlock()
return
}
rh := &raftReadyHandler{
getLead: func() (lead uint64) { return s.getLead() },
updateLead: func(lead uint64) { s.setLead(lead) },
Expand All @@ -770,15 +755,13 @@ func (s *EtcdServer) run() {
if s.compactor != nil {
s.compactor.Pause()
}
setSyncC(nil)
} else {
if newLeader {
t := time.Now()
s.leadTimeMu.Lock()
s.leadElectedTime = t
s.leadTimeMu.Unlock()
}
setSyncC(s.SyncTicker.C)
if s.compactor != nil {
s.compactor.Resume()
}
Expand Down Expand Up @@ -845,10 +828,6 @@ func (s *EtcdServer) run() {
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
case <-getSyncC():
if s.v2store.HasTTLKeys() {
s.sync(s.Cfg.ReqTimeout())
}
case <-s.stop:
return
}
Expand Down Expand Up @@ -1689,25 +1668,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
}
}

// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be canceled after the given timeout.
func (s *EtcdServer) sync(timeout time.Duration) {
req := pb.Request{
Method: "SYNC",
ID: s.reqIDGen.Next(),
Time: time.Now().UnixNano(),
}
data := pbutil.MustMarshal(&req)
// There is no promise that node has leader when do SYNC request,
// so it uses goroutine to propose.
ctx, cancel := context.WithTimeout(s.ctx, timeout)
s.GoAttach(func() {
s.r.Propose(ctx, data)
cancel()
})
}

// publishV3 registers server information into the cluster using v3 request. The
// information is the JSON representation of this server's member struct, updated
// with the static clientURLs of the server.
Expand Down
151 changes: 0 additions & 151 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,143 +478,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
}
}

// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
func TestSync(t *testing.T) {
n := newNodeRecorder()
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

// check that sync is non-blocking
done := make(chan struct{}, 1)
go func() {
srv.sync(10 * time.Second)
done <- struct{}{}
}()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("sync should be non-blocking but did not return after 1s!")
}

action, _ := n.Wait(1)
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "SYNC" {
t.Errorf("method = %s, want SYNC", r.Method)
}
}

// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
// after timeout
func TestSyncTimeout(t *testing.T) {
n := newProposalBlockerRecorder()
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

// check that sync is non-blocking
done := make(chan struct{}, 1)
go func() {
srv.sync(0)
done <- struct{}{}
}()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("sync should be non-blocking but did not return after 1s!")
}

w := []testutil.Action{{Name: "Propose blocked"}}
if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
t.Errorf("action = %v, want %v", g, w)
}
}

// TODO: TestNoSyncWhenNoLeader

// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
func TestSyncTrigger(t *testing.T) {
n := newReadyNode()
st := make(chan time.Time, 1)
tk := &time.Ticker{C: st}
r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: newNopTransporter(),
storage: mockstorage.NewStorageRecorder(""),
})

srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: mockstore.NewNop(),
SyncTicker: tk,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}

// trigger the server to become a leader and accept sync requests
go func() {
srv.start()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
RaftState: raft.StateLeader,
},
}
// trigger a sync request
st <- time.Time{}
}()

action, _ := n.Wait(1)
go srv.Stop()

if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var req pb.Request
if err := req.Unmarshal(data); err != nil {
t.Fatalf("error unmarshalling data: %v", err)
}
if req.Method != "SYNC" {
t.Fatalf("unexpected proposed request: %#v", req.Method)
}

// wait on stop message
<-n.Chan()
}

// TestSnapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
revertFunc := verify.DisableVerifications()
Expand Down Expand Up @@ -1302,20 +1165,6 @@ func (n *nodeRecorder) ForgetLeader(ctx context.Context) error {
return nil
}

type nodeProposalBlockerRecorder struct {
nodeRecorder
}

func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
}

func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
<-ctx.Done()
n.Record(testutil.Action{Name: "Propose blocked"})
return nil
}

// readyNode is a nodeRecorder with a user-writeable ready channel
type readyNode struct {
nodeRecorder
Expand Down

0 comments on commit c72ff1e

Please sign in to comment.