diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..5383c14e21 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,15 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" + NeoFSBlockFetcher: + Enabled: true + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 # must be larger than OIDBatchSize; highly recommended to be 2*OIDBatchSize or 3*OIDBatchSize + SkipIndexFilesSearch: false + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + OidAttribute: "oid" diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 83c149e620..351ddb06bd 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -21,6 +21,7 @@ node-related settings described in the table below. | GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | | | LogPath | `string` | "", so only console logging | File path where to store node logs. | +| NeoFSBlockFetcher | [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) | | NeoFSBlockFetcher module configuration. See the [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) section for details. | | Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. | | P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. | | P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. | @@ -323,6 +324,40 @@ where: - `Path` is a path to wallet. - `Password` is a wallet password. +### NeoFSBlockFetcher Configuration +`NeoFSBlockFetcher` configuration section contains settings for NeoFS block fetcher +module and has the following structure: +``` + NeoFSBlockFetcher: + Enabled: true + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 + SkipIndexFilesSearch: false + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + OidAttribute: "oid" +``` +where: +- `Enabled` enables NeoFS block fetcher module. +- `UnlockWallet` contains wallet settings, see + [Unlock Wallet Configuration](#Unlock-Wallet-Configuration) section for + structure details. Without this setting, the module will use new generated private key. +- `Addresses` is a list of NeoFS storage nodes addresses. +- `Timeout` is a timeout for a single request to NeoFS storage node. +- `ContainerID` is a container ID to fetch blocks from. +- `BlockAttribute` is an attribute name of NeoFS object that contains block data. +- `OidAttribute` is an attribute name of NeoFS object that contains OIDs of blocks objects. +- `DownloaderWorkersCount` is a number of workers that download blocks from NeoFS. +- `OIDBatchSize` is a number of OIDs to search and fetch from NeoFS in one request in case of SkipIndexFilesSearch=true. +- `BQueueSize` is a size of the block queue. It must be larger than `OIDBatchSize` and + highly recommended to be 2*`OIDBatchSize` or 3*`OIDBatchSize`. +- `SkipIndexFilesSearch` is a flag that allows skipping index files search in NeoFS + storage nodes and search for blocks directly. It is set to `false` by default. + ## Protocol Configuration `ProtocolConfiguration` section of `yaml` node configuration file contains diff --git a/go.mod b/go.mod index 7c9045334b..1f6b143b5e 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 golang.org/x/tools v0.19.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -69,7 +70,6 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.33.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 9ac369238a..16299087c0 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +// Validate checks ApplicationConfiguration for internal consistency and returns +// an error if any invalid settings are found. This ensures that the application +// configuration is valid and safe to use for further operations. +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err) + } + return nil +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..9449d6caf7 --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,60 @@ +package config + +import ( + "errors" + "fmt" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +const ( + DefaultTimeout = 1 * time.Minute + DefaultOIDBatchSize = 8000 + DefaultDownloaderWorkersCount = 500 +) + +// NeoFSBlockFetcher represents the configuration for the NeoFS block fetcher service. +type NeoFSBlockFetcher struct { + InternalService `yaml:",inline"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Addresses []string `yaml:"Addresses"` + OIDBatchSize int `yaml:"OIDBatchSize"` // valid only for SkipIndexFilesSearch = true + BlockAttribute string `yaml:"BlockAttribute"` + OidAttribute string `yaml:"OidAttribute"` + HeaderAttribute string `yaml:"HeaderAttribute"` + DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"` + BQueueSize int `yaml:"BQueueSize"` + SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` +} + +// Validate checks NeoFSBlockFetcher for internal consistency and ensures +// that all required fields are properly set. It returns an error if the +// configuration is invalid or if the ContainerID cannot be properly decoded. +func (cfg *NeoFSBlockFetcher) Validate() error { + if !cfg.Enabled { + return nil + } + if cfg.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(cfg.ContainerID) + if err != nil { + return fmt.Errorf("invalid container ID: %w", err) + } + if cfg.Timeout == 0 { + cfg.Timeout = DefaultTimeout + } + if cfg.OIDBatchSize == 0 { + cfg.OIDBatchSize = DefaultOIDBatchSize + } + if cfg.DownloaderWorkersCount <= 0 { + cfg.DownloaderWorkersCount = DefaultDownloaderWorkersCount + } + if cfg.BQueueSize < cfg.OIDBatchSize { + return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize) + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a82838ec21..0278614333 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) { if err != nil { return Config{}, err } + err = config.ApplicationConfiguration.Validate() + if err != nil { + return Config{}, err + } return config, nil } diff --git a/pkg/network/server.go b/pkg/network/server.go index ef0e33ac79..86573b6aa3 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + bFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -133,6 +136,7 @@ type ( runFin chan struct{} broadcastTxFin chan struct{} runProtoFin chan struct{} + blockFetcherFin chan struct{} transactions chan *transaction.Transaction @@ -171,6 +175,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy newTransport func(*Server, string) Transporter, newDiscovery func([]string, time.Duration, Transporter) Discoverer, ) (*Server, error) { + var err error if log == nil { return nil, errors.New("logger is a required parameter") } @@ -182,28 +187,30 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s := &Server{ - ServerConfig: config, - chain: chain, - id: randomID(), - config: chain.GetConfig().ProtocolConfiguration, - quit: make(chan struct{}), - relayFin: make(chan struct{}), - runFin: make(chan struct{}), - broadcastTxFin: make(chan struct{}), - runProtoFin: make(chan struct{}), - register: make(chan Peer), - unregister: make(chan peerDrop), - handshake: make(chan Peer), - txInMap: make(map[util.Uint256]struct{}), - peers: make(map[Peer]bool), - mempool: chain.GetMemPool(), - extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), - log: log, - txin: make(chan *transaction.Transaction, 64), - transactions: make(chan *transaction.Transaction, 64), - services: make(map[string]Service), - extensHandlers: make(map[string]func(*payload.Extensible) error), - stateSync: stSync, + ServerConfig: config, + chain: chain, + id: randomID(), + config: chain.GetConfig().ProtocolConfiguration, + quit: make(chan struct{}), + relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), + blockFetcherFin: make(chan struct{}, 1), + register: make(chan Peer), + unregister: make(chan peerDrop), + handshake: make(chan Peer), + txInMap: make(map[util.Uint256]struct{}), + peers: make(map[Peer]bool), + mempool: chain.GetMemPool(), + extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), + log: log, + txin: make(chan *transaction.Transaction, 64), + transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), + extensHandlers: make(map[string]func(*payload.Extensible) error), + stateSync: stSync, + } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) @@ -216,10 +223,16 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric) - - s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric) + }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() { + close(s.blockFetcherFin) + }) + if err != nil { + return nil, err + } if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +308,14 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + go s.bFetcherQueue.Run() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled && s.blockFetcher != nil { + err := s.blockFetcher.Start() + if err != nil { + s.log.Error("NeoFS block fetcher service:", zap.Error(err)) + s.blockFetcher.Shutdown() + } + } for _, tr := range s.transports { go tr.Accept() } @@ -319,6 +340,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.bFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -546,6 +568,9 @@ func (s *Server) run() { s.discovery.RegisterGood(p) + s.tryInitStateSync() + s.tryStartServices() + case <-s.blockFetcherFin: s.tryInitStateSync() s.tryStartServices() } @@ -702,7 +727,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -762,6 +787,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } @@ -782,6 +810,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) @@ -1428,6 +1459,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..a9cc9beda0 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,8 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + NeoFSBlockFetcherCfg config.NeoFSBlockFetcher } ) @@ -89,24 +91,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err) } c := ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Addresses: addrs, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.P2P.DialTimeout, - ProtoTickInterval: appConfig.P2P.ProtoTickInterval, - PingInterval: appConfig.P2P.PingInterval, - PingTimeout: appConfig.P2P.PingTimeout, - MaxPeers: appConfig.P2P.MaxPeers, - AttemptConnPeers: appConfig.P2P.AttemptConnPeers, - MinPeers: appConfig.P2P.MinPeers, - TimePerBlock: protoConfig.TimePerBlock, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, - ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, - BroadcastFactor: appConfig.P2P.BroadcastFactor, + UserAgent: cfg.GenerateUserAgent(), + Addresses: addrs, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.P2P.DialTimeout, + ProtoTickInterval: appConfig.P2P.ProtoTickInterval, + PingInterval: appConfig.P2P.PingInterval, + PingTimeout: appConfig.P2P.PingTimeout, + MaxPeers: appConfig.P2P.MaxPeers, + AttemptConnPeers: appConfig.P2P.AttemptConnPeers, + MinPeers: appConfig.P2P.MinPeers, + TimePerBlock: protoConfig.TimePerBlock, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, + BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..3f1e72d7e1 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,363 @@ +package blockfetcher + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/url" + "sync" + "sync/atomic" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + oidSize = sha256.Size + oidBatchSize = 128000 +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + AddBlock(block *block.Block) error + GetConfig() config.Blockchain + BlockHeight() uint32 + AddHeaders(...*block.Header) error + HeaderHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + started atomic.Bool + log *zap.Logger + cfg config.NeoFSBlockFetcher + stateRootInHeader bool + + chain Ledger + client *client.Client + putBlock func(*block.Block) error + account *wallet.Account + + oidsCh chan oid.ID + blocksCh chan *block.Block + wg sync.WaitGroup + + quit chan struct{} + shutdownCallback func() +} + +// New creates a new BlockFetcherService. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { + var ( + account *wallet.Account + err error + ) + + if cfg.UnlockWallet.Path != "" { + walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path) + if err != nil { + return nil, err + } + for _, acc := range walletFromFile.Accounts { + if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil { + account = acc + break + } + } + if account == nil { + return nil, errors.New("failed to decrypt any account in the wallet") + } + } else { + account, err = wallet.NewAccount() + if err != nil { + return nil, err + } + } + return &Service{ + chain: chain, + log: logger, + quit: make(chan struct{}), + cfg: cfg, + + putBlock: putBlock, + account: account, + stateRootInHeader: chain.GetConfig().StateRootInHeader, + shutdownCallback: shutdownCallback, + + // Use buffer of two batch sizes to load OIDs in advance: + // * first full block of OIDs is processing by Downloader + // * second full block of OIDs is available to be fetched by Downloader immediately + // * third half-filled block of OIDs is being collected by OIDsFetcher. + oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + + // Use buffer of a single OIDs batch size to provide smooth downloading and + // avoid pauses during blockqueue insertion. + blocksCh: make(chan *block.Block, cfg.OIDBatchSize), + }, nil +} + +// Start runs the block fetcher service. +func (bfs *Service) Start() error { + if !bfs.started.CompareAndSwap(false, true) { + return nil + } + bfs.log.Info("starting NeoFS block fetcher service") + + var err error + bfs.client, err = neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0]) + if err != nil { + return err + } + + // Start OIDs downloader routine. + //bfs.wg.Add(1) + go bfs.oidDownloader() + + // Start the set of blocks downloading routines. + for i := 0; i < bfs.cfg.DownloaderWorkersCount; i++ { + bfs.wg.Add(1) + go bfs.blockDownloader() + } + + // Start routine that puts blocks into bQueue. + //bfs.wg.Add(1) + go bfs.blockPutter() + + return nil +} + +// oidDownloader runs the appropriate blocks OID fetching method based on the configuration. +func (bfs *Service) oidDownloader() { + //defer bfs.wg.Done() + var err error + + if bfs.cfg.SkipIndexFilesSearch { + err = bfs.fetchOIDsBySearch() + } else { + err = bfs.fetchOIDsFromIndexFiles() + } + + if err != nil { + bfs.log.Error("NeoFS block fetcher service: fetch operation failed", zap.Error(err)) + bfs.Shutdown() + } + close(bfs.oidsCh) +} + +func (bfs *Service) blockPutter() { + //defer bfs.wg.Done() + for { + select { + case <-bfs.quit: + return + case b, ok := <-bfs.blocksCh: + if !ok { + return + } + err := bfs.putBlock(b) + if err != nil { + bfs.log.Error("failed to put block", zap.Error(err)) + bfs.Shutdown() + } + } + } +} + +func (bfs *Service) blockDownloader() { + defer bfs.wg.Done() + for { + select { + case <-bfs.quit: + return + case blkOid, ok := <-bfs.oidsCh: + if !ok { + return + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + rc, err := bfs.objectGet(ctx, blkOid.String()) + if err != nil { + bfs.log.Error("failed to objectGet block", zap.Error(err)) + bfs.Shutdown() + } + b, err := bfs.getBlock(rc) + if err != nil { + bfs.log.Error("failed to process block", zap.Error(err)) + bfs.Shutdown() + } + bfs.blocksCh <- b + } + } +} + +// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. +func (bfs *Service) fetchOIDsFromIndexFiles() error { + h := bfs.chain.BlockHeight() + startIndex := h/oidBatchSize + 1 + skip := h % oidBatchSize + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.OidAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + blockOidsObject, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.OidAttribute, startIndex, err) + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: no '%s' object found with index %d, stopping", bfs.cfg.OidAttribute, startIndex)) + return nil + } + + blockCtx, blockCancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer blockCancel() + blockOidsData, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + if err != nil { + return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.OidAttribute, startIndex, err) + } + err = bfs.streamBlockOIDs(blockOidsData, int(skip)) + if err != nil { + return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err) + } + + startIndex++ + skip = 0 + } + } +} + +// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { + defer rc.Close() + oidBytes := make([]byte, oidSize) + oidsProcessed := 0 + + for { + _, err := io.ReadFull(rc, oidBytes) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read OID: %w", err) + } + + if oidsProcessed < skip { + oidsProcessed++ + continue + } + + var oidBlock oid.ID + if err := oidBlock.Decode(oidBytes); err != nil { + return fmt.Errorf("failed to decode OID: %w", err) + } + + select { + case bfs.oidsCh <- oidBlock: + case <-bfs.quit: + return nil + } + + oidsProcessed++ + } + return nil +} + +// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. +func (bfs *Service) fetchOIDsBySearch() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(bfs.cfg.OIDBatchSize) + + for { + select { + case <-bfs.quit: + bfs.log.Info("stopping NeoFS data fetching operation") + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + blockOids, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + bfs.log.Error(fmt.Sprintf("NeoFS block fetcher service: failed to find %s object with index from %d to %d", bfs.cfg.BlockAttribute, startIndex, startIndex+batchSize-1), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: no block found with index %d, stopping", startIndex)) + return nil + } + for _, oid := range blockOids { + bfs.oidsCh <- oid + } + startIndex += batchSize + } + } +} + +// getBlock decodes the block from the read closer and prepares it for adding to the blockchain. +func (bfs *Service) getBlock(rc io.ReadCloser) (*block.Block, error) { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.DecodeBinary(r) + rc.Close() + if r.Err != nil { + return nil, r.Err + } + return b, nil +} + +// Shutdown stops the block fetcher service gracefully. +func (bfs *Service) Shutdown() { + if !bfs.started.CompareAndSwap(true, false) { + return + } + bfs.log.Info("shutting down NeoFS block fetcher service") + close(bfs.blocksCh) + close(bfs.quit) + bfs.wg.Wait() + bfs.shutdownCallback() + _ = bfs.log.Sync() +} + +// IsActive returns true if the block fetcher service is running. +func (bfs *Service) IsActive() bool { + return bfs.started.Load() +} + +func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false) + if err != nil { + return nil, err + } + + return rc, nil +} + +func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { + return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..f7c048885d --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,75 @@ +package blockfetcher_test + +import ( + "context" + "fmt" + "io" + "net/url" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" +) + +func TestService(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{Password: "one", Path: "./"}, + }, + Addresses: []string{"st1.t5.fs.neo.org:8080"}, + Timeout: 15 * time.Second, + } + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchStringEqual) + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchNumLE) + prm.SetFilters(filters) + + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + require.NoError(t, err) + var ( + s = user.NewAutoIDSignerRFC6979(privateKey.PrivateKey) + objectIDs []oid.ID + ) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer cancel() + + neofsClient, err := neofs.GetSDKClient(ctx, + cfg.Addresses[0]) + require.NoError(t, err) + + reader, err := neofsClient.ObjectSearchInit(ctx, containerID, s, prm) + require.NoError(t, err) + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + require.NoError(t, err) + fmt.Println(objectIDs) + + oid := "3uHQb3SYPEhoxJigTtRALwhiha3nCzL7GsN6PGYMjwhT" + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", containerID, oid)) + require.NoError(t, err) + rc, err := neofs.GetWithClient(ctx, neofsClient, privateKey, u, false) + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + fmt.Println(data) +}