Skip to content

Commit

Permalink
minimize syscalls in pruning routine (#13425)
Browse files Browse the repository at this point in the history
* minimize syscalls in pruning routine

* terence feedback

* Update beacon-chain/db/filesystem/pruner.go

Co-authored-by: Justin Traglia <[email protected]>

* pr feedback

---------

Co-authored-by: Kasey Kirkham <[email protected]>
Co-authored-by: Justin Traglia <[email protected]>
  • Loading branch information
3 people authored Jan 8, 2024
1 parent 28596d6 commit 5cea6be
Show file tree
Hide file tree
Showing 8 changed files with 662 additions and 281 deletions.
8 changes: 5 additions & 3 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"blob.go",
"ephemeral.go",
"metrics.go",
"pruner.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
Expand All @@ -19,7 +20,6 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/logging:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
Expand All @@ -30,7 +30,10 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["blob_test.go"],
srcs = [
"blob_test.go",
"pruner_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/verification:go_default_library",
Expand All @@ -40,7 +43,6 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
Expand Down
179 changes: 33 additions & 146 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
package filesystem

import (
"encoding/binary"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/io/file"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/logging"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
Expand All @@ -35,7 +28,6 @@ const (
sszExt = "ssz"
partExt = "part"

bufferEpochs = 2
directoryPermissions = 0700
)

Expand All @@ -45,11 +37,11 @@ type BlobStorageOption func(*BlobStorage) error
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
s, err := slots.EpochStart(e + bufferEpochs)
pruner, err := newBlobPruner(b.fs, e)
if err != nil {
return errors.Wrap(err, "could not set retentionSlots")
return err
}
b.retentionSlots = s
b.pruner = pruner
return nil
}
}
Expand All @@ -71,14 +63,29 @@ func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
}
}
if b.pruner == nil {
log.Warn("Initializing blob filesystem storage with pruning disabled")
}
return b, nil
}

// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
retentionSlots primitives.Slot
prunedBefore atomic.Uint64
fs afero.Fs
pruner *blobPruner
}

// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
// will be populated at node startup, avoiding a costly cold prune (~4s in syscalls) during syncing.
func (bs *BlobStorage) WarmCache() {
if bs.pruner == nil {
return
}
go func() {
if err := bs.pruner.prune(0); err != nil {
log.WithError(err).Error("Error encountered while warming up blob pruner cache.")
}
}()
}

// Save saves blobs given a list of sidecars.
Expand All @@ -94,7 +101,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("ignoring a duplicate blob sidecar Save attempt")
return nil
}
bs.tryPrune(sidecar.Slot())
if bs.pruner != nil {
bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot())
}

// Serialize the ethpb.BlobSidecar to binary data using SSZ.
sidecarData, err := sidecar.MarshalSSZ()
Expand All @@ -106,8 +115,12 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
}
partPath := fname.partPath()

partialMoved := false
// Ensure the partial file is deleted.
defer func() {
if partialMoved {
return
}
// It's expected to error if the save is successful.
err = bs.fs.Remove(partPath)
if err == nil {
Expand Down Expand Up @@ -141,7 +154,8 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
if err != nil {
return errors.Wrap(err, "failed to rename partial file to final name")
}
blobsTotalGauge.Inc()
partialMoved = true
blobsWrittenCounter.Inc()
blobSaveLatency.Observe(time.Since(startTime).Seconds())
return nil
}
Expand Down Expand Up @@ -218,7 +232,7 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
}

func (p blobNamer) dir() string {
return fmt.Sprintf("%#x", p.root)
return rootString(p.root)
}

func (p blobNamer) fname(ext string) string {
Expand All @@ -233,133 +247,6 @@ func (p blobNamer) path() string {
return p.fname(sszExt)
}

// Prune prunes blobs in the base directory based on the retention epoch.
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (bs *BlobStorage) Prune(pruneBefore primitives.Slot) error {
t := time.Now()

var dirs []string
err := afero.Walk(bs.fs, ".", func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.Wrap(err, "failed to walk blob storage directory")
}
if info.IsDir() && path != "." {
dirs = append(dirs, path)
}
return nil
})
if err != nil {
return errors.Wrap(err, "failed to build directories list")
}

var totalPruned int
for _, dir := range dirs {
num, err := bs.processFolder(dir, pruneBefore)
if err != nil {
return errors.Wrapf(err, "failed to process folder %s", dir)
}
blobsPrunedCounter.Add(float64(num))
blobsTotalGauge.Add(-float64(num))
totalPruned += num
}

if totalPruned > 0 {
pruneTime := time.Since(t)
log.WithFields(log.Fields{
"lastPrunedEpoch": slots.ToEpoch(pruneBefore),
"pruneTime": pruneTime,
"numberBlobsPruned": totalPruned,
}).Debug("Pruned old blobs")
}

return nil
}

// processFolder will delete the folder of blobs if the blob slot is outside the
// retention period. We determine the slot by looking at the first blob in the folder.
func (bs *BlobStorage) processFolder(folder string, pruneBefore primitives.Slot) (int, error) {
var f afero.File
var err error
var slot primitives.Slot

for i := 0; i < fieldparams.MaxBlobsPerBlock; i++ {
f, err = bs.fs.Open(filepath.Join(folder, fmt.Sprintf("%d.%s", i, sszExt)))
if err != nil {
log.WithError(err).Debug("Could not open blob file")
continue
}

slot, err = slotFromBlob(f)
if closeErr := f.Close(); closeErr != nil {
log.WithError(closeErr).Error("Could not close blob file")
}
if err != nil {
log.WithError(err).Error("Could not read from blob file")
continue
}
if slot != 0 {
break
}
}

if slot == 0 {
log.WithField("folder", folder).Warn("Could not find slot for folder")
return 0, nil
}

var num int
if slot < pruneBefore {
num, err = bs.countFiles(folder)
if err != nil {
return 0, err
}
if err = bs.fs.RemoveAll(folder); err != nil {
return 0, errors.Wrapf(err, "failed to delete blob %s", f.Name())
}
}
return num, nil
}

// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
// preceding the slot information within SignedBeaconBlockHeader.
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
b := make([]byte, 8)
_, err := at.ReadAt(b, 131176)
if err != nil {
return 0, err
}
rawSlot := binary.LittleEndian.Uint64(b)
return primitives.Slot(rawSlot), nil
}

// Delete removes the directory matching the provided block root and all the blobs it contains.
func (bs *BlobStorage) Delete(root [32]byte) error {
if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil {
return fmt.Errorf("failed to delete blobs for root %#x: %w", root, err)
}
return nil
}

// tryPrune checks whether we should prune and then calls prune
func (bs *BlobStorage) tryPrune(latest primitives.Slot) {
pruned := uint64(pruneBefore(latest, bs.retentionSlots))
if bs.prunedBefore.Swap(pruned) == pruned {
return
}
go func() {
if err := bs.Prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("failed to prune blobs from slot %d", latest)
}
}()
}

func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
// Safely compute the first slot in the epoch for the latest slot
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
if latest < offset {
return 0
}
return latest - offset
func rootString(root [32]byte) string {
return fmt.Sprintf("%#x", root)
}
Loading

0 comments on commit 5cea6be

Please sign in to comment.