diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index a8f238ae0a3..a262970eb61 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -76,7 +76,7 @@ import ( "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" - "github.com/onflow/flow-go/module/executiondatasync/pruner" + edpruner "github.com/onflow/flow-go/module/executiondatasync/pruner" edstorage "github.com/onflow/flow-go/module/executiondatasync/storage" "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" @@ -180,6 +180,9 @@ type AccessNodeConfig struct { versionControlEnabled bool storeTxResultErrorMessages bool stopControlEnabled bool + registerDBPruningEnabled bool + registerDBPruneTickerInterval time.Duration + registerDBPruneThrottleDelay time.Duration registerDBPruneThreshold uint64 } @@ -273,8 +276,8 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { executionDataIndexingEnabled: false, executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), executionDataPrunerHeightRangeTarget: 0, - executionDataPrunerThreshold: pruner.DefaultThreshold, - executionDataPruningInterval: pruner.DefaultPruningInterval, + executionDataPrunerThreshold: edpruner.DefaultThreshold, + executionDataPruningInterval: edpruner.DefaultPruningInterval, registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), checkpointFile: cmd.NotSet, scriptExecutorConfig: query.NewDefaultConfig(), @@ -287,7 +290,10 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { versionControlEnabled: true, storeTxResultErrorMessages: false, stopControlEnabled: false, - registerDBPruneThreshold: pruner.DefaultThreshold, + registerDBPruningEnabled: false, + registerDBPruneTickerInterval: pstorage.DefaultPruneTickerInterval, + registerDBPruneThrottleDelay: pstorage.DefaultPruneThrottleDelay, + registerDBPruneThreshold: pstorage.DefaultPruneThreshold, } } @@ -310,8 +316,9 @@ type FlowAccessNodeBuilder struct { BlocksToMarkExecuted *stdmap.Times BlockTransactions *stdmap.IdentifierMap TransactionMetrics *metrics.TransactionCollector - TransactionValidationMetrics *metrics.TransactionValidationCollector RestMetrics *metrics.RestCollector + RegisterDBPrunerMetrics *metrics.RegisterDBPrunerCollector + TransactionValidationMetrics *metrics.TransactionValidationCollector AccessMetrics module.AccessMetrics PingMetrics module.PingMetrics Committee hotstuff.DynamicCommittee @@ -334,11 +341,13 @@ type FlowAccessNodeBuilder struct { TxResultsIndex *index.TransactionResultsIndex IndexerDependencies *cmd.DependencyList collectionExecutedMetric module.CollectionExecutedMetric - ExecutionDataPruner *pruner.Pruner + ExecutionDataPruner *edpruner.Pruner ExecutionDatastoreManager edstorage.DatastoreManager ExecutionDataTracker tracker.Storage VersionControl *version.VersionControl StopControl *stop.StopControl + RegisterDB *pebble.DB + RegisterDBPrunerDependencies *cmd.DependencyList // The sync engine participants provider is the libp2p peer store for the access node // which is not available until after the network has started. 
@@ -561,6 +570,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess requesterDependable := module.NewProxiedReadyDoneAware() builder.IndexerDependencies.Add(requesterDependable) + // setup dependency chain to ensure register db pruner starts after the indexer + indexerDependable := module.NewProxiedReadyDoneAware() + builder.RegisterDBPrunerDependencies.Add(indexerDependable) + executionDataPrunerEnabled := builder.executionDataPrunerHeightRangeTarget != 0 builder. @@ -790,16 +803,16 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } var err error - builder.ExecutionDataPruner, err = pruner.NewPruner( + builder.ExecutionDataPruner, err = edpruner.NewPruner( node.Logger, prunerMetrics, builder.ExecutionDataTracker, - pruner.WithPruneCallback(func(ctx context.Context) error { + edpruner.WithPruneCallback(func(ctx context.Context) error { return builder.ExecutionDatastoreManager.CollectGarbage(ctx) }), - pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), - pruner.WithThreshold(builder.executionDataPrunerThreshold), - pruner.WithPruningInterval(builder.executionDataPruningInterval), + edpruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), + edpruner.WithThreshold(builder.executionDataPrunerThreshold), + edpruner.WithPruningInterval(builder.executionDataPruningInterval), ) if err != nil { return nil, fmt.Errorf("failed to create execution data pruner: %w", err) @@ -864,16 +877,16 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // Note: using a DependableComponent here to ensure that the indexer does not block // other components from starting while bootstrapping the register db since it may // take hours to complete. - - pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath) + var err error + builder.RegisterDB, err = pstorage.OpenRegisterPebbleDB(builder.registersDBPath) if err != nil { return nil, fmt.Errorf("could not open registers db: %w", err) } builder.ShutdownFunc(func() error { - return pdb.Close() + return builder.RegisterDB.Close() }) - bootstrapped, err := pstorage.IsBootstrapped(pdb) + bootstrapped, err := pstorage.IsBootstrapped(builder.RegisterDB) if err != nil { return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err) } @@ -905,7 +918,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } rootHash := ledger.RootHash(builder.RootSeal.FinalState) - bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) + bootstrap, err := pstorage.NewRegisterBootstrap(builder.RegisterDB, checkpointFile, checkpointHeight, rootHash, builder.Logger) if err != nil { return nil, fmt.Errorf("could not create registers bootstrap: %w", err) } @@ -918,7 +931,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } } - registers, err := pstorage.NewRegisters(pdb, builder.registerDBPruneThreshold) + registers, err := pstorage.NewRegisters(builder.RegisterDB, builder.registerDBPruneThreshold) if err != nil { return nil, fmt.Errorf("could not create registers storage: %w", err) } @@ -1016,8 +1029,31 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer) } + // add indexer into ReadyDoneAware dependency passed to pruner. 
This allows the register db pruner + // to wait for the indexer to be ready before starting. + indexerDependable.Init(builder.ExecutionIndexer) + return builder.ExecutionIndexer, nil - }, builder.IndexerDependencies) + }, builder.IndexerDependencies). + DependableComponent("register db pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + if !builder.registerDBPruningEnabled { + return &module.NoopReadyDoneAware{}, nil + } + + registerDBPruner, err := pstorage.NewRegisterPruner( + node.Logger, + builder.RegisterDB, + builder.RegisterDBPrunerMetrics, + pstorage.WithPruneThreshold(builder.registerDBPruneThreshold), + pstorage.WithPruneThrottleDelay(builder.registerDBPruneThrottleDelay), + pstorage.WithPruneTickerInterval(builder.registerDBPruneTickerInterval), + ) + if err != nil { + return nil, fmt.Errorf("failed to create register db pruner: %w", err) + } + + return registerDBPruner, nil + }, builder.RegisterDBPrunerDependencies) } if builder.stateStreamConf.ListenAddr != "" { @@ -1144,10 +1180,11 @@ func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder { dist := consensuspubsub.NewFollowerDistributor() dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger)) return &FlowAccessNodeBuilder{ - AccessNodeConfig: DefaultAccessNodeConfig(), - FlowNodeBuilder: nodeBuilder, - FollowerDistributor: dist, - IndexerDependencies: cmd.NewDependencyList(), + AccessNodeConfig: DefaultAccessNodeConfig(), + FlowNodeBuilder: nodeBuilder, + FollowerDistributor: dist, + IndexerDependencies: cmd.NewDependencyList(), + RegisterDBPrunerDependencies: cmd.NewDependencyList(), } } @@ -1331,6 +1368,8 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "execution-data-db", defaultConfig.executionDataDBMode, "[experimental] the DB type for execution datastore. One of [badger, pebble]") + + // Execution data pruner flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, @@ -1344,6 +1383,20 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { defaultConfig.executionDataPruningInterval, "duration after which the pruner tries to prune execution data. The default value is 10 minutes") + // RegisterDB pruning + flags.BoolVar(&builder.registerDBPruningEnabled, + "registerdb-pruning-enabled", + defaultConfig.registerDBPruningEnabled, + "whether to enable pruning for the register db") + flags.DurationVar(&builder.registerDBPruneThrottleDelay, + "registerdb-prune-throttle-delay", + defaultConfig.registerDBPruneThrottleDelay, + "delay between batches of registers during register db pruning") + flags.DurationVar(&builder.registerDBPruneTickerInterval, + "registerdb-prune-ticker-interval", + defaultConfig.registerDBPruneTickerInterval, + "interval between register db pruning cycles. The default value is 10 minutes") + // Execution State Streaming API flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size") flags.Uint32Var(&builder.stateStreamConf.MaxGlobalStreams, "state-stream-global-max-streams", defaultConfig.stateStreamConf.MaxGlobalStreams, "global maximum number of concurrent streams") @@ -1732,12 +1785,17 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.RestMetrics = m return nil }). 
+ Module("register db metrics", func(node *cmd.NodeConfig) error { + builder.RegisterDBPrunerMetrics = metrics.NewRegisterDBPrunerCollector() + return nil + }). Module("access metrics", func(node *cmd.NodeConfig) error { builder.AccessMetrics = metrics.NewAccessCollector( metrics.WithTransactionMetrics(builder.TransactionMetrics), metrics.WithTransactionValidationMetrics(builder.TransactionValidationMetrics), metrics.WithBackendScriptsMetrics(builder.TransactionMetrics), metrics.WithRestMetrics(builder.RestMetrics), + metrics.WithRegisterDBPrunerMetrics(builder.RegisterDBPrunerMetrics), ) return nil }). diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 1bb6a8c04bb..84d372f108c 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -71,7 +71,7 @@ import ( "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" - "github.com/onflow/flow-go/module/executiondatasync/pruner" + edpruner "github.com/onflow/flow-go/module/executiondatasync/pruner" edstorage "github.com/onflow/flow-go/module/executiondatasync/storage" "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" @@ -168,6 +168,9 @@ type ObserverServiceConfig struct { registerCacheType string registerCacheSize uint programCacheSize uint + registerDBPruningEnabled bool + registerDBPruneTickerInterval time.Duration + registerDBPruneThrottleDelay time.Duration registerDBPruneThreshold uint64 websocketConfig websockets.Config } @@ -233,8 +236,8 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { executionDataIndexingEnabled: false, executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), executionDataPrunerHeightRangeTarget: 0, - executionDataPrunerThreshold: pruner.DefaultThreshold, - executionDataPruningInterval: pruner.DefaultPruningInterval, + executionDataPrunerThreshold: edpruner.DefaultThreshold, + executionDataPruningInterval: edpruner.DefaultPruningInterval, localServiceAPIEnabled: false, versionControlEnabled: true, stopControlEnabled: false, @@ -248,13 +251,16 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { RetryDelay: edrequester.DefaultRetryDelay, MaxRetryDelay: edrequester.DefaultMaxRetryDelay, }, - scriptExecMinBlock: 0, - scriptExecMaxBlock: math.MaxUint64, - registerCacheType: pstorage.CacheTypeTwoQueue.String(), - registerCacheSize: 0, - programCacheSize: 0, - registerDBPruneThreshold: pruner.DefaultThreshold, - websocketConfig: websockets.NewDefaultWebsocketConfig(), + scriptExecMinBlock: 0, + scriptExecMaxBlock: math.MaxUint64, + registerCacheType: pstorage.CacheTypeTwoQueue.String(), + registerCacheSize: 0, + programCacheSize: 0, + registerDBPruningEnabled: false, + registerDBPruneTickerInterval: pstorage.DefaultPruneTickerInterval, + registerDBPruneThrottleDelay: pstorage.DefaultPruneThrottleDelay, + registerDBPruneThreshold: pstorage.DefaultPruneThreshold, + websocketConfig: websockets.NewDefaultWebsocketConfig(), } } @@ -287,10 +293,13 @@ type ObserverServiceBuilder struct { ExecutionDataRequester state_synchronization.ExecutionDataRequester ExecutionDataStore execution_data.ExecutionDataStore ExecutionDataBlobstore blobs.Blobstore - ExecutionDataPruner *pruner.Pruner + ExecutionDataPruner *edpruner.Pruner ExecutionDatastoreManager 
edstorage.DatastoreManager ExecutionDataTracker tracker.Storage + RegisterDB *pebble.DB + RegisterDBPrunerDependencies *cmd.DependencyList + RegistersAsyncStore *execution.RegistersAsyncStore Reporter *index.Reporter EventsIndex *index.EventsIndex @@ -308,9 +317,10 @@ type ObserverServiceBuilder struct { // Public network peerID peer.ID - TransactionMetrics *metrics.TransactionCollector - RestMetrics *metrics.RestCollector - AccessMetrics module.AccessMetrics + TransactionMetrics *metrics.TransactionCollector + RestMetrics *metrics.RestCollector + AccessMetrics module.AccessMetrics + RegisterDBPrunerMetrics *metrics.RegisterDBPrunerCollector // grpc servers secureGrpcServer *grpcserver.GrpcServer @@ -579,10 +589,11 @@ func NewFlowObserverServiceBuilder(opts ...Option) *ObserverServiceBuilder { opt(config) } anb := &ObserverServiceBuilder{ - ObserverServiceConfig: config, - FlowNodeBuilder: cmd.FlowNode("observer"), - FollowerDistributor: pubsub.NewFollowerDistributor(), - IndexerDependencies: cmd.NewDependencyList(), + ObserverServiceConfig: config, + FlowNodeBuilder: cmd.FlowNode("observer"), + FollowerDistributor: pubsub.NewFollowerDistributor(), + IndexerDependencies: cmd.NewDependencyList(), + RegisterDBPrunerDependencies: cmd.NewDependencyList(), } anb.FollowerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(anb.Logger)) // the observer gets a version of the root snapshot file that does not contain any node addresses @@ -709,6 +720,20 @@ func (builder *ObserverServiceBuilder) extraFlags() { defaultConfig.executionDataPruningInterval, "duration after which the pruner tries to prune execution data. The default value is 10 minutes") + // RegisterDB pruning + flags.BoolVar(&builder.registerDBPruningEnabled, + "registerdb-pruning-enabled", + defaultConfig.registerDBPruningEnabled, + "whether to enable pruning for the register db") + flags.DurationVar(&builder.registerDBPruneThrottleDelay, + "registerdb-prune-throttle-delay", + defaultConfig.registerDBPruneThrottleDelay, + "delay between batches of registers during register db pruning") + flags.DurationVar(&builder.registerDBPruneTickerInterval, + "registerdb-prune-ticker-interval", + defaultConfig.registerDBPruneTickerInterval, + "interval between register db pruning cycles. The default value is 10 minutes") + // ExecutionDataRequester config flags.BoolVar(&builder.executionDataSyncEnabled, "execution-data-sync-enabled", @@ -1064,6 +1089,10 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS requesterDependable := module.NewProxiedReadyDoneAware() builder.IndexerDependencies.Add(requesterDependable) + // setup dependency chain to ensure register db pruner starts after the indexer + indexerDependable := module.NewProxiedReadyDoneAware() + builder.RegisterDBPrunerDependencies.Add(indexerDependable) + executionDataPrunerEnabled := builder.executionDataPrunerHeightRangeTarget != 0 builder. 
@@ -1286,16 +1315,16 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } var err error - builder.ExecutionDataPruner, err = pruner.NewPruner( + builder.ExecutionDataPruner, err = edpruner.NewPruner( node.Logger, prunerMetrics, builder.ExecutionDataTracker, - pruner.WithPruneCallback(func(ctx context.Context) error { + edpruner.WithPruneCallback(func(ctx context.Context) error { return builder.ExecutionDatastoreManager.CollectGarbage(ctx) }), - pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), - pruner.WithThreshold(builder.executionDataPrunerThreshold), - pruner.WithPruningInterval(builder.executionDataPruningInterval), + edpruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), + edpruner.WithThreshold(builder.executionDataPrunerThreshold), + edpruner.WithPruningInterval(builder.executionDataPruningInterval), ) if err != nil { return nil, fmt.Errorf("failed to create execution data pruner: %w", err) @@ -1319,16 +1348,16 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // Note: using a DependableComponent here to ensure that the indexer does not block // other components from starting while bootstrapping the register db since it may // take hours to complete. - - pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath) + var err error + builder.RegisterDB, err = pstorage.OpenRegisterPebbleDB(builder.registersDBPath) if err != nil { return nil, fmt.Errorf("could not open registers db: %w", err) } builder.ShutdownFunc(func() error { - return pdb.Close() + return builder.RegisterDB.Close() }) - bootstrapped, err := pstorage.IsBootstrapped(pdb) + bootstrapped, err := pstorage.IsBootstrapped(builder.RegisterDB) if err != nil { return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err) } @@ -1360,7 +1389,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } rootHash := ledger.RootHash(builder.RootSeal.FinalState) - bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) + bootstrap, err := pstorage.NewRegisterBootstrap(builder.RegisterDB, checkpointFile, checkpointHeight, rootHash, builder.Logger) if err != nil { return nil, fmt.Errorf("could not create registers bootstrap: %w", err) } @@ -1373,7 +1402,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } } - registers, err := pstorage.NewRegisters(pdb, builder.registerDBPruneThreshold) + registers, err := pstorage.NewRegisters(builder.RegisterDB, builder.registerDBPruneThreshold) if err != nil { return nil, fmt.Errorf("could not create registers storage: %w", err) } @@ -1472,8 +1501,31 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer) } + // add indexer into ReadyDoneAware dependency passed to pruner. This allows the register db pruner + // to wait for the indexer to be ready before starting. + indexerDependable.Init(builder.ExecutionIndexer) + return builder.ExecutionIndexer, nil - }, builder.IndexerDependencies) + }, builder.IndexerDependencies). 
+ DependableComponent("register db pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + if !builder.registerDBPruningEnabled { + return &module.NoopReadyDoneAware{}, nil + } + + registerDBPruner, err := pstorage.NewRegisterPruner( + node.Logger, + builder.RegisterDB, + builder.RegisterDBPrunerMetrics, + pstorage.WithPruneThreshold(builder.registerDBPruneThreshold), + pstorage.WithPruneThrottleDelay(builder.registerDBPruneThrottleDelay), + pstorage.WithPruneTickerInterval(builder.registerDBPruneTickerInterval), + ) + if err != nil { + return nil, fmt.Errorf("failed to create register db pruner: %w", err) + } + + return registerDBPruner, nil + }, builder.RegisterDBPrunerDependencies) } if builder.stateStreamConf.ListenAddr != "" { @@ -1702,11 +1754,16 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.RestMetrics = m return nil }) + builder.Module("register db metrics", func(node *cmd.NodeConfig) error { + builder.RegisterDBPrunerMetrics = metrics.NewRegisterDBPrunerCollector() + return nil + }) builder.Module("access metrics", func(node *cmd.NodeConfig) error { builder.AccessMetrics = metrics.NewAccessCollector( metrics.WithTransactionMetrics(builder.TransactionMetrics), metrics.WithBackendScriptsMetrics(builder.TransactionMetrics), metrics.WithRestMetrics(builder.RestMetrics), + metrics.WithRegisterDBPrunerMetrics(builder.RegisterDBPrunerMetrics), ) return nil }) diff --git a/integration/go.mod b/integration/go.mod index eff334d5a9d..f927c1b76b9 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/bigquery v1.59.1 github.com/VividCortex/ewma v1.2.0 github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 + github.com/cockroachdb/pebble v1.1.1 github.com/coreos/go-semver v0.3.0 github.com/dapperlabs/testingdock v0.4.5-0.20231020233342-a2853fe18724 github.com/dgraph-io/badger/v2 v2.2007.4 @@ -87,7 +88,6 @@ require ( github.com/cockroachdb/errors v1.11.3 // indirect github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect - github.com/cockroachdb/pebble v1.1.1 // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/consensys/bavard v0.1.13 // indirect diff --git a/integration/testnet/container.go b/integration/testnet/container.go index e466363b3e4..453551f004b 100644 --- a/integration/testnet/container.go +++ b/integration/testnet/container.go @@ -257,6 +257,10 @@ func (c *Container) DBPath() string { return filepath.Join(c.datadir, DefaultFlowDBDir) } +func (c *Container) PebbleDBPath() string { + return filepath.Join(c.datadir, DefaultExecutionStateDir) +} + func (c *Container) ExecutionDataDBPath() string { return filepath.Join(c.datadir, DefaultExecutionDataServiceDir) } diff --git a/integration/tests/access/cohort3/register_db_pruning_test.go b/integration/tests/access/cohort3/register_db_pruning_test.go new file mode 100644 index 00000000000..6212436f15b --- /dev/null +++ b/integration/tests/access/cohort3/register_db_pruning_test.go @@ -0,0 +1,233 @@ +package cohort3 + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/pebble" + sdk "github.com/onflow/flow-go-sdk" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" + "github.com/onflow/flow/protobuf/go/flow/executiondata" + "github.com/rs/zerolog" + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/onflow/flow-go/integration/testnet" + "github.com/onflow/flow-go/model/flow" + pstorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/utils/unittest" +) + +const TestRegisterDBPruningThreshold uint64 = 60 +const TestPruneTickerInterval uint64 = 60 + +type RegisterDBPruningSuite struct { + suite.Suite + + log zerolog.Logger + + accessNodeName string + observerNodeName string + + pebbleDb *pebble.DB + + // root context for the current test + ctx context.Context + cancel context.CancelFunc + + net *testnet.FlowNetwork +} + +func TestRegisterDBPruning(t *testing.T) { + suite.Run(t, new(RegisterDBPruningSuite)) +} + +func (s *RegisterDBPruningSuite) TearDownTest() { + s.log.Info().Msg("================> Start TearDownTest") + s.net.Remove() + s.cancel() + s.log.Info().Msg("================> Finish TearDownTest") +} + +func (s *RegisterDBPruningSuite) SetupTest() { + s.log = unittest.LoggerForTest(s.Suite.T(), zerolog.ErrorLevel) + s.log.Info().Msg("================> SetupTest") + defer func() { + s.log.Info().Msg("================> Finish SetupTest") + }() + + // access node + s.accessNodeName = testnet.PrimaryAN + accessNodeConfig := testnet.NewNodeConfig( + flow.RoleAccess, + testnet.WithLogLevel(zerolog.ErrorLevel), + testnet.WithAdditionalFlag("--supports-observer=true"), + testnet.WithAdditionalFlag("--execution-data-sync-enabled=true"), + testnet.WithAdditionalFlagf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), + testnet.WithAdditionalFlag("--execution-data-retry-delay=1s"), + testnet.WithAdditionalFlag("--execution-data-indexing-enabled=true"), + testnet.WithAdditionalFlagf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), + testnet.WithAdditionalFlagf("--public-network-execution-data-sync-enabled=true"), + testnet.WithAdditionalFlagf("--event-query-mode=local-only"), + //testnet.WithAdditionalFlag("--registerdb-pruning-enabled=true"), + //testnet.WithAdditionalFlagf("--registerdb-prune-ticker-interval=%ds", TestPruneTickerInterval), + //testnet.WithAdditionalFlagf("--registerdb-pruning-threshold=%d", TestRegisterDBPruningThreshold), + ) + + consensusConfigs := []func(config *testnet.NodeConfig){ + testnet.WithAdditionalFlag("--cruise-ctl-fallback-proposal-duration=400ms"), + testnet.WithAdditionalFlag(fmt.Sprintf("--required-verification-seal-approvals=%d", 1)), + testnet.WithAdditionalFlag(fmt.Sprintf("--required-construction-seal-approvals=%d", 1)), + testnet.WithLogLevel(zerolog.FatalLevel), + } + + nodeConfigs := []testnet.NodeConfig{ + testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel)), + accessNodeConfig, // access_1 + } + + // add the observer node config + s.observerNodeName = testnet.PrimaryON + + observers := []testnet.ObserverConfig{{ + 
ContainerName: s.observerNodeName,
+		LogLevel:      zerolog.ErrorLevel,
+		AdditionalFlags: []string{
+			fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir),
+			fmt.Sprintf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir),
+			"--execution-data-sync-enabled=true",
+			"--execution-data-indexing-enabled=true",
+			"--execution-data-retry-delay=1s",
+			"--event-query-mode=local-only",
+			"--local-service-api-enabled=true",
+			"--registerdb-pruning-enabled=true",
+			fmt.Sprintf("--registerdb-prune-ticker-interval=%ds", TestPruneTickerInterval),
+			fmt.Sprintf("--registerdb-pruning-threshold=%d", TestRegisterDBPruningThreshold),
+		},
+	}}
+
+	conf := testnet.NewNetworkConfig("register_db_pruning", nodeConfigs, testnet.WithObservers(observers...))
+	s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet)
+
+	// start the network
+	s.T().Logf("starting flow network with docker containers")
+	s.ctx, s.cancel = context.WithCancel(context.Background())
+
+	s.net.Start(s.ctx)
+}
+
+func (s *RegisterDBPruningSuite) TestHappyPath() {
+	accessNode := s.net.ContainerByName(s.accessNodeName)
+	observerNode := s.net.ContainerByName(s.observerNodeName)
+
+	waitingBlockHeight := uint64(200)
+	s.waitUntilExecutionDataForBlockIndexed(observerNode, waitingBlockHeight)
+	s.net.StopContainers()
+
+	// 1. Get the AN's register height boundaries
+	pebbleAN := s.getPebbleDB(accessNode.PebbleDBPath())
+	_, latestHeightAN, err := pstorage.ReadHeightsFromBootstrappedDB(pebbleAN)
+	require.NoError(s.T(), err, "could not read from AN db")
+
+	// 2. Get the ON's register height boundaries
+	pebbleON := s.getPebbleDB(observerNode.PebbleDBPath())
+	firstHeightON, latestHeightON, err := pstorage.ReadHeightsFromBootstrappedDB(pebbleON)
+	require.NoError(s.T(), err, "could not read from ON db")
+
+	// 3. Check that the latest heights on the AN and ON are equal
+	assert.Equal(s.T(), latestHeightAN, latestHeightON)
+
+	// 4. Check that the registers' first height is at least the first height recorded in the db
+	//registerAN := s.nodeRegisterStorage(pebbleAN)
+	//assert.GreaterOrEqual(s.T(), registerAN.FirstHeight(), firstHeightAN)
+	registerON := s.nodeRegisterStorage(pebbleON)
+	assert.GreaterOrEqual(s.T(), registerON.FirstHeight(), firstHeightON)
+}
+
+func (s *RegisterDBPruningSuite) getPebbleDB(path string) *pebble.DB {
+	pebbleDb, err := pstorage.OpenRegisterPebbleDB(path)
+	require.NoError(s.T(), err, "could not open db")
+
+	return pebbleDb
+}
+
+func (s *RegisterDBPruningSuite) nodeRegisterStorage(db *pebble.DB) *pstorage.Registers {
+	registers, err := pstorage.NewRegisters(db, TestRegisterDBPruningThreshold)
+	s.Require().NoError(err)
+	return registers
+}
+
+// getGRPCClient is a helper that creates an Access API client.
+func (s *RegisterDBPruningSuite) getGRPCClient(address string) (accessproto.AccessAPIClient, error) {
+	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return nil, err
+	}
+
+	client := accessproto.NewAccessAPIClient(conn)
+	return client, nil
+}
+
+// waitUntilExecutionDataForBlockIndexed waits until the execution data for the specified block height has been indexed.
+// It subscribes to events from the start height and returns once an event at or above that height is observed.
+func (s *RegisterDBPruningSuite) waitUntilExecutionDataForBlockIndexed(observerNode *testnet.Container, waitingBlockHeight uint64) {
+	grpcClient, err := s.getGRPCClient(observerNode.Addr(testnet.GRPCPort))
+	s.Require().NoError(err)
+
+	// create an execution data API client
+	client, err := getClient(fmt.Sprintf("localhost:%s", observerNode.Port(testnet.ExecutionStatePort)))
+	s.Require().NoError(err)
+
+	// wait until the observer node starts indexing blocks:
+	// request events for block 1 to confirm that the first block has been indexed before subscribing
+	s.Require().Eventually(func() bool {
+		_, err := grpcClient.GetEventsForHeightRange(s.ctx, &accessproto.GetEventsForHeightRangeRequest{
+			Type:                 sdk.EventAccountCreated,
+			StartHeight:          1,
+			EndHeight:            1,
+			EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
+		})
+
+		return err == nil
+	}, 2*time.Minute, 10*time.Second)
+
+	// subscribe to events up to waitingBlockHeight, to ensure that execution data has been indexed up to that
+	// height and the pruner has run at least once.
+	// SubscribeEventsFromStartHeight is used here because it only delivers events once the corresponding
+	// execution data has been indexed.
+	stream, err := client.SubscribeEventsFromStartHeight(s.ctx, &executiondata.SubscribeEventsFromStartHeightRequest{
+		EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
+		Filter:               &executiondata.EventFilter{},
+		HeartbeatInterval:    1,
+		StartBlockHeight:     0,
+	})
+	s.Require().NoError(err)
+	eventsChan, errChan, err := SubscribeHandler(s.ctx, stream.Recv, eventsResponseHandler)
+	s.Require().NoError(err)
+
+	duration := 3 * time.Minute
+	for {
+		select {
+		case err := <-errChan:
+			s.Require().NoErrorf(err, "unexpected %s error", s.observerNodeName)
+		case event := <-eventsChan:
+			if event.Height >= waitingBlockHeight {
+				return
+			}
+		case <-time.After(duration):
+			s.T().Fatalf("failed to index up to block %d within %s", waitingBlockHeight, duration.String())
+		}
+	}
+}
diff --git a/module/metrics.go b/module/metrics.go
index 15204afe081..27d8e828fd6 100644
--- a/module/metrics.go
+++ b/module/metrics.go
@@ -910,6 +910,7 @@ type AccessMetrics interface {
 	TransactionMetrics
 	TransactionValidationMetrics
 	BackendScriptsMetrics
+	RegisterDBPrunerMetrics
 
 	// UpdateExecutionReceiptMaxHeight is called whenever we store an execution receipt from a block from a newer height
 	UpdateExecutionReceiptMaxHeight(height uint64)
@@ -918,6 +919,11 @@ type AccessMetrics interface {
 	UpdateLastFullBlockHeight(height uint64)
 }
 
+type RegisterDBPrunerMetrics interface {
+	// LatestPrunedHeight reports the latest block height that has been pruned.
+	LatestPrunedHeight(lastPrunedHeight uint64)
+}
+
 type ExecutionResultStats struct {
 	ComputationUsed uint64
 	MemoryUsed      uint64
diff --git a/module/metrics/access.go b/module/metrics/access.go
index aacfe316c76..0b8dac8abdb 100644
--- a/module/metrics/access.go
+++ b/module/metrics/access.go
@@ -34,11 +34,18 @@ func WithRestMetrics(m module.RestMetrics) AccessCollectorOpts {
 	}
 }
 
+func WithRegisterDBPrunerMetrics(m module.RegisterDBPrunerMetrics) AccessCollectorOpts {
+	return func(ac *AccessCollector) {
+		ac.RegisterDBPrunerMetrics = m
+	}
+}
+
 type AccessCollector struct {
 	module.RestMetrics
 	module.TransactionMetrics
 	module.TransactionValidationMetrics
 	module.BackendScriptsMetrics
+	module.RegisterDBPrunerMetrics
 
 	connectionReused  prometheus.Counter
 	connectionsInPool *prometheus.GaugeVec
diff --git a/module/metrics/namespaces.go b/module/metrics/namespaces.go
index c0f99af3fcf..cd5ee405760 100644
--- a/module/metrics/namespaces.go
+++ b/module/metrics/namespaces.go
@@ -47,6 +47,7 @@ const (
 	subsystemTransactionValidation = "transaction_validation"
 	subsystemConnectionPool        = "connection_pool"
 	subsystemHTTP                  = "http"
+	subsystemRegisterDBPruner      = "register_db_pruner"
 )
 
 // Observer subsystem
diff --git a/module/metrics/noop.go b/module/metrics/noop.go
index 3a18e5f418b..5804e7ba42c 100644
--- a/module/metrics/noop.go
+++ b/module/metrics/noop.go
@@ -42,6 +42,7 @@ var _ module.HotstuffMetrics = (*NoopCollector)(nil)
 var _ module.EngineMetrics = (*NoopCollector)(nil)
 var _ module.HeroCacheMetrics = (*NoopCollector)(nil)
 var _ module.NetworkMetrics = (*NoopCollector)(nil)
+var _ module.RegisterDBPrunerMetrics = (*NoopCollector)(nil)
 
 func (nc *NoopCollector) Peers(prefix string, n int)    {}
 func (nc *NoopCollector) Wantlist(prefix string, n int) {}
@@ -238,6 +239,7 @@ func (nc *NoopCollector) RequestFailed(duration time.Duration, retryable bool)
 func (nc *NoopCollector) RequestCanceled() {}
 func (nc *NoopCollector) ResponseDropped() {}
 func (nc *NoopCollector) Pruned(height uint64, duration time.Duration) {}
+func (nc *NoopCollector) LatestPrunedHeight(lastPrunedHeight uint64) {}
 func (nc *NoopCollector) UpdateCollectionMaxHeight(height uint64) {}
 func (nc *NoopCollector) BucketAvailableSlots(uint64, uint64) {}
 func (nc *NoopCollector) OnKeyPutSuccess(uint32) {}
diff --git a/module/metrics/registers_db_pruner.go b/module/metrics/registers_db_pruner.go
new file mode 100644
index 00000000000..887dcdce0b8
--- /dev/null
+++ b/module/metrics/registers_db_pruner.go
@@ -0,0 +1,32 @@
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/onflow/flow-go/module"
+)
+
+// RegisterDBPrunerCollector collects metrics for the register database pruning process,
+// such as the latest height that has been pruned.
+type RegisterDBPrunerCollector struct {
+	lastPrunedHeight prometheus.Gauge // The last pruned block height.
+}
+
+var _ module.RegisterDBPrunerMetrics = (*RegisterDBPrunerCollector)(nil)
+
+// NewRegisterDBPrunerCollector creates and returns a new RegisterDBPrunerCollector instance
+// with metrics for monitoring the last pruned height.
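+//
+// A minimal usage sketch (illustrative; the real wiring lives in the node builders above):
+//
+//	collector := NewRegisterDBPrunerCollector()
+//	collector.LatestPrunedHeight(1_000_000) // sets the last_pruned_height gauge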
+func NewRegisterDBPrunerCollector() *RegisterDBPrunerCollector { + return &RegisterDBPrunerCollector{ + lastPrunedHeight: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "last_pruned_height", + Namespace: namespaceAccess, + Subsystem: subsystemRegisterDBPruner, + Help: "The last block height up to which data has been pruned.", + }), + } +} + +func (c *RegisterDBPrunerCollector) LatestPrunedHeight(lastPrunedHeight uint64) { + c.lastPrunedHeight.Set(float64(lastPrunedHeight)) +} diff --git a/module/mock/access_metrics.go b/module/mock/access_metrics.go index df3cb8ad8c2..44e68dd3a76 100644 --- a/module/mock/access_metrics.go +++ b/module/mock/access_metrics.go @@ -53,6 +53,11 @@ func (_m *AccessMetrics) ConnectionFromPoolUpdated() { _m.Called() } +// LatestPrunedHeight provides a mock function with given fields: lastPrunedHeight +func (_m *AccessMetrics) LatestPrunedHeight(lastPrunedHeight uint64) { + _m.Called(lastPrunedHeight) +} + // NewConnectionEstablished provides a mock function with given fields: func (_m *AccessMetrics) NewConnectionEstablished() { _m.Called() diff --git a/module/mock/register_db_pruner_metrics.go b/module/mock/register_db_pruner_metrics.go new file mode 100644 index 00000000000..777f608a721 --- /dev/null +++ b/module/mock/register_db_pruner_metrics.go @@ -0,0 +1,29 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import mock "github.com/stretchr/testify/mock" + +// RegisterDBPrunerMetrics is an autogenerated mock type for the RegisterDBPrunerMetrics type +type RegisterDBPrunerMetrics struct { + mock.Mock +} + +// LatestPrunedHeight provides a mock function with given fields: lastPrunedHeight +func (_m *RegisterDBPrunerMetrics) LatestPrunedHeight(lastPrunedHeight uint64) { + _m.Called(lastPrunedHeight) +} + +// NewRegisterDBPrunerMetrics creates a new instance of RegisterDBPrunerMetrics. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRegisterDBPrunerMetrics(t interface { + mock.TestingT + Cleanup(func()) +}) *RegisterDBPrunerMetrics { + mock := &RegisterDBPrunerMetrics{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/pebble/interval_worker.go b/storage/pebble/interval_worker.go new file mode 100644 index 00000000000..0c9461ca284 --- /dev/null +++ b/storage/pebble/interval_worker.go @@ -0,0 +1,34 @@ +package pebble + +import ( + "context" + "time" +) + +// IntervalWorker runs a periodic task with support for context cancellation. +type IntervalWorker struct { + interval time.Duration +} + +// NewIntervalWorker initializes a new IntervalWorker. +func NewIntervalWorker(interval time.Duration) *IntervalWorker { + return &IntervalWorker{ + interval: interval, + } +} + +// Run starts the worker and calls the provided function periodically. +// It stops and returns if the context is canceled. 
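+//
+// A minimal usage sketch (illustrative; ctx is assumed to be a cancellable context supplied by the caller):
+//
+//	w := NewIntervalWorker(time.Minute)
+//	w.Run(ctx, func() { /* periodic work */ })
+//
+// Note that the first call to f happens one full interval after Run starts, because a ticker
+// only fires once its interval has elapsed.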
+func (iw *IntervalWorker) Run(ctx context.Context, f func()) {
+	ticker := time.NewTicker(iw.interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			// Stop if the context is canceled
+			return
+		case <-ticker.C:
+			f()
+		}
+	}
+}
diff --git a/storage/pebble/operation/common.go b/storage/pebble/operation/common.go
index ad9e96c2c8b..7dfea6c3276 100644
--- a/storage/pebble/operation/common.go
+++ b/storage/pebble/operation/common.go
@@ -42,6 +42,7 @@ func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error {
 	}
 }
 
+// convertNotFoundError converts a pebble NotFound error into a storage NotFound error
 func convertNotFoundError(err error) error {
 	if errors.Is(err, pebble.ErrNotFound) {
 		return storage.ErrNotFound
diff --git a/storage/pebble/operation/registers.go b/storage/pebble/operation/registers.go
new file mode 100644
index 00000000000..e50e2f03e80
--- /dev/null
+++ b/storage/pebble/operation/registers.go
@@ -0,0 +1,17 @@
+package operation
+
+import (
+	"encoding/binary"
+
+	"github.com/cockroachdb/pebble"
+)
+
+func RetrieveRegisterHeight(db *pebble.DB, key []byte) (uint64, error) {
+	res, closer, err := db.Get(key)
+	if err != nil {
+		return 0, convertNotFoundError(err)
+	}
+	defer closer.Close()
+
+	return binary.BigEndian.Uint64(res), nil
+}
diff --git a/storage/pebble/register_pruner_run.go b/storage/pebble/register_pruner_run.go
new file mode 100644
index 00000000000..a0245072871
--- /dev/null
+++ b/storage/pebble/register_pruner_run.go
@@ -0,0 +1,147 @@
+package pebble
+
+import (
+	"fmt"
+
+	"github.com/cockroachdb/pebble"
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/model/flow"
+)
+
+// RegisterPrunerRun handles the pruning of outdated register entries from a Pebble database.
+// It removes entries older than a specified prune height, ensuring efficient batch deletions.
+// This struct also tracks the total number of keys pruned during its run.
+type RegisterPrunerRun struct {
+	logger      zerolog.Logger
+	pruneHeight uint64
+
+	db                   *pebble.DB
+	lastRegisterID       flow.RegisterID // The last processed register ID, used for deduplication.
+	keepFirstRelevantKey bool            // Flag to indicate if the first relevant key is kept.
+
+	totalKeysPruned int // Counter for the total number of pruned keys.
+}
+
+// NewRegisterPrunerRun initializes a new RegisterPrunerRun instance.
+//
+// Parameters:
+// - db: The Pebble database instance to prune registers from.
+// - logger: Logger instance for logging pruning events and errors.
+// - pruneHeight: The height below which entries should be pruned.
+//
+// Returns:
+// - A pointer to the newly created RegisterPrunerRun instance, ready to execute pruning operations.
+//
+// Example usage:
+//
+//	pruner := NewRegisterPrunerRun(db, logger, 10000)
+func NewRegisterPrunerRun(db *pebble.DB, logger zerolog.Logger, pruneHeight uint64) *RegisterPrunerRun {
+	return &RegisterPrunerRun{
+		logger:          logger,
+		pruneHeight:     pruneHeight,
+		db:              db,
+		totalKeysPruned: 0,
+	}
+}
+
+// BatchDelete removes the specified keys from the database in a single batch operation.
+// This method optimizes deletion by grouping multiple delete operations into a single
+// batch, which improves performance and reduces the load on the database.
+//
+// Parameters:
+// - lookupKeys: A slice of keys to delete from the database.
+//
+// No errors are expected during normal operations.
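+//
+// Illustrative sketch of a call site (keys would normally be collected by the pruning
+// iterator in RegisterPruner.pruneUpToHeight):
+//
+//	if err := run.BatchDelete(keysToRemove); err != nil {
+//		return err
+//	}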
+func (p *RegisterPrunerRun) BatchDelete(lookupKeys [][]byte) error {
+	dbBatch := p.db.NewBatch()
+	defer func() {
+		if cerr := dbBatch.Close(); cerr != nil {
+			p.logger.Err(cerr).Msg("error while closing the db batch")
+		}
+	}()
+
+	for _, key := range lookupKeys {
+		if err := dbBatch.Delete(key, nil); err != nil {
+			keyHeight, registerID, _ := lookupKeyToRegisterID(key)
+			return fmt.Errorf("failed to delete lookupKey (height: %d, registerID: %v): %w", keyHeight, registerID, err)
+		}
+	}
+
+	if err := dbBatch.Commit(pebble.Sync); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
+
+	p.totalKeysPruned += len(lookupKeys)
+
+	return nil
+}
+
+// CanPruneKey evaluates whether a given key can be pruned based on its height and
+// register ID. It ensures that, for each register ID, only the first key encountered at or
+// below the prune height is retained, which preserves the latest necessary state.
+//
+// Function Behavior:
+//  1. Identifies the height and register ID of the key.
+//  2. Checks if the key's height is greater than the prune height (in which case it cannot be pruned).
+//  3. Ensures that for each unique register ID, only the first key at or below the prune height
+//     is retained as the latest necessary state, while all others are marked for pruning.
+//
+// Parameters:
+// - key: The key to evaluate for pruning eligibility.
+//
+// No errors are expected during normal operations.
+func (p *RegisterPrunerRun) CanPruneKey(key []byte) (bool, error) {
+	keyHeight, registerID, err := lookupKeyToRegisterID(key)
+	if err != nil {
+		return false, fmt.Errorf("malformed lookup key %v: %w", key, err)
+	}
+
+	// If the register ID changes, reset the state, allowing for a fresh assessment of the new register's keys.
+	// For example, when the key changes from 0x01/key/owner1 to 0x01/key/owner2, a new bucket begins, and the
+	// state must be reset so that keys are assessed within the scope of that bucket.
+	if p.lastRegisterID != registerID {
+		p.keepFirstRelevantKey = false
+		p.lastRegisterID = registerID
+	}
+
+	// If the height of the key is above the prune height, it must not be pruned. For example, if the prune
+	// height is 99989, entries with a height greater than 99989 must be kept.
+	if keyHeight > p.pruneHeight {
+		return false, nil
+	}
+
+	// For each unique register ID, keep only the first key at or below the prune height.
+	// This first key is considered the minimum viable state to retain, and any further keys for the same
+	// register ID can be safely pruned. For example, if pruneHeight is 99989:
+	// [0x01/key/owner1/99990] [> 99989]
+	// [0x01/key/owner1/99988] [first key to keep, < 99989]
+	// [0x01/key/owner1/85000] [pruned]
+	// ...
+	// [0x01/key/owner2/99989] [first key to keep, == 99989]
+	// [0x01/key/owner2/99988] [pruned]
+	// ...
+	// [0x01/key/owner3/99988] [first key to keep, < 99989]
+	// [0x01/key/owner3/98001] [pruned]
+	// ...
+	// [0x02/key/owner0/99900] [first key to keep, < 99989]
+	if !p.keepFirstRelevantKey {
+		p.keepFirstRelevantKey = true
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// TotalKeysPruned returns the total number of keys that have been pruned during
+// this run of the RegisterPrunerRun instance.
+//
+// Returns:
+// - An integer representing the total count of keys that were pruned.
+//
+// Example usage:
+//
+//	totalPruned := pruner.TotalKeysPruned()
+func (p *RegisterPrunerRun) TotalKeysPruned() int {
+	return p.totalKeysPruned
+}
diff --git a/storage/pebble/registers.go b/storage/pebble/registers.go
index 2f5c78e8ff1..82093eacafc 100644
--- a/storage/pebble/registers.go
+++ b/storage/pebble/registers.go
@@ -1,16 +1,15 @@
 package pebble
 
 import (
-	"encoding/binary"
 	"fmt"
 	"math"
 
 	"github.com/cockroachdb/pebble"
-	"github.com/pkg/errors"
 	"go.uber.org/atomic"
 
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/storage"
+	"github.com/onflow/flow-go/storage/pebble/operation"
 )
 
 // Registers library that implements pebble storage for registers
@@ -185,26 +184,9 @@ func (s *Registers) calculateFirstHeight(latestHeight uint64) uint64 {
 }
 
 func firstStoredHeight(db *pebble.DB) (uint64, error) {
-	return heightLookup(db, firstHeightKey)
+	return operation.RetrieveRegisterHeight(db, firstHeightKey)
 }
 
 func latestStoredHeight(db *pebble.DB) (uint64, error) {
-	return heightLookup(db, latestHeightKey)
-}
-
-func heightLookup(db *pebble.DB, key []byte) (uint64, error) {
-	res, closer, err := db.Get(key)
-	if err != nil {
-		return 0, convertNotFoundError(err)
-	}
-	defer closer.Close()
-	return binary.BigEndian.Uint64(res), nil
-}
-
-// convert pebble NotFound error to storage NotFound error
-func convertNotFoundError(err error) error {
-	if errors.Is(err, pebble.ErrNotFound) {
-		return storage.ErrNotFound
-	}
-	return err
+	return operation.RetrieveRegisterHeight(db, latestHeightKey)
 }
diff --git a/storage/pebble/registers_pruner.go b/storage/pebble/registers_pruner.go
new file mode 100644
index 00000000000..150b2c348c7
--- /dev/null
+++ b/storage/pebble/registers_pruner.go
@@ -0,0 +1,274 @@
+package pebble
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/pebble"
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/module/component"
+	"github.com/onflow/flow-go/module/irrecoverable"
+)
+
+const (
+	// DefaultPruneThreshold defines the default number of blocks to retain below the latest block height.
+	// Blocks below this threshold become candidates for pruning once the number of unpruned blocks
+	// exceeds the prune interval.
+	DefaultPruneThreshold = uint64(100_000)
+
+	// DefaultPruneThrottleDelay is the default delay between each batch of keys inspected and pruned. This helps
+	// to reduce the load on the database during pruning.
+	DefaultPruneThrottleDelay = 10 * time.Millisecond
+
+	// DefaultPruneTickerInterval is the default interval between consecutive pruning checks. Pruning will be triggered
+	// at this interval if conditions are met.
+	DefaultPruneTickerInterval = 10 * time.Minute
+)
+
+// TODO: This configuration should be changed after testing it with real network data for better performance.
+const (
+	// pruneIntervalRatio represents an additional percentage of pruneThreshold which is used to calculate PruneInterval.
+	// Pruning will start if there are more than `(1 + pruneIntervalRatio) * pruneThreshold` unpruned blocks.
+	pruneIntervalRatio = 0.1 // 10%
+	// deleteItemsPerBatch defines the number of database keys to delete in each batch operation.
+	// This value is used to control the size of deletion operations during pruning.
+	deleteItemsPerBatch = 256
+)
+
+// PruneInterval calculates the interval at which pruning is triggered, based on the given pruneThreshold and
+// the pruneIntervalRatio.
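+//
+// For example, with the default pruneIntervalRatio of 0.1:
+//
+//	PruneInterval(100_000) == 100_000 + uint64(100_000*0.1) == 110_000
+//
+// i.e. pruning is only triggered once more than 110,000 unpruned blocks have accumulated.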
+func PruneInterval(threshold uint64) uint64 {
+	return threshold + uint64(float64(threshold)*pruneIntervalRatio)
+}
+
+// RegisterPruner manages the pruning process for register storage, handling
+// threshold-based deletion of old data from the Pebble DB to optimize storage use.
+type RegisterPruner struct {
+	component.Component
+
+	logger zerolog.Logger
+	db     *pebble.DB
+
+	metrics module.RegisterDBPrunerMetrics
+
+	// pruneInterval is the number of unpruned blocks in the db above which pruning is triggered
+	pruneInterval uint64
+	// pruneThreshold defines the number of blocks below the latestHeight to keep
+	pruneThreshold uint64
+	// pruneThrottleDelay controls a small pause between batches of registers inspected and pruned
+	pruneThrottleDelay time.Duration
+	// pruneTickerInterval defines how frequently pruning can be performed
+	pruneTickerInterval time.Duration
+}
+
+// PrunerOption is a functional option used to configure a RegisterPruner instance.
+type PrunerOption func(*RegisterPruner)
+
+// WithPruneThreshold configures the RegisterPruner with a custom threshold. The pruneThreshold sets the number of
+// blocks below the latest height to keep.
+func WithPruneThreshold(threshold uint64) PrunerOption {
+	return func(p *RegisterPruner) {
+		p.pruneThreshold = threshold
+		p.pruneInterval = PruneInterval(threshold)
+	}
+}
+
+// WithPruneThrottleDelay configures the RegisterPruner with a custom delay between batches of keys inspected and pruned,
+// reducing load on the database.
+func WithPruneThrottleDelay(throttleDelay time.Duration) PrunerOption {
+	return func(p *RegisterPruner) {
+		p.pruneThrottleDelay = throttleDelay
+	}
+}
+
+// WithPruneTickerInterval configures the RegisterPruner with a custom interval between consecutive pruning checks.
+func WithPruneTickerInterval(interval time.Duration) PrunerOption {
+	return func(p *RegisterPruner) {
+		p.pruneTickerInterval = interval
+	}
+}
+
+// NewRegisterPruner creates and initializes a new RegisterPruner instance with the specified logger, database connection,
+// and optional configurations provided via PrunerOptions. This sets up the pruning component and returns an error if
+// any issues occur.
+func NewRegisterPruner(
+	logger zerolog.Logger,
+	db *pebble.DB,
+	metrics module.RegisterDBPrunerMetrics,
+	opts ...PrunerOption,
+) (*RegisterPruner, error) {
+	pruner := &RegisterPruner{
+		logger:              logger.With().Str("component", "registerdb_pruner").Logger(),
+		db:                  db,
+		pruneInterval:       PruneInterval(DefaultPruneThreshold),
+		pruneThreshold:      DefaultPruneThreshold,
+		pruneThrottleDelay:  DefaultPruneThrottleDelay,
+		pruneTickerInterval: DefaultPruneTickerInterval,
+		metrics:             metrics,
+	}
+
+	pruner.Component = component.NewComponentManagerBuilder().
+		AddWorker(pruner.loop).
+		Build()
+
+	for _, opt := range opts {
+		opt(pruner)
+	}
+
+	return pruner, nil
+}
+
+// loop is the main worker for the Pruner, responsible for triggering
+// pruning operations at regular intervals. On each tick it checks the
+// stored height range and prunes if necessary.
+func (p *RegisterPruner) loop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
+	ready()
+
+	worker := NewIntervalWorker(p.pruneTickerInterval)
+
+	worker.Run(ctx, func() {
+		if err := p.checkPrune(ctx); err != nil {
+			ctx.Throw(err)
+		}
+	})
+}
+
+// checkPrune checks if pruning should be performed based on the height range
+// and triggers the pruning operation if necessary.
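+//
+// As a worked example with the defaults: pruneThreshold = 100_000 gives a prune interval of
+// 110_000, so with firstHeight = 1_000_000 and latestHeight = 1_120_000 the unpruned range is
+// 120_000 > 110_000, and pruning runs up to pruneHeight = 1_120_000 - 100_000 = 1_020_000.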
+//
+// Parameters:
+// - ctx: The context for managing the pruning throttle delay operation.
+//
+// No errors are expected during normal operations.
+func (p *RegisterPruner) checkPrune(ctx context.Context) error {
+	firstHeight, err := firstStoredHeight(p.db)
+	if err != nil {
+		return fmt.Errorf("failed to get first height from register storage: %w", err)
+	}
+
+	latestHeight, err := latestStoredHeight(p.db)
+	if err != nil {
+		return fmt.Errorf("failed to get latest height from register storage: %w", err)
+	}
+
+	if firstHeight > latestHeight {
+		return errors.New("the latest height must be greater than the first height")
+	}
+
+	if latestHeight-firstHeight <= p.pruneInterval {
+		return nil
+	}
+
+	pruneHeight := latestHeight - p.pruneThreshold
+	p.logger.Info().Uint64("prune_height", pruneHeight).Msg("pruning storage")
+
+	if err = p.pruneUpToHeight(ctx, p.db, pruneHeight); err != nil {
+		return fmt.Errorf("failed to prune: %w", err)
+	}
+
+	return nil
+}
+
+// pruneUpToHeight prunes all entries in the database with heights less than or equal
+// to the specified pruneHeight. For each register prefix, it keeps the earliest entry
+// that has a height less than or equal to pruneHeight, and deletes all other entries
+// with lower heights.
+//
+// This function iterates over the database keys, identifies keys to delete in batches,
+// and uses BatchDelete to remove them efficiently.
+//
+// Parameters:
+// - ctx: The context for managing the pruning throttle delay operation.
+// - r: The pebble reader used to iterate over the register entries.
+// - pruneHeight: The maximum height of entries to prune.
+//
+// No errors are expected during normal operations.
+func (p *RegisterPruner) pruneUpToHeight(ctx context.Context, r pebble.Reader, pruneHeight uint64) error {
+	start := time.Now()
+
+	// first, update firstHeight in the db.
+	// this ensures that if the node crashes during pruning, there will still be a consistent
+	// view when the node starts up. Subsequent prunes will remove any leftover data.
+	if err := p.updateFirstStoredHeight(pruneHeight); err != nil {
+		return fmt.Errorf("failed to update first height for register storage: %w", err)
+	}
+
+	dbPruner := NewRegisterPrunerRun(p.db, p.logger, pruneHeight)
+
+	prefix := []byte{codeRegister}
+	it, err := r.NewIter(&pebble.IterOptions{
+		LowerBound: prefix,
+		UpperBound: []byte{codeRegister + 1},
+	})
+	if err != nil {
+		return fmt.Errorf("cannot create iterator: %w", err)
+	}
+
+	defer func() {
+		if cerr := it.Close(); cerr != nil {
+			p.logger.Err(cerr).Msg("error while closing the iterator")
+		}
+	}()
+
+	var batchKeysToRemove [][]byte
+
+	for it.SeekGE(prefix); it.Valid(); it.Next() {
+		key := it.Key()
+
+		canPruneKey, err := dbPruner.CanPruneKey(key)
+		if err != nil {
+			return fmt.Errorf("cannot check key %v for pruning, reason: %w", key, err)
+		}
+
+		if !canPruneKey {
+			continue
+		}
+
+		// Create a copy of the key to avoid memory issues
+		batchKeysToRemove = append(batchKeysToRemove, bytes.Clone(key))
+
+		if len(batchKeysToRemove) >= deleteItemsPerBatch {
+			// Perform batch delete
+			if err := dbPruner.BatchDelete(batchKeysToRemove); err != nil {
+				return err
+			}
+
+			// Reset batchKeysToRemove to an empty slice while retaining capacity
+			batchKeysToRemove = batchKeysToRemove[:0]
+
+			// Throttle to prevent excessive system load, returning early if the context is canceled
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-time.After(p.pruneThrottleDelay):
+			}
+		}
+	}
+
+	if len(batchKeysToRemove) > 0 {
+		// Perform the final batch delete if there are any remaining keys
+		if err := dbPruner.BatchDelete(batchKeysToRemove); err != nil {
+			return err
+		}
+	}
+
+	p.logger.Info().
+ Uint64("height", pruneHeight). + Int("keys_pruned", dbPruner.TotalKeysPruned()). + Dur("duration_ms", time.Since(start)). + Msg("pruning complete") + + p.metrics.LatestPrunedHeight(pruneHeight) + + return nil +} + +// updateFirstStoredHeight updates the first stored height in the database to the specified height. +// The height is stored using the `firstHeightKey` key. +// +// Parameters: +// - height: The height value to store as the first stored height. +// +// No errors are expected during normal operations. +func (p *RegisterPruner) updateFirstStoredHeight(height uint64) error { + return p.db.Set(firstHeightKey, encodedUint64(height), pebble.Sync) +} diff --git a/storage/pebble/registers_pruner_test.go b/storage/pebble/registers_pruner_test.go new file mode 100644 index 00000000000..10527cab80e --- /dev/null +++ b/storage/pebble/registers_pruner_test.go @@ -0,0 +1,535 @@ +package pebble + +import ( + "context" + "fmt" + "testing" + "time" + + "golang.org/x/exp/rand" + + "github.com/cockroachdb/pebble" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/utils/unittest" +) + +const ( + latestHeight uint64 = 12 + pruneHeight uint64 = 7 + pruneThreshold uint64 = 5 +) + +// testCase defines the structure for a single test case, including initial data setup, +// expected data after pruning, and the data that should be pruned from the database. +type testCase struct { + name string + // The initial register data for the test. + initialData map[uint64]flow.RegisterEntries + // The expected first register height in the database after pruning. + expectedFirstHeight uint64 + // The data that is expected to be present in the database after pruning. + expectedData map[uint64]flow.RegisterEntries + // The data that should be pruned (i.e., removed) from the database after pruning. + prunedData map[uint64]flow.RegisterEntries +} + +// TestPrune validates the pruning functionality of the RegisterPruner. +// +// It runs multiple test cases, each of which initializes the database with specific +// register entries and then verifies that the pruning operation behaves as expected. +// The test cases check that: +// - Register entries below a certain height are pruned (i.e., removed) from the database. +// - The remaining data in the database matches the expected state after pruning. +// - The first height of the register entries in the database is correct after pruning. +// +// The test cases include: +// - Straight pruning, where register entries are pruned up to a specific height. +// - Pruning with different entries at varying heights, ensuring only the correct entries are kept. +func TestPrune(t *testing.T) { + // Set up the test case with initial data, expected outcomes, and pruned data. 
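+	// Note on the package-level constants above: PruneInterval(pruneThreshold) = 5 + uint64(5*0.1) = 5,
+	// and with latestHeight = 12 the pruner removes data up to latestHeight - pruneThreshold = 12 - 5 = 7,
+	// which is exactly the pruneHeight constant.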
+ straightPruneData := straightPruneTestCase()
+ testCaseWithDiffData := testCaseWithDiffHeights()
+
+ tests := []testCase{
+ {
+ name: "straight pruning to a pruned height",
+ initialData: straightPruneData.initialData,
+ expectedFirstHeight: straightPruneData.expectedFirstHeight,
+ expectedData: straightPruneData.expectedData,
+ prunedData: straightPruneData.prunedData,
+ },
+ {
+ name: "pruning with different entries to keep",
+ initialData: testCaseWithDiffData.initialData,
+ expectedFirstHeight: testCaseWithDiffData.expectedFirstHeight,
+ expectedData: testCaseWithDiffData.expectedData,
+ prunedData: testCaseWithDiffData.prunedData,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Run the test with the provided initial data and Register storage
+ RunWithRegistersStorageWithInitialData(t, tt.initialData, func(db *pebble.DB) {
+ pruner, err := NewRegisterPruner(
+ zerolog.Nop(),
+ db,
+ metrics.NewNoopCollector(),
+ WithPruneThreshold(pruneThreshold),
+ WithPruneTickerInterval(10*time.Millisecond),
+ )
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
+
+ // Start the pruning process
+ pruner.Start(signalerCtx)
+
+ // Ensure pruning happens and the first height after pruning is as expected.
+ requirePruning(t, db, tt.expectedFirstHeight)
+
+ // Clean up pruner and check for any errors.
+ cleanupPruner(t, pruner, cancel, errChan)
+
+ // Verify that the data in the database matches the expected and pruned data.
+ verifyData(t, db, tt)
+ })
+ })
+ }
+}
+
+// TestRemainingValuesAfterPruning tests that pruning does not affect values above the pruned height.
+//
+// This test covers scenarios where:
+// - Value entries above the pruned height remain unchanged after the pruning process completes.
+func TestRemainingValuesAfterPruning(t *testing.T) {
+ // Generate random initial data across a range of heights.
+ testData := generateRandomRegisterData()
+
+ RunWithRegistersStorageWithInitialData(t, testData.initialData, func(db *pebble.DB) {
+ pruner, err := NewRegisterPruner(
+ zerolog.Nop(),
+ db,
+ metrics.NewNoopCollector(),
+ WithPruneThreshold(pruneThreshold),
+ WithPruneTickerInterval(10*time.Millisecond),
+ )
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
+
+ // Start the pruning process.
+ pruner.Start(signalerCtx)
+
+ // Wait until pruning completes and verify that values above prune height remain unchanged.
+ requirePruning(t, db, testData.expectedFirstHeight)
+
+ // Clean up pruner and check for any errors.
+ cleanupPruner(t, pruner, cancel, errChan)
+
+ // Check that all heights above the prune height have unchanged values.
+ for height := pruneHeight + 1; height <= latestHeight; height++ {
+ entries, exists := testData.expectedData[height]
+ if !exists {
+ continue
+ }
+
+ for _, entry := range entries {
+ val, closer, err := db.Get(newLookupKey(height, entry.Key).Bytes())
+ require.NoError(t, err)
+ require.Equal(t, entry.Value, val)
+ require.NoError(t, closer.Close())
+ }
+ }
+ })
+}
+
+// TestPruningInterruption tests that interrupted pruning does not affect any register values.
+//
+// This test covers scenarios where:
+// - All value entries in the database remain unchanged when pruning is interrupted mid-process.
+func TestPruningInterruption(t *testing.T) {
+ // Generate random initial data across a range of heights.
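+ // Each height holds a random subset of three fixed keys, so the interrupted prune is
+ // exercised against slightly different data on every run.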
+ testData := generateRandomRegisterData()
+
+ RunWithRegistersStorageWithInitialData(t, testData.initialData, func(db *pebble.DB) {
+ pruner, err := NewRegisterPruner(
+ zerolog.Nop(),
+ db,
+ metrics.NewNoopCollector(),
+ WithPruneThreshold(pruneThreshold),
+ WithPruneTickerInterval(10*time.Millisecond),
+ )
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
+
+ // Start the pruning process but cancel it immediately to simulate interruption.
+ go pruner.Start(signalerCtx)
+
+ cleanupPruner(t, pruner, cancel, errChan)
+
+ // Verify that all values remain unchanged after the interrupted pruning attempt.
+ for height, entries := range testData.initialData {
+ for _, entry := range entries {
+ val, closer, err := db.Get(newLookupKey(height, entry.Key).Bytes())
+ require.NoError(t, err)
+ require.Equal(t, entry.Value, val)
+ require.NoError(t, closer.Close())
+ }
+ }
+ })
+}
+
+// TestPruneErrors checks the error handling behavior of the RegisterPruner when certain
+// conditions cause failures during the pruning process.
+//
+// This test covers scenarios where:
+// - The first stored height in the database cannot be retrieved, simulating a failure to locate it.
+// - The latest height in the database cannot be retrieved, simulating a missing entry.
+//
+// The tests ensure that the RegisterPruner handles these error conditions correctly by
+// triggering the appropriate irrecoverable errors and shutting down gracefully.
+func TestPruneErrors(t *testing.T) {
+ t.Run("not found first height", func(t *testing.T) {
+ unittest.RunWithTempDir(t, func(dir string) {
+ // Open an empty register storage db with no first/latest height entries.
+ db, err := OpenRegisterPebbleDB(dir)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, db.Close()) }()
+
+ pruner, err := NewRegisterPruner(
+ zerolog.Nop(),
+ db,
+ metrics.NewNoopCollector(),
+ WithPruneThreshold(pruneThreshold),
+ WithPruneTickerInterval(10*time.Millisecond),
+ )
+ require.NoError(t, err)
+
+ expectedErr := fmt.Errorf("key not found")
+ signCtxErr := fmt.Errorf("failed to get first height from register storage: %w", expectedErr)
+ ctx := irrecoverable.NewMockSignalerContextExpectError(t, context.Background(), signCtxErr)
+
+ // Start the pruning process
+ pruner.Start(ctx)
+
+ unittest.AssertClosesBefore(t, pruner.Done(), 2*time.Second)
+ })
+ })
+
+ t.Run("not found latest height", func(t *testing.T) {
+ unittest.RunWithTempDir(t, func(dir string) {
+ // Open an empty register storage db.
+ db, err := OpenRegisterPebbleDB(dir)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, db.Close()) }()
+
+ // insert initial first height to pebble
+ require.NoError(t, db.Set(firstHeightKey, encodedUint64(1), nil))
+
+ pruner, err := NewRegisterPruner(
+ zerolog.Nop(),
+ db,
+ metrics.NewNoopCollector(),
+ WithPruneThreshold(pruneThreshold),
+ WithPruneTickerInterval(10*time.Millisecond),
+ )
+ require.NoError(t, err)
+
+ expectedErr := fmt.Errorf("key not found")
+ signCtxErr := fmt.Errorf("failed to get latest height from register storage: %w", expectedErr)
+ ctx := irrecoverable.NewMockSignalerContextExpectError(t, context.Background(), signCtxErr)
+
+ // Start the pruning process
+ pruner.Start(ctx)
+
+ unittest.AssertClosesBefore(t, pruner.Done(), 2*time.Second)
+ })
+ })
+}
+
+// requirePruning checks if the first stored height in the database matches the expected height after pruning.
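+// Pruning runs asynchronously on the pruner's ticker (10ms in these tests), so the check
+// polls the db with require.Eventually instead of asserting immediately.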
+func requirePruning(t *testing.T, db *pebble.DB, expectedFirstHeightAfterPruning uint64) {
+ require.Eventually(t, func() bool {
+ // Do not use require inside the polled condition: Eventually runs it on a separate
+ // goroutine, where FailNow is unsafe. Report failure by returning false instead.
+ actualFirstHeight, err := firstStoredHeight(db)
+ return err == nil && expectedFirstHeightAfterPruning == actualFirstHeight
+ }, 2*time.Second, 15*time.Millisecond)
+}
+
+// cleanupPruner stops the pruner and verifies there are no errors in the error channel.
+func cleanupPruner(t *testing.T, pruner *RegisterPruner, cancel context.CancelFunc, errChan <-chan error) {
+ cancel()
+ <-pruner.Done()
+
+ select {
+ case err := <-errChan:
+ require.NoError(t, err)
+ default:
+ }
+}
+
+// straightPruneTestCase initializes and returns a testCase with predefined data for straight pruning.
+func straightPruneTestCase() testCase {
+ initialData := emptyRegistersData(latestHeight)
+
+ key1 := flow.RegisterID{Owner: "owner1", Key: "key1"}
+ key2 := flow.RegisterID{Owner: "owner2", Key: "key2"}
+ key3 := flow.RegisterID{Owner: "owner3", Key: "key3"}
+
+ value1 := []byte("value1")
+ value2 := []byte("value2")
+ value3 := []byte("value3")
+
+ // Set up initial register entries for different heights.
+ initialData[3] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ }
+ initialData[4] = flow.RegisterEntries{
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ }
+ initialData[7] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ }
+ initialData[8] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key3, Value: value3},
+ }
+ initialData[9] = flow.RegisterEntries{
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ }
+ initialData[10] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ }
+ initialData[12] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key3, Value: value3},
+ }
+
+ // Define the expected data after pruning.
+ expectedData := map[uint64]flow.RegisterEntries{
+ 7: {
+ {Key: key1, Value: value1}, // keep, most recent version at or below pruneHeight (7)
+ {Key: key2, Value: value2}, // keep, most recent version at or below pruneHeight (7)
+ {Key: key3, Value: value3}, // keep, most recent version at or below pruneHeight (7)
+ },
+ 8: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ 9: {
+ {Key: key2, Value: value2}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ 10: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key2, Value: value2}, // keep, height > 7
+ },
+ 12: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ }
+
+ // Define the data that should be pruned (i.e., removed) from the database after pruning.
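+ // Heights 3 and 4 hold only stale versions: every key written there is written again
+ // at height 7, which becomes the version kept at or below the prune height.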
+ prunedData := map[uint64]flow.RegisterEntries{
+ 3: {
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ },
+ 4: {
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ },
+ }
+
+ return testCase{
+ initialData: initialData,
+ expectedFirstHeight: pruneHeight,
+ expectedData: expectedData,
+ prunedData: prunedData,
+ }
+}
+
+// testCaseWithDiffHeights initializes and returns a testCase in which each register's most
+// recent version at or below the prune height sits at a different height, so different
+// entries must be kept for different registers.
+func testCaseWithDiffHeights() testCase {
+ initialData := emptyRegistersData(latestHeight)
+
+ key1 := flow.RegisterID{Owner: "owner1", Key: "key1"}
+ key2 := flow.RegisterID{Owner: "owner2", Key: "key2"}
+ key3 := flow.RegisterID{Owner: "owner3", Key: "key3"}
+
+ value1 := []byte("value1")
+ value2 := []byte("value2")
+ value3 := []byte("value3")
+
+ // Set up initial register entries for different heights.
+ initialData[1] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ }
+ initialData[2] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ }
+ initialData[5] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key3, Value: value3},
+ }
+ initialData[6] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ }
+ initialData[10] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ }
+ initialData[11] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key3, Value: value3},
+ }
+ initialData[12] = flow.RegisterEntries{
+ {Key: key1, Value: value1},
+ {Key: key3, Value: value3},
+ }
+
+ // Define the expected data after pruning.
+ expectedData := map[uint64]flow.RegisterEntries{
+ 5: {
+ {Key: key3, Value: value3}, // keep, key3's most recent version at or below pruneHeight (7)
+ },
+ 6: {
+ {Key: key1, Value: value1}, // keep, key1's most recent version at or below pruneHeight (7)
+ {Key: key2, Value: value2}, // keep, key2's most recent version at or below pruneHeight (7)
+ },
+ 10: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key2, Value: value2}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ 11: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ 12: {
+ {Key: key1, Value: value1}, // keep, height > 7
+ {Key: key3, Value: value3}, // keep, height > 7
+ },
+ }
+
+ // Define the data that should be pruned (i.e., removed) from the database after pruning.
+ prunedData := map[uint64]flow.RegisterEntries{
+ 1: {
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ {Key: key3, Value: value3},
+ },
+ 2: {
+ {Key: key1, Value: value1},
+ {Key: key2, Value: value2},
+ },
+ 5: {
+ {Key: key1, Value: value1}, // superseded by the version at height 6
+ },
+ }
+
+ return testCase{
+ initialData: initialData,
+ expectedFirstHeight: pruneHeight,
+ expectedData: expectedData,
+ prunedData: prunedData,
+ }
+}
+
+// generateRandomRegisterData generates random register entries for every height from 1 up to
+// latestHeight; entries above pruneHeight are also recorded as expected to survive pruning.
+func generateRandomRegisterData() testCase {
+ initialData := emptyRegistersData(latestHeight)
+
+ keys := []flow.RegisterID{
+ {Owner: "owner1", Key: "key1"},
+ {Owner: "owner2", Key: "key2"},
+ {Owner: "owner3", Key: "key3"},
+ }
+
+ values := [][]byte{
+ []byte("value1"),
+ []byte("value2"),
+ []byte("value3"),
+ }
+
+ rand.Seed(uint64(time.Now().UnixNano()))
+ expectedData := make(map[uint64]flow.RegisterEntries)
+
+ for height := uint64(1); height <= latestHeight; height++ {
+ var entries flow.RegisterEntries
+
+ // Randomly assign register entries for each height.
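+ // Each key is included independently with probability 1/2, so a height may receive
+ // zero, some, or all of the three keys.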
+ for j, key := range keys {
+ if rand.Intn(2) == 0 {
+ entries = append(entries, flow.RegisterEntry{
+ Key: key,
+ Value: values[j],
+ })
+ }
+ }
+
+ initialData[height] = entries
+ if height > pruneHeight {
+ expectedData[height] = entries
+ }
+ }
+
+ return testCase{
+ initialData: initialData,
+ expectedFirstHeight: pruneHeight,
+ expectedData: expectedData,
+ }
+}
+
+// emptyRegistersData returns a map pre-filled with an empty RegisterEntries slice for every
+// height from 1 to count, so that test data covers a gapless range of heights.
+func emptyRegistersData(count uint64) map[uint64]flow.RegisterEntries {
+ data := make(map[uint64]flow.RegisterEntries, count)
+ for i := uint64(1); i <= count; i++ {
+ data[i] = flow.RegisterEntries{}
+ }
+
+ return data
+}
+
+// verifyData verifies that the data in the database matches the expected and pruned data after pruning.
+func verifyData(t *testing.T, db *pebble.DB, data testCase) {
+ for height, entries := range data.expectedData {
+ for _, entry := range entries {
+ val, closer, err := db.Get(newLookupKey(height, entry.Key).Bytes())
+ require.NoError(t, err)
+ require.Equal(t, entry.Value, val)
+ require.NoError(t, closer.Close())
+ }
+ }
+
+ for height, entries := range data.prunedData {
+ for _, entry := range entries {
+ _, _, err := db.Get(newLookupKey(height, entry.Key).Bytes())
+ require.ErrorIs(t, err, pebble.ErrNotFound)
+ }
+ }
+}
diff --git a/storage/pebble/testutil.go b/storage/pebble/testutil.go
index 0d890fef85d..9f3f68bb9a8 100644
--- a/storage/pebble/testutil.go
+++ b/storage/pebble/testutil.go
@@ -1,11 +1,13 @@
 package pebble
 
 import (
+ "sort"
 "testing"
 
 "github.com/cockroachdb/pebble"
 "github.com/stretchr/testify/require"
 
+ "github.com/onflow/flow-go/model/flow"
 "github.com/onflow/flow-go/utils/unittest"
 )
 
@@ -30,3 +32,59 @@ func NewBootstrappedRegistersWithPathForTest(tb testing.TB, dir string, first, l
 require.NoError(tb, db.Set(latestHeightKey, encodedUint64(latest), nil))
 return db
 }
+
+// RunWithRegistersStorageWithInitialData sets up a temporary register storage database
+// initialized with the provided map of register entries and runs the provided test function `f`.
+//
+// This function creates a temporary database and initializes it with register entries at
+// specific heights as defined in the `data` map. After initializing the database, it runs
+// the test function `f` with the database instance and closes the database when done.
+//
+// Parameters:
+// - tb: The testing object used to manage test state and report errors.
+// - data: A map of heights to `RegisterEntries` that should be stored in the database at each height.
+// - f: A function to execute with the initialized `pebble.DB` instance.
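+//
+// The database is bootstrapped with first and latest heights of 0, and entries are then
+// stored height by height in ascending order, since the registers index tracks a single
+// latest height and only accepts entries in ascending height order.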
+//
+// Example usage:
+//
+// data := map[uint64]flow.RegisterEntries{
+// 100: {
+// {Key: flow.RegisterID{Owner: "owner1", Key: "key1"}, Value: []byte("value1")},
+// {Key: flow.RegisterID{Owner: "owner2", Key: "key2"}, Value: []byte("value2")},
+// },
+// 200: {
+// {Key: flow.RegisterID{Owner: "owner1", Key: "key1"}, Value: []byte("value1")},
+// {Key: flow.RegisterID{Owner: "owner3", Key: "key3"}, Value: []byte("value3")},
+// },
+// }
+// RunWithRegistersStorageWithInitialData(t, data, func(db *pebble.DB) {
+// // Perform test operations with `db`
+// })
+func RunWithRegistersStorageWithInitialData(
+ tb testing.TB,
+ data map[uint64]flow.RegisterEntries,
+ f func(db *pebble.DB),
+) {
+ unittest.RunWithTempDir(tb, func(dir string) {
+ db := NewBootstrappedRegistersWithPathForTest(tb, dir, uint64(0), uint64(0))
+ // The prune threshold of 5 matches the threshold used by the pruner tests in this package.
+ registers, err := NewRegisters(db, 5)
+ require.NoError(tb, err)
+
+ heights := make([]uint64, 0, len(data))
+ for h := range data {
+ heights = append(heights, h)
+ }
+ // Sort heights in ascending order before storing: map iteration order is random,
+ // and registers.Store must receive heights in increasing order.
+ sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] })
+
+ // Iterate over the heights in ascending order and store the entries through registers
+ for _, height := range heights {
+ err = registers.Store(data[height], height)
+ require.NoError(tb, err)
+ }
+
+ f(db)
+
+ require.NoError(tb, db.Close())
+ })
+}