Skip to content

Commit

Permalink
etcdserver: change the snapshot + compact into sync operation
Browse files Browse the repository at this point in the history
Signed-off-by: Clement <[email protected]>
  • Loading branch information
clement2026 committed Jul 4, 2024
1 parent c9fdc60 commit d820cd2
Showing 1 changed file with 48 additions and 50 deletions.
98 changes: 48 additions & 50 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2126,65 +2126,63 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// the go routine created below.
s.KV().Commit()

s.GoAttach(func() {
lg := s.Logger()
lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
}

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
lg.Panic("failed to compact", zap.Error(err))
err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
})
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
}

// CutPeer drops messages to the specified peer.
Expand Down

0 comments on commit d820cd2

Please sign in to comment.