diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index 0ef166253b..9a31251a2b 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -107,8 +107,9 @@ ApplicationConfiguration: Timeout: 10m DownloaderWorkersCount: 500 OIDBatchSize: 8000 - BQueueSize: 32000 # must be larger than OIDBatchSize; highly recommended to be 2*OIDBatchSize or 3*OIDBatchSize + BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize SkipIndexFilesSearch: false + IndexFileSize: 128000 ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" BlockAttribute: "block" IndexFileAttribute: "oid" diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md index 3b4987cabf..9755713805 100644 --- a/docs/neofs-blockstorage.md +++ b/docs/neofs-blockstorage.md @@ -21,7 +21,7 @@ attributes: Each index file is an object containing a constant-sized batch of raw block object IDs in binary form ordered by block index. Each index file is marked with the following attributes: - - index file identifier with consecutive file index value (`index:0`) + - index file identifier with consecutive file index value (`oid:0`) - the number of OIDs included into index file (`size:128000`) ### NeoFS BlockFetcher diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 38d5e5fdc3..162ef7909c 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -174,6 +174,7 @@ BlockFetcher module and has the following structure: ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" BlockAttribute: "block" IndexFileAttribute: "oid" + IndexFileSize: 128000 ``` where: - `Enabled` enables NeoFS BlockFetcher module. @@ -198,6 +199,9 @@ where: to be `2*OIDBatchSize` or `3*OIDBatchSize`. - `SkipIndexFilesSearch` is a flag that allows to skip index files search and search for blocks directly. It is set to `false` by default. +- `IndexFileSize` is the number of OID objects stored in the index files. 
This + setting depends on the NeoFS block storage configuration and is applicable only if + `SkipIndexFilesSearch` is set to `false`. ### Metrics Services Configuration diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 938a48a3e2..efbdc5d948 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -148,7 +148,7 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) // configuration is valid and safe to use for further operations. func (a *ApplicationConfiguration) Validate() error { if err := a.NeoFSBlockFetcher.Validate(); err != nil { - return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err) + return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err) } return nil } diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 8cfdcd9955..22d4d46a05 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -20,6 +20,7 @@ type NeoFSBlockFetcher struct { DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"` BQueueSize int `yaml:"BQueueSize"` SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` + IndexFileSize uint32 `yaml:"IndexFileSize"` } // Validate checks NeoFSBlockFetcher for internal consistency and ensures @@ -43,5 +44,8 @@ func (cfg *NeoFSBlockFetcher) Validate() error { if len(cfg.Addresses) == 0 { return errors.New("addresses are not set") } + if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 { + return errors.New("IndexFileSize is not set") + } return nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index caf67e2049..ee64c85eac 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/url" + "strings" "sync" "sync/atomic" "time" @@ -25,8 +26,6 @@ import ( const ( // oidSize is the size of the object ID in NeoFS. 
oidSize = sha256.Size - // indexFileSize is the number of OIDs in a single index file. - indexFileSize = 128000 // defaultTimeout is the default timeout for NeoFS requests. defaultTimeout = 5 * time.Minute // defaultOIDBatchSize is the default number of OIDs to search and fetch at once. @@ -59,18 +58,22 @@ type Service struct { // wg is a wait group for block downloaders. wg sync.WaitGroup + // Global context for download operations cancellation. + ctx context.Context + ctxCancel context.CancelFunc + // A set of routines managing graceful Service shutdown. - quit chan struct{} + quit chan bool quitOnce sync.Once exiterToOIDDownloader chan struct{} exiterToShutdown chan struct{} oidDownloaderToExiter chan struct{} - blockPutterToExiter chan struct{} + blockQueuerToExiter chan struct{} shutdownCallback func() } -// New creates a new BlockFetcherService. +// New creates a new BlockFetcher Service. func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { var ( account *wallet.Account @@ -106,7 +109,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc if cfg.DownloaderWorkersCount <= 0 { cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount } - if cfg.Addresses == nil { + if len(cfg.Addresses) == 0 { return &Service{}, errors.New("no addresses provided") } return &Service{ @@ -119,11 +122,11 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc stateRootInHeader: chain.GetConfig().StateRootInHeader, shutdownCallback: shutdownCallback, - quit: make(chan struct{}), + quit: make(chan bool), exiterToOIDDownloader: make(chan struct{}), exiterToShutdown: make(chan struct{}), oidDownloaderToExiter: make(chan struct{}), - blockPutterToExiter: make(chan struct{}), + blockQueuerToExiter: make(chan struct{}), // Use buffer of two batch sizes to load OIDs in advance: // * first full block of OIDs is processing by 
Downloader @@ -145,7 +148,8 @@ func (bfs *Service) Start() error { bfs.log.Info("starting NeoFS BlockFetcher service") var err error - bfs.client, err = neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0], 10*time.Minute) + bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background()) + bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute) if err != nil { bfs.isActive.CompareAndSwap(true, false) return fmt.Errorf("create SDK client: %w", err) @@ -179,11 +183,13 @@ func (bfs *Service) oidDownloader() { } else { err = bfs.fetchOIDsFromIndexFiles() } + var force bool if err != nil { bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err)) + force = true } - // Gracefully stop the service since there's nothing to do anymore. - bfs.stopService() + // Stop the service since there's nothing to do anymore. + bfs.stopService(force) } // blockDownloader downloads the block from NeoFS and sends it to the blocks channel. @@ -191,35 +197,51 @@ func (bfs *Service) blockDownloader() { defer bfs.wg.Done() for blkOid := range bfs.oidsCh { - ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer cancel() rc, err := bfs.objectGet(ctx, blkOid.String()) if err != nil { + if isContextCanceledErr(err) { + return + } bfs.log.Error("failed to objectGet block", zap.Error(err)) - bfs.stopService() + bfs.stopService(true) return } b, err := bfs.readBlock(rc) if err != nil { + if isContextCanceledErr(err) { + return + } bfs.log.Error("failed to read block", zap.Error(err)) - bfs.stopService() + bfs.stopService(true) return } - bfs.blocksCh <- b + select { + case <-bfs.ctx.Done(): + return + case bfs.blocksCh <- b: + } } } // blockQueuer puts the block into the bqueue. 
func (bfs *Service) blockQueuer() { - defer close(bfs.blockPutterToExiter) + defer close(bfs.blockQueuerToExiter) + for b := range bfs.blocksCh { - err := bfs.enqueueBlock(b) - if err != nil { - bfs.log.Error("failed to enqueue block", zap.Error(err)) - bfs.stopService() + select { + case <-bfs.ctx.Done(): return + default: + err := bfs.enqueueBlock(b) + if err != nil { + bfs.log.Error("failed to enqueue block", zap.Error(err)) + bfs.stopService(true) + return + } } } } @@ -227,8 +249,8 @@ func (bfs *Service) blockQueuer() { // fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. func (bfs *Service) fetchOIDsFromIndexFiles() error { h := bfs.chain.BlockHeight() - startIndex := h/indexFileSize + 1 - skip := h % indexFileSize + startIndex := h/bfs.cfg.IndexFileSize + 1 + skip := h % bfs.cfg.IndexFileSize for { select { @@ -240,10 +262,13 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) prm.SetFilters(filters) - ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) blockOidsObject, err := bfs.objectSearch(ctx, prm) cancel() if err != nil { + if isContextCanceledErr(err) { + return nil + } return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) } if len(blockOidsObject) == 0 { @@ -251,15 +276,21 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { return nil } - blockCtx, blockCancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer blockCancel() - blockOidsData, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) if err != nil { + if isContextCanceledErr(err) { + return nil + } return fmt.Errorf("failed to fetch '%s' 
object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) } - err = bfs.streamBlockOIDs(blockOidsData, int(skip)) + err = bfs.streamBlockOIDs(oidsRC, int(skip)) if err != nil { + if isContextCanceledErr(err) { + return nil + } return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err) } @@ -302,8 +333,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { oidsProcessed++ } - if oidsProcessed != indexFileSize { - return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", indexFileSize, oidsProcessed) + if oidsProcessed != int(bfs.cfg.IndexFileSize) { + return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed) } return nil } @@ -323,10 +354,13 @@ func (bfs *Service) fetchOIDsBySearch() error { filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) prm.SetFilters(filters) - ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) blockOids, err := bfs.objectSearch(ctx, prm) cancel() if err != nil { + if isContextCanceledErr(err) { + return nil + } return err } @@ -355,20 +389,22 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { return b, r.Err } -// Shutdown stops the NeoFS BlockFetcher service gracefully. It prevents service from new -// block OIDs search and waits until all downloaders finish their work. +// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new +// block OIDs search, cancels all in-progress downloading operations and waits +// until all service routines finish their work. func (bfs *Service) Shutdown() { if !bfs.IsActive() { return } - bfs.stopService() + bfs.stopService(true) <-bfs.exiterToShutdown } // stopService close quitting goroutine once. 
It's the only entrypoint to shutdown // procedure. -func (bfs *Service) stopService() { +func (bfs *Service) stopService(force bool) { bfs.quitOnce.Do(func() { + bfs.quit <- force close(bfs.quit) }) } @@ -377,8 +413,16 @@ func (bfs *Service) stopService() { // Service shutdown process. func (bfs *Service) exiter() { // Closing signal may come from anyone, but only once. - <-bfs.quit - bfs.log.Info("shutting down NeoFS BlockFetcher service") + force := <-bfs.quit + bfs.log.Info("shutting down NeoFS BlockFetcher service", + zap.Bool("force", force), + ) + + // Cancel all pending OIDs/blocks downloads if shutdown is requested by the user + // or caused by downloading error. + if force { + bfs.ctxCancel() + } // Send signal to OID downloader to stop. Wait until OID downloader finishes his // work. @@ -392,10 +436,11 @@ func (bfs *Service) exiter() { // Send signal to block putter to finish his work. Wait until it's finished. close(bfs.blocksCh) - <-bfs.blockPutterToExiter + <-bfs.blockQueuerToExiter - // Everything is done, turn off the activity marker and let the server know about - // it. + // Everything is done, release resources, turn off the activity marker and let + // the server know about it. + _ = bfs.client.Close() _ = bfs.log.Sync() bfs.isActive.CompareAndSwap(true, false) bfs.shutdownCallback() @@ -425,3 +470,10 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) } + +// isContextCanceledErr returns whether error is a wrapped [context.Canceled]. +// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624. 
+func isContextCanceledErr(err error) bool { + return errors.Is(err, context.Canceled) || + strings.Contains(err.Error(), "context canceled") +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index e6d105050d..339a67ea78 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -1,11 +1,10 @@ -package blockfetcher_test +package blockfetcher import ( "testing" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -37,18 +36,64 @@ func TestServiceConstructor(t *testing.T) { mockPut := &mockPutBlockFunc{} shutdownCallback := func() {} - cfg := config.NeoFSBlockFetcher{ - Timeout: 0, - OIDBatchSize: 0, - DownloaderWorkersCount: 0, - } - _, err := blockfetcher.New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) - require.Error(t, err) - cfg.Addresses = []string{"http://localhost:8080"} - service, err := blockfetcher.New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) - require.NoError(t, err) - require.NotNil(t, service) - require.Equal(t, service.IsActive(), false) - require.Error(t, service.Start()) - require.Equal(t, service.IsActive(), false) + t.Run("empty configuration", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Timeout: 0, + OIDBatchSize: 0, + DownloaderWorkersCount: 0, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("no addresses", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{}, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("default values", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, 
mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + require.NotNil(t, service) + + require.Equal(t, service.IsActive(), false) + require.Equal(t, service.cfg.Timeout, defaultTimeout) + require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize) + require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) + require.Equal(t, service.IsActive(), false) + }) + + t.Run("SDK client", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + err = service.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "create SDK client") + require.Equal(t, service.IsActive(), false) + }) + + t.Run("invalid wallet", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{ + Path: "invalid/path/to/wallet.json", + Password: "wrong-password", + }, + }, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") + }) }