From 17119681165d94e69a37d8fc96881d514a7d65fc Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 5 Sep 2024 15:23:15 +0400 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Co-authored-by: Anna Shaleva Signed-off-by: Ekaterina Pavlova --- config/protocol.testnet.yml | 13 + docs/neofs-blockstorage.md | 74 +++ docs/node-configuration.md | 50 ++ pkg/config/application_config.go | 23 +- pkg/config/blockfetcher_config.go | 51 ++ pkg/config/config.go | 4 + pkg/network/server.go | 87 +++- pkg/network/server_config.go | 39 +- pkg/services/blockfetcher/blockfetcher.go | 479 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 99 ++++ 10 files changed, 872 insertions(+), 47 deletions(-) create mode 100644 docs/neofs-blockstorage.md create mode 100644 pkg/config/blockfetcher_config.go create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..9a31251a2b 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,16 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" + NeoFSBlockFetcher: + Enabled: false + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize + SkipIndexFilesSearch: false + IndexFileSize: 128000 + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + IndexFileAttribute: "oid" diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md new file mode 100644 index 0000000000..9755713805 --- /dev/null +++ b/docs/neofs-blockstorage.md @@ -0,0 +1,74 @@ +# NeoFS block storage + +Using NeoFS to store chain's blocks and snapshots was proposed in +[#3463](https://github.com/neo-project/neo/issues/3463). NeoGo contains several +extensions utilizing NeoFS block storage aimed to improve node synchronization +efficiency and reduce node storage size. + +## Components and functionality + +### Block storage schema + +A single NeoFS container is used to store blocks and index files. Each block +is stored in a binary form as a separate object with a unique OID and a set of +attributes: + - block object identifier with block index value (`block:1`) + - primary node index (`primary:0`) + - block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`) + - previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`) + - millisecond-precision block timestamp (`time:1627894840919`) + +Each index file is an object containing a constant-sized batch of raw block object +IDs in binary form ordered by block index. Each index file is marked with the +following attributes: + - index file identifier with consecutive file index value (`oid:0`) + - the number of OIDs included into index file (`size:128000`) + +### NeoFS BlockFetcher + +NeoFS BlockFetcher service is designed as an alternative to P2P synchronisation +protocol. It allows to download blocks from a trusted container in the NeoFS network +and persist them to database using standard verification flow. NeoFS BlockFetcher +service primarily used during the node's bootstrap, providing a fast alternative to +P2P blocks synchronisation. + +NeoFS BlockFetcher service has two modes of operation: +- Index File Search: Search for index files, which contain batches of block object + IDs and fetch blocks from NeoFS by retrieved OIDs. +- Direct Block Search: Search and fetch blocks directly from NeoFS container via + built-in NeoFS object search mechanism. + +Operation mode of BlockFetcher can be configured via `SkipIndexFilesSearch` +parameter. + +#### Operation flow + +1. **OID Fetching**: + Depending on the mode, the service either: + - Searches for index files by index file attribute and reads block OIDs from index + file object-by-object. + - Searches batches of blocks directly by block attribute (the batch size is + configured via `OIDBatchSize` parameter). + + Once the OIDs are retrieved, they are immediately redirected to the + block downloading routines for further processing. The channel that + is used to redirect block OIDs to downloading routines is buffered + to provide smooth OIDs delivery without delays. The size of this channel + can be configured via `OIDBatchSize` parameter and equals to `2*OIDBatchSize`. +2. **Parallel Block Downloading**: + The number of downloading routines can be configured via + `DownloaderWorkersCount` parameter. It's up to the user to find the + balance between the downloading speed and blocks persist speed for every + node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a + buffered channel of size `IDBatchSize` with further redirection to the + block queue. +3. **Block Insertion**: + Downloaded blocks are inserted into the blockchain using the same logic + as in the P2P synchronisation protocol. The block queue is used to order + downloaded blocks before they are inserted into the blockchain. The + size of the queue can be configured via the `BQueueSize` parameter + and should be larger than the `OIDBatchSize` parameter to avoid blocking + the downloading routines. + +Once all blocks available in the NeoFS container are processed, the service +shuts down automatically. diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 83c149e620..162ef7909c 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -21,6 +21,7 @@ node-related settings described in the table below. | GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | | | LogPath | `string` | "", so only console logging | File path where to store node logs. | +| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. | | Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. | | P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. | | P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. | @@ -153,6 +154,55 @@ where: Please, refer to the [Notary module documentation](./notary.md#Notary node module) for details on module features. +### NeoFS BlockFetcher Configuration + +`NeoFSBlockFetcher` configuration section contains settings for NeoFS +BlockFetcher module and has the following structure: +``` + NeoFSBlockFetcher: + Enabled: true + UnlockWallet: + Path: "./wallet.json" + Password: "pass" + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 + SkipIndexFilesSearch: false + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + IndexFileAttribute: "oid" + IndexFileSize: 128000 +``` +where: +- `Enabled` enables NeoFS BlockFetcher module. +- `UnlockWallet` contains wallet settings to retrieve account to sign requests to + NeoFS. Without this setting, the module will use randomly generated private key. + For configuration details see [Unlock Wallet Configuration](#Unlock-Wallet-Configuration) +- `Addresses` is a list of NeoFS storage nodes addresses. +- `Timeout` is a timeout for a single request to NeoFS storage node. +- `ContainerID` is a container ID to fetch blocks from. +- `BlockAttribute` is an attribute name of NeoFS object that contains block + data. +- `IndexFileAttribute` is an attribute name of NeoFS index object that contains block + object IDs. +- `DownloaderWorkersCount` is a number of workers that download blocks from + NeoFS in parallel. +- `OIDBatchSize` is the number of blocks to search per a single request to NeoFS + in case of disabled index files search. Also, for both modes of BlockFetcher + operation this setting manages the buffer size of OIDs and blocks transferring + channels. +- `BQueueSize` is a size of the block queue used to manage consecutive blocks + addition to the chain. It must be larger than `OIDBatchSize` and highly recommended + to be `2*OIDBatchSize` or `3*OIDBatchSize`. +- `SkipIndexFilesSearch` is a flag that allows to skip index files search and search + for blocks directly. It is set to `false` by default. +- `IndexFileSize` is the number of OID objects stored in the index files. This + setting depends on the NeoFS block storage configuration and is applicable only if + `SkipIndexFilesSearch` is set to `false`. + ### Metrics Services Configuration Metrics services configuration describes options for metrics services (pprof, diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 9ac369238a..efbdc5d948 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +// Validate checks ApplicationConfiguration for internal consistency and returns +// an error if any invalid settings are found. This ensures that the application +// configuration is valid and safe to use for further operations. +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err) + } + return nil +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..22d4d46a05 --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,51 @@ +package config + +import ( + "errors" + "fmt" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// NeoFSBlockFetcher represents the configuration for the NeoFS BlockFetcher service. +type NeoFSBlockFetcher struct { + InternalService `yaml:",inline"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Addresses []string `yaml:"Addresses"` + OIDBatchSize int `yaml:"OIDBatchSize"` + BlockAttribute string `yaml:"BlockAttribute"` + IndexFileAttribute string `yaml:"IndexFileAttribute"` + DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"` + BQueueSize int `yaml:"BQueueSize"` + SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` + IndexFileSize uint32 `yaml:"IndexFileSize"` +} + +// Validate checks NeoFSBlockFetcher for internal consistency and ensures +// that all required fields are properly set. It returns an error if the +// configuration is invalid or if the ContainerID cannot be properly decoded. +func (cfg *NeoFSBlockFetcher) Validate() error { + if !cfg.Enabled { + return nil + } + if cfg.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(cfg.ContainerID) + if err != nil { + return fmt.Errorf("invalid container ID: %w", err) + } + if cfg.BQueueSize < cfg.OIDBatchSize { + return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize) + } + if len(cfg.Addresses) == 0 { + return errors.New("addresses are not set") + } + if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 { + return errors.New("IndexFileSize is not set") + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a82838ec21..0278614333 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) { if err != nil { return Config{}, err } + err = config.ApplicationConfiguration.Validate() + if err != nil { + return Config{}, err + } return config, nil } diff --git a/pkg/network/server.go b/pkg/network/server.go index 3bbdceac6e..6c9c3df649 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + bFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -133,6 +136,7 @@ type ( runFin chan struct{} broadcastTxFin chan struct{} runProtoFin chan struct{} + blockFetcherFin chan struct{} transactions chan *transaction.Transaction @@ -182,28 +186,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s := &Server{ - ServerConfig: config, - chain: chain, - id: randomID(), - config: chain.GetConfig().ProtocolConfiguration, - quit: make(chan struct{}), - relayFin: make(chan struct{}), - runFin: make(chan struct{}), - broadcastTxFin: make(chan struct{}), - runProtoFin: make(chan struct{}), - register: make(chan Peer), - unregister: make(chan peerDrop), - handshake: make(chan Peer), - txInMap: make(map[util.Uint256]struct{}), - peers: make(map[Peer]bool), - mempool: chain.GetMemPool(), - extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), - log: log, - txin: make(chan *transaction.Transaction, 64), - transactions: make(chan *transaction.Transaction, 64), - services: make(map[string]Service), - extensHandlers: make(map[string]func(*payload.Extensible) error), - stateSync: stSync, + ServerConfig: config, + chain: chain, + id: randomID(), + config: chain.GetConfig().ProtocolConfiguration, + quit: make(chan struct{}), + relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), + blockFetcherFin: make(chan struct{}), + register: make(chan Peer), + unregister: make(chan peerDrop), + handshake: make(chan Peer), + txInMap: make(map[util.Uint256]struct{}), + peers: make(map[Peer]bool), + mempool: chain.GetMemPool(), + extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), + log: log, + txin: make(chan *transaction.Transaction, 64), + transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), + extensHandlers: make(map[string]func(*payload.Extensible) error), + stateSync: stSync, } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) @@ -219,6 +224,14 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + var err error + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() { + close(s.blockFetcherFin) + }) + if err != nil && config.NeoFSBlockFetcherCfg.Enabled { + return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) + } if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -295,6 +308,13 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + go s.bFetcherQueue.Run() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.blockFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } for _, tr := range s.transports { go tr.Accept() } @@ -311,6 +331,9 @@ func (s *Server) Shutdown() { return } s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + s.blockFetcher.Shutdown() + } for _, tr := range s.transports { tr.Close() } @@ -319,6 +342,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.bFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -548,6 +572,11 @@ func (s *Server) run() { s.tryInitStateSync() s.tryStartServices() + case <-s.blockFetcherFin: + if s.started.Load() { + s.tryInitStateSync() + s.tryStartServices() + } } } } @@ -702,7 +731,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -762,6 +791,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } @@ -782,6 +814,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) @@ -1100,6 +1135,9 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { + if s.blockFetcher.IsActive() { + return nil + } return s.stateSync.AddHeaders(h.Hdrs...) } @@ -1428,6 +1466,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..a9cc9beda0 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,8 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + NeoFSBlockFetcherCfg config.NeoFSBlockFetcher } ) @@ -89,24 +91,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err) } c := ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Addresses: addrs, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.P2P.DialTimeout, - ProtoTickInterval: appConfig.P2P.ProtoTickInterval, - PingInterval: appConfig.P2P.PingInterval, - PingTimeout: appConfig.P2P.PingTimeout, - MaxPeers: appConfig.P2P.MaxPeers, - AttemptConnPeers: appConfig.P2P.AttemptConnPeers, - MinPeers: appConfig.P2P.MinPeers, - TimePerBlock: protoConfig.TimePerBlock, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, - ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, - BroadcastFactor: appConfig.P2P.BroadcastFactor, + UserAgent: cfg.GenerateUserAgent(), + Addresses: addrs, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.P2P.DialTimeout, + ProtoTickInterval: appConfig.P2P.ProtoTickInterval, + PingInterval: appConfig.P2P.PingInterval, + PingTimeout: appConfig.P2P.PingTimeout, + MaxPeers: appConfig.P2P.MaxPeers, + AttemptConnPeers: appConfig.P2P.AttemptConnPeers, + MinPeers: appConfig.P2P.MinPeers, + TimePerBlock: protoConfig.TimePerBlock, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, + BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..ee64c85eac --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,479 @@ +package blockfetcher + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + // oidSize is the size of the object ID in NeoFS. + oidSize = sha256.Size + // defaultTimeout is the default timeout for NeoFS requests. + defaultTimeout = 5 * time.Minute + // defaultOIDBatchSize is the default number of OIDs to search and fetch at once. + defaultOIDBatchSize = 8000 + // defaultDownloaderWorkersCount is the default number of workers downloading blocks. + defaultDownloaderWorkersCount = 100 +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + GetConfig() config.Blockchain + BlockHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + // isActive denotes whether the service is working or in the process of shutdown. + isActive atomic.Bool + log *zap.Logger + cfg config.NeoFSBlockFetcher + stateRootInHeader bool + + chain Ledger + client *client.Client + enqueueBlock func(*block.Block) error + account *wallet.Account + + oidsCh chan oid.ID + blocksCh chan *block.Block + // wg is a wait group for block downloaders. + wg sync.WaitGroup + + // Global context for download operations cancellation. + ctx context.Context + ctxCancel context.CancelFunc + + // A set of routines managing graceful Service shutdown. + quit chan bool + quitOnce sync.Once + exiterToOIDDownloader chan struct{} + exiterToShutdown chan struct{} + oidDownloaderToExiter chan struct{} + blockQueuerToExiter chan struct{} + + shutdownCallback func() +} + +// New creates a new BlockFetcher Service. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { + var ( + account *wallet.Account + err error + ) + + if cfg.UnlockWallet.Path != "" { + walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path) + if err != nil { + return &Service{}, err + } + for _, acc := range walletFromFile.Accounts { + if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil { + account = acc + break + } + } + if account == nil { + return &Service{}, errors.New("failed to decrypt any account in the wallet") + } + } else { + account, err = wallet.NewAccount() + if err != nil { + return &Service{}, err + } + } + if cfg.Timeout <= 0 { + cfg.Timeout = defaultTimeout + } + if cfg.OIDBatchSize <= 0 { + cfg.OIDBatchSize = defaultOIDBatchSize + } + if cfg.DownloaderWorkersCount <= 0 { + cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount + } + if len(cfg.Addresses) == 0 { + return &Service{}, errors.New("no addresses provided") + } + return &Service{ + chain: chain, + log: logger, + cfg: cfg, + + enqueueBlock: putBlock, + account: account, + stateRootInHeader: chain.GetConfig().StateRootInHeader, + shutdownCallback: shutdownCallback, + + quit: make(chan bool), + exiterToOIDDownloader: make(chan struct{}), + exiterToShutdown: make(chan struct{}), + oidDownloaderToExiter: make(chan struct{}), + blockQueuerToExiter: make(chan struct{}), + + // Use buffer of two batch sizes to load OIDs in advance: + // * first full block of OIDs is processing by Downloader + // * second full block of OIDs is available to be fetched by Downloader immediately + // * third half-filled block of OIDs is being collected by OIDsFetcher. + oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + + // Use buffer of a single OIDs batch size to provide smooth downloading and + // avoid pauses during blockqueue insertion. + blocksCh: make(chan *block.Block, cfg.OIDBatchSize), + }, nil +} + +// Start runs the NeoFS BlockFetcher service. +func (bfs *Service) Start() error { + if !bfs.isActive.CompareAndSwap(false, true) { + return nil + } + bfs.log.Info("starting NeoFS BlockFetcher service") + + var err error + bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background()) + bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute) + if err != nil { + bfs.isActive.CompareAndSwap(true, false) + return fmt.Errorf("create SDK client: %w", err) + } + + // Start routine that manages Service shutdown process. + go bfs.exiter() + + // Start OIDs downloader routine. + go bfs.oidDownloader() + + // Start the set of blocks downloading routines. + for range bfs.cfg.DownloaderWorkersCount { + bfs.wg.Add(1) + go bfs.blockDownloader() + } + + // Start routine that puts blocks into bQueue. + go bfs.blockQueuer() + + return nil +} + +// oidDownloader runs the appropriate blocks OID fetching method based on the configuration. +func (bfs *Service) oidDownloader() { + defer close(bfs.oidDownloaderToExiter) + + var err error + if bfs.cfg.SkipIndexFilesSearch { + err = bfs.fetchOIDsBySearch() + } else { + err = bfs.fetchOIDsFromIndexFiles() + } + var force bool + if err != nil { + bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err)) + force = true + } + // Stop the service since there's nothing to do anymore. + bfs.stopService(force) +} + +// blockDownloader downloads the block from NeoFS and sends it to the blocks channel. +func (bfs *Service) blockDownloader() { + defer bfs.wg.Done() + + for blkOid := range bfs.oidsCh { + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + defer cancel() + + rc, err := bfs.objectGet(ctx, blkOid.String()) + if err != nil { + if isContextCanceledErr(err) { + return + } + bfs.log.Error("failed to objectGet block", zap.Error(err)) + bfs.stopService(true) + return + } + + b, err := bfs.readBlock(rc) + if err != nil { + if isContextCanceledErr(err) { + return + } + bfs.log.Error("failed to read block", zap.Error(err)) + bfs.stopService(true) + return + } + select { + case <-bfs.ctx.Done(): + return + case bfs.blocksCh <- b: + } + } +} + +// blockQueuer puts the block into the bqueue. +func (bfs *Service) blockQueuer() { + defer close(bfs.blockQueuerToExiter) + + for b := range bfs.blocksCh { + select { + case <-bfs.ctx.Done(): + return + default: + err := bfs.enqueueBlock(b) + if err != nil { + bfs.log.Error("failed to enqueue block", zap.Error(err)) + bfs.stopService(true) + return + } + } + } +} + +// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. +func (bfs *Service) fetchOIDsFromIndexFiles() error { + h := bfs.chain.BlockHeight() + startIndex := h/bfs.cfg.IndexFileSize + 1 + skip := h % bfs.cfg.IndexFileSize + + for { + select { + case <-bfs.exiterToOIDDownloader: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + blockOidsObject, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no '%s' object found with index %d, stopping", bfs.cfg.IndexFileAttribute, startIndex)) + return nil + } + + blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + defer blockCancel() + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) + } + + err = bfs.streamBlockOIDs(oidsRC, int(skip)) + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err) + } + + startIndex++ + skip = 0 + } + } +} + +// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { + defer rc.Close() + oidBytes := make([]byte, oidSize) + oidsProcessed := 0 + + for { + _, err := io.ReadFull(rc, oidBytes) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read OID: %w", err) + } + + if oidsProcessed < skip { + oidsProcessed++ + continue + } + + var oidBlock oid.ID + if err := oidBlock.Decode(oidBytes); err != nil { + return fmt.Errorf("failed to decode OID: %w", err) + } + + select { + case <-bfs.exiterToOIDDownloader: + return nil + case bfs.oidsCh <- oidBlock: + } + + oidsProcessed++ + } + if oidsProcessed != int(bfs.cfg.IndexFileSize) { + return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed) + } + return nil +} + +// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. +func (bfs *Service) fetchOIDsBySearch() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(bfs.cfg.OIDBatchSize) + + for { + select { + case <-bfs.exiterToOIDDownloader: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + blockOids, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex)) + return nil + } + for _, oid := range blockOids { + select { + case <-bfs.exiterToOIDDownloader: + return nil + case bfs.oidsCh <- oid: + } + } + startIndex += batchSize + } + } +} + +// readBlock decodes the block from the read closer and prepares it for adding to the blockchain. +func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.DecodeBinary(r) + rc.Close() + return b, r.Err +} + +// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new +// block OIDs search, cancels all in-progress downloading operations and waits +// until all service routines finish their work. +func (bfs *Service) Shutdown() { + if !bfs.IsActive() { + return + } + bfs.stopService(true) + <-bfs.exiterToShutdown +} + +// stopService close quitting goroutine once. It's the only entrypoint to shutdown +// procedure. +func (bfs *Service) stopService(force bool) { + bfs.quitOnce.Do(func() { + bfs.quit <- force + close(bfs.quit) + }) +} + +// exiter is a routine that is listening to a quitting signal and manages graceful +// Service shutdown process. +func (bfs *Service) exiter() { + // Closing signal may come from anyone, but only once. + force := <-bfs.quit + bfs.log.Info("shutting down NeoFS BlockFetcher service", + zap.Bool("force", force), + ) + + // Cansel all pending OIDs/blocks downloads in case if shutdown requested by user + // or caused by downloading error. + if force { + bfs.ctxCancel() + } + + // Send signal to OID downloader to stop. Wait until OID downloader finishes his + // work. + close(bfs.exiterToOIDDownloader) + <-bfs.oidDownloaderToExiter + + // Close OIDs channel to let block downloaders know that there are no more OIDs + // expected. Wait until all downloaders finish their work. + close(bfs.oidsCh) + bfs.wg.Wait() + + // Send signal to block putter to finish his work. Wait until it's finished. + close(bfs.blocksCh) + <-bfs.blockQueuerToExiter + + // Everything is done, release resources, turn off the activity marker and let + // the server know about it. + _ = bfs.client.Close() + _ = bfs.log.Sync() + bfs.isActive.CompareAndSwap(true, false) + bfs.shutdownCallback() + + // Notify Shutdown routine in case if it's user-triggered shutdown. + close(bfs.exiterToShutdown) +} + +// IsActive returns true if the NeoFS BlockFetcher service is running. +func (bfs *Service) IsActive() bool { + return bfs.isActive.Load() +} + +func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false) + if err != nil { + return nil, err + } + + return rc, nil +} + +func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { + return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) +} + +// isContextCanceledErr returns whether error is a wrapped [context.Canceled]. +// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624. +func isContextCanceledErr(err error) bool { + return errors.Is(err, context.Canceled) || + strings.Contains(err.Error(), "context canceled") +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..339a67ea78 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,99 @@ +package blockfetcher + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +type mockLedger struct { + height uint32 +} + +func (m *mockLedger) GetConfig() config.Blockchain { + return config.Blockchain{} +} + +func (m *mockLedger) BlockHeight() uint32 { + return m.height +} + +type mockPutBlockFunc struct { + putCalled bool +} + +func (m *mockPutBlockFunc) putBlock(b *block.Block) error { + m.putCalled = true + return nil +} + +func TestServiceConstructor(t *testing.T) { + logger := zap.NewNop() + ledger := &mockLedger{height: 10} + mockPut := &mockPutBlockFunc{} + shutdownCallback := func() {} + + t.Run("empty configuration", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Timeout: 0, + OIDBatchSize: 0, + DownloaderWorkersCount: 0, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("no addresses", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{}, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("default values", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + require.NotNil(t, service) + + require.Equal(t, service.IsActive(), false) + require.Equal(t, service.cfg.Timeout, defaultTimeout) + require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize) + require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) + require.Equal(t, service.IsActive(), false) + }) + + t.Run("SDK client", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + err = service.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "create SDK client") + require.Equal(t, service.IsActive(), false) + }) + + t.Run("invalid wallet", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{ + Path: "invalid/path/to/wallet.json", + Password: "wrong-password", + }, + }, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") + }) +}