Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: separate maybeCompactRaftLog function to compact raft log independently #18635

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
67 changes: 67 additions & 0 deletions server/etcdserver/memory_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"testing"

"github.com/stretchr/testify/assert"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// TestMemoryStorageCompactInclusive tests that compacting is inclusive,
// meaning the first index after compaction is larger by one than compacted index.
func TestMemoryStorageCompactInclusive(t *testing.T) {
// entries: [ {Index: 0} ]
raftStorage := raft.NewMemoryStorage()

firstIndex, err := raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(1), firstIndex)

// after appending, entries should be:
// [ {Index: 0}, {Index: 1}, {Index: 2}, {Index: 3}, {Index: 4}, {Index: 5} ]
appliedIndex := uint64(1)
for ; appliedIndex <= 5; appliedIndex++ {
e := raftpb.Entry{
Type: raftpb.EntryNormal,
Term: 1,
Index: appliedIndex,
}
err := raftStorage.Append([]raftpb.Entry{e})
assert.NoError(t, err)
}

firstIndex, err = raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(1), firstIndex)

lastIndex, err := raftStorage.LastIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(5), lastIndex)

// after compacting, entries should be:
// [ {Index: 3}, {Index: 4}, {Index: 5} ]
compacti := uint64(3)
err = raftStorage.Compact(compacti)
assert.NoError(t, err)


firstIndex, err = raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, compacti+1, firstIndex)
}
35 changes: 28 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ type etcdProgress struct {
snapi uint64
appliedt uint64
appliedi uint64
compacti uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand All @@ -764,6 +765,10 @@ func (s *EtcdServer) run() {
if err != nil {
lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
}
firstRaftIndex, err := s.r.raftStorage.FirstIndex()
if err != nil {
lg.Panic("failed to get first index from Raft storage", zap.Error(err))
}

// asynchronously accept toApply packets, dispatch progress in-order
sched := schedule.NewFIFOScheduler(lg)
Expand Down Expand Up @@ -813,6 +818,10 @@ func (s *EtcdServer) run() {
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
// Compaction is inclusive, meaning compact index should be lower by one
// than the first index after compaction.
// This is validated by TestMemoryStorageCompaction.
compacti: firstRaftIndex - 1,
}

defer func() {
Expand Down Expand Up @@ -980,6 +989,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.maybeCompactRaftLog(ep)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really want to get this small merged firstly, then please ensure it's as independent as possible. Currently the s.snapshot performs both snapshot and compaction operations. It makes sense to extract the compaction operation as an independent function/method, but let's call the method inside s.triggerSnapshot,

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
	if !s.shouldSnapshot(ep) {
		return
	}
	lg := s.Logger()
	lg.Info(
		"triggering snapshot",
		zap.String("local-member-id", s.MemberID().String()),
		zap.Uint64("local-member-applied-index", ep.appliedi),
		zap.Uint64("local-member-snapshot-index", ep.snapi),
		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
		zap.Bool("snapshot-forced", s.forceSnapshot),
	)
	s.forceSnapshot = false

	s.snapshot(ep.appliedi, ep.confState)
	ep.snapi = ep.appliedi
	s.compact(xxxxx)   // call the new method here, so we still do it each time after creating a snapshot.

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No rush on merging this PR. If we do merge it, we need to ensure etcd actually benefits from it. Let's resolve #17098 (comment) first.

Copy link
Member

@serathius serathius Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the PR is to make compaction independent from snapshot. Not just refactoring it to function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not just refactor the function.

Just refactoring the function (extract the compact into a separate method) is an independent and safe change, accordingly can be merged soon.

The goal of the PR is to make compaction independent from snapshot

It modifies the logic/semantics, so it's no longer an independent change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Refactoring it to function" is a subset of "refactoring".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clearer about #18635 (comment), I am proposing an independent & safe minor refactoring below as the very first step

ahrtr@efae0d2

select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2170,6 +2180,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
// Retain all log entries up to the latest snapshot index to ensure any member can recover from that snapshot.
// Beyond the snapshot index, preserve the most recent s.Cfg.SnapshotCatchUpEntries entries in memory.
// This allows slow followers to catch up by synchronizing entries instead of requiring a full snapshot transfer.
if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
if ep.appliedi <= s.Cfg.SnapshotCatchUpEntries {

Copy link
Member

@serathius serathius Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snapi is more correct here, the reason is described in the comment.

return
}

compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries
compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of the change?
If you use ep.snapi, then the behaviour is exactly the same as existing behaviour, because ep.snapi only gets updated each time after creating the (v2) snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahrtr #18622 made a lot of changes, but Serathius and I agreed to keep PRs small. So, in this PR, we just separated the compaction from the snapshot while keeping the existing behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #17098 (comment). Can we get that confirmed firstly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change doesn't change when compaction is run or how many times it's executed. I was aware of https://github.com/etcd-io/raft/blob/5d6eb55c4e6929e461997c9113aba99a5148e921/storage.go#L266-L269 code, that's why I was proposing compacting only ever X applies.

if compacti <= ep.compacti {
return
}

lg := s.Logger()

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand All @@ -2181,13 +2207,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
serathius marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
Expand All @@ -2196,6 +2216,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to compact", zap.Error(err))
}
ep.compacti = compacti
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
Expand Down
114 changes: 114 additions & 0 deletions tests/integration/raft_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/tests/v3/framework/integration"
)

// TestRaftLogCompaction tests whether raft log snapshot and compaction work correctly.
func TestRaftLogCompaction(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 1,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
})
defer clus.Terminate(t)

mem := clus.Members[0]

// Get applied index of raft log
endpoint := mem.Client.Endpoints()[0]
assert.NotEmpty(t, endpoint)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status, _ := mem.Client.Status(ctx, endpoint)
appliedi := status.RaftAppliedIndex
// Assume applied index is less than 10, should be fine at this stage
assert.Less(t, appliedi, uint64(10))

kvc := integration.ToGRPC(mem.Client).KV

// When applied index is a multiple of 11 (SnapshotCount+1),
// a snapshot is created, and entries are compacted.
//
// increase applied index to 10
for ; appliedi < 10; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// The first snapshot and compaction shouldn't happen because applied index is less than 11
logOccurredAtMostNTimes(t, mem, 5*time.Second, "saved snapshot", 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those timeouts are very high, can we not have as big timeout? Snapshot and compaction should happen synchronously to operations, meaning logs should be available immidietly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I'll reduce the timeout to 1 or 2 seconds second and see how it works out.

logOccurredAtMostNTimes(t, mem, time.Second, "compacted Raft logs", 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason sometimes we wait for 1 second, sometimes for 5 seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first logOccurredAtMostNTimes waits for 5 seconds. Once it's done, we can assume the log is synced up, so the second logOccurredAtMostNTimes doesn't need to wait that long.

Copy link
Member

@serathius serathius Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logs should appear in in matter of tens of miliseconds not multiple seconds. The whole test should take seconds.


// increase applied index to 11
for ; appliedi < 11; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// The first snapshot and compaction should happen because applied index is 11
logOccurredAtMostNTimes(t, mem, 5*time.Second, "saved snapshot", 1)
serathius marked this conversation as resolved.
Show resolved Hide resolved
logOccurredAtMostNTimes(t, mem, time.Second, "compacted Raft logs", 1)
expectMemberLog(t, mem, time.Second, "\"compact-index\": 6", 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to check if compacted Raft log occured at most N times? This is hard to check, why checking if "compact-index": X is not enough ?


// increase applied index to 1100
for ; appliedi < 1100; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// With applied index at 1100, snapshot and compaction should happen 100 times.
logOccurredAtMostNTimes(t, mem, 5*time.Second, "saved snapshot", 100)
logOccurredAtMostNTimes(t, mem, time.Second, "compacted Raft logs", 100)
expectMemberLog(t, mem, time.Second, "\"compact-index\": 1095", 1)
}

// logOccurredAtMostNTimes ensures that the log has exactly `count` occurrences of `s` before timing out, no more, no less.
func logOccurredAtMostNTimes(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) {
Comment on lines +93 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// logOccurredAtMostNTimes ensures that the log has exactly `count` occurrences of `s` before timing out, no more, no less.
func logOccurredAtMostNTimes(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) {
func logOccurredExactlyNTimes(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) {

ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

// The log must have `count` occurrences before timeout
_, err := m.LogObserver.Expect(ctx, s, count)
if err != nil {
t.Fatalf("failed to expect(log:%s, count:%d): %v", s, count, err)
}

// The log mustn't have `count+1` occurrences before timeout
lines, err := m.LogObserver.Expect(ctx, s, count+1)
serathius marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return
} else {
t.Fatalf("failed to expect(log:%s, count:%d): %v", s, count+1, err)
}
}
t.Fatalf("failed: too many occurrences of %s, expect %d, got %d", s, count, len(lines))
}
Loading