diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go new file mode 100644 index 0000000000..00aeb932e2 --- /dev/null +++ b/cli/server/dump_bin.go @@ -0,0 +1,87 @@ +package server + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/urfave/cli/v2" +) + +func dumpBin(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + count := uint32(ctx.Uint("count")) + start := uint32(ctx.Uint("start")) + + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + blocksCount := chain.BlockHeight() + 1 + if start+count > blocksCount { + return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1) + } + if count == 0 { + count = blocksCount - start + } + + out := ctx.String("out") + if out == "" { + return cli.Exit("output directory is not specified", 1) + } + if _, err = os.Stat(out); os.IsNotExist(err) { + if err = os.MkdirAll(out, os.ModePerm); err != nil { + return cli.Exit(fmt.Sprintf("failed to create directory %s: %s", out, err), 1) + } + } + if err != nil { + return cli.Exit(fmt.Sprintf("failed to check directory %s: %s", out, err), 1) + } + + for i := start; i < start+count; i++ { + blk, err := chain.GetBlock(chain.GetHeaderHash(i)) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1) + } + filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i)) + if err = saveBlockToFile(blk, filePath); err != nil { + return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1) + } + } + return nil +} + +func saveBlockToFile(blk *block.Block, filePath string) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + writer := io.NewBinWriterFromIO(file) + blk.EncodeBinary(writer) + return writer.Err +} diff --git a/cli/server/dump_bin_test.go b/cli/server/dump_bin_test.go new file mode 100644 index 0000000000..0524f2c1d0 --- /dev/null +++ b/cli/server/dump_bin_test.go @@ -0,0 +1,91 @@ +package server_test + +import ( + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/nspcc-dev/neo-go/internal/testcli" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestDumpBin(t *testing.T) { + tmpDir := t.TempDir() + + loadConfig := func(t *testing.T) config.Config { + chainPath := filepath.Join(tmpDir, "neogotestchain") + cfg, err := config.LoadFile(filepath.Join("..", "..", "config", "protocol.unit_testnet.yml")) + require.NoError(t, err, "could not load config") + cfg.ApplicationConfiguration.DBConfiguration.Type = dbconfig.LevelDB + cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions.DataDirectoryPath = chainPath + return cfg + } + + cfg := loadConfig(t) + out, err := yaml.Marshal(cfg) + require.NoError(t, err) + + cfgPath := filepath.Join(tmpDir, "protocol.unit_testnet.yml") + require.NoError(t, os.WriteFile(cfgPath, out, os.ModePerm)) + + e := testcli.NewExecutor(t, false) + + restoreArgs := []string{"neo-go", "db", "restore", + "--config-file", cfgPath, "--in", inDump} + e.Run(t, restoreArgs...) + + t.Run("missing output directory", func(t *testing.T) { + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", ""} + e.RunWithErrorCheck(t, "output directory is not specified", args...) + }) + + t.Run("successful dump", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir, "--count", "5", "--start", "0"} + + e.Run(t, args...) + + require.DirExists(t, outDir) + + for i := range 5 { + blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin") + require.FileExists(t, blockFile) + } + }) + + t.Run("invalid block range", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "invalid-blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir, "--count", "1000", "--start", "0"} + + e.RunWithError(t, args...) + }) + + t.Run("zero blocks (full chain dump)", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "full-dump") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir} + + e.Run(t, args...) + + require.DirExists(t, outDir) + for i := range 5 { + blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin") + require.FileExists(t, blockFile) + } + }) + + t.Run("invalid config file", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", "invalid-config-path", "--out", outDir} + + e.RunWithError(t, args...) + }) +} diff --git a/cli/server/server.go b/cli/server/server.go index ceab195866..b3f9ec22b9 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -102,6 +102,13 @@ func NewCommands() []*cli.Command { Action: dumpDB, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin", + Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format", + UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: dumpBin, + Flags: cfgCountOutFlags, + }, { Name: "restore", Usage: "Restore blocks from the file", diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..9a31251a2b 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,16 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" + NeoFSBlockFetcher: + Enabled: false + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize + SkipIndexFilesSearch: false + IndexFileSize: 128000 + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + IndexFileAttribute: "oid" diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md new file mode 100644 index 0000000000..9755713805 --- /dev/null +++ b/docs/neofs-blockstorage.md @@ -0,0 +1,74 @@ +# NeoFS block storage + +Using NeoFS to store chain's blocks and snapshots was proposed in +[#3463](https://github.com/neo-project/neo/issues/3463). NeoGo contains several +extensions utilizing NeoFS block storage aimed to improve node synchronization +efficiency and reduce node storage size. + +## Components and functionality + +### Block storage schema + +A single NeoFS container is used to store blocks and index files. Each block +is stored in a binary form as a separate object with a unique OID and a set of +attributes: + - block object identifier with block index value (`block:1`) + - primary node index (`primary:0`) + - block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`) + - previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`) + - millisecond-precision block timestamp (`time:1627894840919`) + +Each index file is an object containing a constant-sized batch of raw block object +IDs in binary form ordered by block index. Each index file is marked with the +following attributes: + - index file identifier with consecutive file index value (`oid:0`) + - the number of OIDs included into index file (`size:128000`) + +### NeoFS BlockFetcher + +NeoFS BlockFetcher service is designed as an alternative to P2P synchronisation +protocol. It allows to download blocks from a trusted container in the NeoFS network +and persist them to database using standard verification flow. NeoFS BlockFetcher +service primarily used during the node's bootstrap, providing a fast alternative to +P2P blocks synchronisation. + +NeoFS BlockFetcher service has two modes of operation: +- Index File Search: Search for index files, which contain batches of block object + IDs and fetch blocks from NeoFS by retrieved OIDs. +- Direct Block Search: Search and fetch blocks directly from NeoFS container via + built-in NeoFS object search mechanism. + +Operation mode of BlockFetcher can be configured via `SkipIndexFilesSearch` +parameter. + +#### Operation flow + +1. **OID Fetching**: + Depending on the mode, the service either: + - Searches for index files by index file attribute and reads block OIDs from index + file object-by-object. + - Searches batches of blocks directly by block attribute (the batch size is + configured via `OIDBatchSize` parameter). + + Once the OIDs are retrieved, they are immediately redirected to the + block downloading routines for further processing. The channel that + is used to redirect block OIDs to downloading routines is buffered + to provide smooth OIDs delivery without delays. The size of this channel + can be configured via `OIDBatchSize` parameter and equals to `2*OIDBatchSize`. +2. **Parallel Block Downloading**: + The number of downloading routines can be configured via + `DownloaderWorkersCount` parameter. It's up to the user to find the + balance between the downloading speed and blocks persist speed for every + node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a + buffered channel of size `IDBatchSize` with further redirection to the + block queue. +3. **Block Insertion**: + Downloaded blocks are inserted into the blockchain using the same logic + as in the P2P synchronisation protocol. The block queue is used to order + downloaded blocks before they are inserted into the blockchain. The + size of the queue can be configured via the `BQueueSize` parameter + and should be larger than the `OIDBatchSize` parameter to avoid blocking + the downloading routines. + +Once all blocks available in the NeoFS container are processed, the service +shuts down automatically. diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 83c149e620..162ef7909c 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -21,6 +21,7 @@ node-related settings described in the table below. | GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | | | LogPath | `string` | "", so only console logging | File path where to store node logs. | +| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. | | Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. | | P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. | | P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. | @@ -153,6 +154,55 @@ where: Please, refer to the [Notary module documentation](./notary.md#Notary node module) for details on module features. +### NeoFS BlockFetcher Configuration + +`NeoFSBlockFetcher` configuration section contains settings for NeoFS +BlockFetcher module and has the following structure: +``` + NeoFSBlockFetcher: + Enabled: true + UnlockWallet: + Path: "./wallet.json" + Password: "pass" + Addresses: + - st1.storage.fs.neo.org:8080 + Timeout: 10m + DownloaderWorkersCount: 500 + OIDBatchSize: 8000 + BQueueSize: 16000 + SkipIndexFilesSearch: false + ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH" + BlockAttribute: "block" + IndexFileAttribute: "oid" + IndexFileSize: 128000 +``` +where: +- `Enabled` enables NeoFS BlockFetcher module. +- `UnlockWallet` contains wallet settings to retrieve account to sign requests to + NeoFS. Without this setting, the module will use randomly generated private key. + For configuration details see [Unlock Wallet Configuration](#Unlock-Wallet-Configuration) +- `Addresses` is a list of NeoFS storage nodes addresses. +- `Timeout` is a timeout for a single request to NeoFS storage node. +- `ContainerID` is a container ID to fetch blocks from. +- `BlockAttribute` is an attribute name of NeoFS object that contains block + data. +- `IndexFileAttribute` is an attribute name of NeoFS index object that contains block + object IDs. +- `DownloaderWorkersCount` is a number of workers that download blocks from + NeoFS in parallel. +- `OIDBatchSize` is the number of blocks to search per a single request to NeoFS + in case of disabled index files search. Also, for both modes of BlockFetcher + operation this setting manages the buffer size of OIDs and blocks transferring + channels. +- `BQueueSize` is a size of the block queue used to manage consecutive blocks + addition to the chain. It must be larger than `OIDBatchSize` and highly recommended + to be `2*OIDBatchSize` or `3*OIDBatchSize`. +- `SkipIndexFilesSearch` is a flag that allows to skip index files search and search + for blocks directly. It is set to `false` by default. +- `IndexFileSize` is the number of OID objects stored in the index files. This + setting depends on the NeoFS block storage configuration and is applicable only if + `SkipIndexFilesSearch` is set to `false`. + ### Metrics Services Configuration Metrics services configuration describes options for metrics services (pprof, diff --git a/go.mod b/go.mod index cc0b94faa2..6c4f23cdea 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( golang.org/x/term v0.23.0 golang.org/x/text v0.17.0 golang.org/x/tools v0.24.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -72,7 +73,6 @@ require ( golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.34.2 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 9ac369238a..efbdc5d948 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +// Validate checks ApplicationConfiguration for internal consistency and returns +// an error if any invalid settings are found. This ensures that the application +// configuration is valid and safe to use for further operations. +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err) + } + return nil +} diff --git a/pkg/config/application_config_test.go b/pkg/config/application_config_test.go index 83ea488e5f..6b5af8d816 100644 --- a/pkg/config/application_config_test.go +++ b/pkg/config/application_config_test.go @@ -3,6 +3,7 @@ package config import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -133,3 +134,101 @@ func TestGetAddresses(t *testing.T) { } } } + +func TestNeoFSBlockFetcherValidation(t *testing.T) { + type testcase struct { + cfg NeoFSBlockFetcher + shouldFail bool + errMsg string + } + validContainerID := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + invalidContainerID := "invalid-container-id" + + cases := []testcase{ + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: validContainerID, + Addresses: []string{"127.0.0.1"}, + OIDBatchSize: 10, + BQueueSize: 20, + SkipIndexFilesSearch: true, + DownloaderWorkersCount: 4, + }, + shouldFail: false, + }, + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: "", + Addresses: []string{"127.0.0.1"}, + OIDBatchSize: 10, + BQueueSize: 20, + }, + shouldFail: true, + errMsg: "container ID is not set", + }, + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: invalidContainerID, + Addresses: []string{"127.0.0.1"}, + OIDBatchSize: 10, + BQueueSize: 20, + }, + shouldFail: true, + errMsg: "invalid container ID", + }, + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: validContainerID, + Addresses: []string{}, + OIDBatchSize: 10, + BQueueSize: 20, + }, + shouldFail: true, + errMsg: "addresses are not set", + }, + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: validContainerID, + Addresses: []string{"127.0.0.1"}, + OIDBatchSize: 10, + BQueueSize: 5, + }, + shouldFail: true, + errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)", + }, + { + cfg: NeoFSBlockFetcher{ + InternalService: InternalService{Enabled: true}, + Timeout: time.Second, + ContainerID: validContainerID, + Addresses: []string{"127.0.0.1"}, + OIDBatchSize: 10, + BQueueSize: 20, + SkipIndexFilesSearch: false, + IndexFileSize: 0, + }, + shouldFail: true, + errMsg: "IndexFileSize is not set", + }, + } + + for _, c := range cases { + err := c.cfg.Validate() + if c.shouldFail { + require.Error(t, err) + require.Contains(t, err.Error(), c.errMsg) + } else { + require.NoError(t, err) + } + } +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..22d4d46a05 --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,51 @@ +package config + +import ( + "errors" + "fmt" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// NeoFSBlockFetcher represents the configuration for the NeoFS BlockFetcher service. +type NeoFSBlockFetcher struct { + InternalService `yaml:",inline"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Addresses []string `yaml:"Addresses"` + OIDBatchSize int `yaml:"OIDBatchSize"` + BlockAttribute string `yaml:"BlockAttribute"` + IndexFileAttribute string `yaml:"IndexFileAttribute"` + DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"` + BQueueSize int `yaml:"BQueueSize"` + SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` + IndexFileSize uint32 `yaml:"IndexFileSize"` +} + +// Validate checks NeoFSBlockFetcher for internal consistency and ensures +// that all required fields are properly set. It returns an error if the +// configuration is invalid or if the ContainerID cannot be properly decoded. +func (cfg *NeoFSBlockFetcher) Validate() error { + if !cfg.Enabled { + return nil + } + if cfg.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(cfg.ContainerID) + if err != nil { + return fmt.Errorf("invalid container ID: %w", err) + } + if cfg.BQueueSize < cfg.OIDBatchSize { + return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize) + } + if len(cfg.Addresses) == 0 { + return errors.New("addresses are not set") + } + if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 { + return errors.New("IndexFileSize is not set") + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a82838ec21..0278614333 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) { if err != nil { return Config{}, err } + err = config.ApplicationConfiguration.Validate() + if err != nil { + return Config{}, err + } return config, nil } diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 779448ac28..2899b1ac51 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -3,6 +3,7 @@ package bqueue import ( "sync" "sync/atomic" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" @@ -15,6 +16,17 @@ type Blockqueuer interface { BlockHeight() uint32 } +// OperationMode is the mode of operation for the block queue. +// Could be either Blocking or NonBlocking. +type OperationMode byte + +const ( + // NonBlocking means that PutBlock will return immediately if the queue is full. + NonBlocking OperationMode = 0 + // Blocking means that PutBlock will wait until there is enough space in the queue. + Blocking OperationMode = 1 +) + // Queue is the block queue. type Queue struct { log *zap.Logger @@ -27,29 +39,36 @@ type Queue struct { discarded atomic.Bool len int lenUpdateF func(int) + cacheSize int + mode OperationMode } -// CacheSize is the amount of blocks above the current height +// DefaultCacheSize is the default amount of blocks above the current height // which are stored in the queue. -const CacheSize = 2000 +const DefaultCacheSize = 2000 -func indexToPosition(i uint32) int { - return int(i) % CacheSize +func (bq *Queue) indexToPosition(i uint32) int { + return int(i) % bq.cacheSize } // New creates an instance of BlockQueue. -func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue { +func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { if log == nil { return nil } + if cacheSize <= 0 { + cacheSize = DefaultCacheSize + } return &Queue{ log: log, - queue: make([]*block.Block, CacheSize), + queue: make([]*block.Block, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, lenUpdateF: lenMetricsUpdater, + cacheSize: cacheSize, + mode: mode, } } @@ -63,12 +82,12 @@ func (bq *Queue) Run() { } for { h := bq.chain.BlockHeight() - pos := indexToPosition(h + 1) + pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using blocks from other sources (consensus). for i := lastHeight; i < h; i++ { - old := indexToPosition(i + 1) + old := bq.indexToPosition(i + 1) if bq.queue[old] != nil && bq.queue[old].Index == i { bq.len-- bq.queue[old] = nil @@ -114,17 +133,38 @@ func (bq *Queue) PutBlock(block *block.Block) error { if bq.discarded.Load() { return nil } - if block.Index <= h || h+CacheSize < block.Index { - // can easily happen when fetching the same blocks from - // different peers, thus not considered as error + // Can easily happen when fetching the same blocks from + // different peers, thus not considered as error. + if block.Index <= h { return nil } - pos := indexToPosition(block.Index) + if h+uint32(bq.cacheSize) < block.Index { + switch bq.mode { + case NonBlocking: + return nil + case Blocking: + bq.queueLock.Unlock() + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + if bq.discarded.Load() { + bq.queueLock.Lock() + return nil + } + h = bq.chain.BlockHeight() + if h+uint32(bq.cacheSize) >= block.Index { + bq.queueLock.Lock() + break + } + } + } + } + pos := bq.indexToPosition(block.Index) // If we already have it, keep the old block, throw away the new one. if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block - for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { bq.lastQ = bq.queue[pos].Index pos++ } @@ -147,7 +187,7 @@ func (bq *Queue) PutBlock(block *block.Block) error { func (bq *Queue) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() - return bq.lastQ, CacheSize - bq.len + return bq.lastQ, bq.cacheSize - bq.len } // Discard stops the queue and prevents it from accepting more blocks to enqueue. diff --git a/pkg/network/bqueue/queue_test.go b/pkg/network/bqueue/queue_test.go index 34eabc4b62..e481fba56f 100644 --- a/pkg/network/bqueue/queue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -13,7 +13,7 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := New(chain, zaptest.NewLogger(t), nil, nil) + bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} @@ -24,7 +24,7 @@ func TestBlockQueue(t *testing.T) { } last, capLeft := bq.LastQueued() assert.Equal(t, uint32(0), last) - assert.Equal(t, CacheSize-2, capLeft) + assert.Equal(t, DefaultCacheSize-2, capLeft) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) @@ -35,18 +35,18 @@ func TestBlockQueue(t *testing.T) { // but they're still not put into the blockchain, because bq isn't running last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, CacheSize-4, capLeft) + assert.Equal(t, DefaultCacheSize-4, capLeft) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped - assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}})) + assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}})) assert.Equal(t, 4, bq.length()) go bq.Run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, CacheSize, capLeft) + assert.Equal(t, DefaultCacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks @@ -55,7 +55,7 @@ func TestBlockQueue(t *testing.T) { } last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) - assert.Equal(t, CacheSize, capLeft) + assert.Equal(t, DefaultCacheSize, capLeft) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active @@ -75,7 +75,7 @@ func TestBlockQueue(t *testing.T) { assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) last, capLeft = bq.LastQueued() assert.Equal(t, uint32(8), last) - assert.Equal(t, CacheSize-1, capLeft) + assert.Equal(t, DefaultCacheSize-1, capLeft) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) bq.Discard() diff --git a/pkg/network/server.go b/pkg/network/server.go index 250d9bd6dc..6c9c3df649 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + bFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -133,6 +136,7 @@ type ( runFin chan struct{} broadcastTxFin chan struct{} runProtoFin chan struct{} + blockFetcherFin chan struct{} transactions chan *transaction.Transaction @@ -182,28 +186,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s := &Server{ - ServerConfig: config, - chain: chain, - id: randomID(), - config: chain.GetConfig().ProtocolConfiguration, - quit: make(chan struct{}), - relayFin: make(chan struct{}), - runFin: make(chan struct{}), - broadcastTxFin: make(chan struct{}), - runProtoFin: make(chan struct{}), - register: make(chan Peer), - unregister: make(chan peerDrop), - handshake: make(chan Peer), - txInMap: make(map[util.Uint256]struct{}), - peers: make(map[Peer]bool), - mempool: chain.GetMemPool(), - extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), - log: log, - txin: make(chan *transaction.Transaction, 64), - transactions: make(chan *transaction.Transaction, 64), - services: make(map[string]Service), - extensHandlers: make(map[string]func(*payload.Extensible) error), - stateSync: stSync, + ServerConfig: config, + chain: chain, + id: randomID(), + config: chain.GetConfig().ProtocolConfiguration, + quit: make(chan struct{}), + relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), + blockFetcherFin: make(chan struct{}), + register: make(chan Peer), + unregister: make(chan peerDrop), + handshake: make(chan Peer), + txInMap: make(map[util.Uint256]struct{}), + peers: make(map[Peer]bool), + mempool: chain.GetMemPool(), + extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), + log: log, + txin: make(chan *transaction.Transaction, 64), + transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), + extensHandlers: make(map[string]func(*payload.Extensible) error), + stateSync: stSync, } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) @@ -216,9 +221,17 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }, updateBlockQueueLenMetric) + }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) - s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + var err error + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() { + close(s.blockFetcherFin) + }) + if err != nil && config.NeoFSBlockFetcherCfg.Enabled { + return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) + } if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -295,6 +308,13 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + go s.bFetcherQueue.Run() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.blockFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } for _, tr := range s.transports { go tr.Accept() } @@ -311,6 +331,9 @@ func (s *Server) Shutdown() { return } s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + s.blockFetcher.Shutdown() + } for _, tr := range s.transports { tr.Close() } @@ -319,6 +342,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.bFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -548,6 +572,11 @@ func (s *Server) run() { s.tryInitStateSync() s.tryStartServices() + case <-s.blockFetcherFin: + if s.started.Load() { + s.tryInitStateSync() + s.tryStartServices() + } } } } @@ -702,7 +731,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -762,6 +791,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } @@ -782,6 +814,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) @@ -1100,6 +1135,9 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { + if s.blockFetcher.IsActive() { + return nil + } return s.stateSync.AddHeaders(h.Hdrs...) } @@ -1322,7 +1360,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato if !lastRequestedHeight.CompareAndSwap(old, needHeight) { continue } - } else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) { + } else if old < currHeight+(bqueue.DefaultCacheSize-payload.MaxHashesCount) { needHeight = currHeight + 1 if peerHeight > old+payload.MaxHashesCount { needHeight = old + payload.MaxHashesCount @@ -1331,7 +1369,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato } } } else { - index := mrand.IntN(bqueue.CacheSize / payload.MaxHashesCount) + index := mrand.IntN(bqueue.DefaultCacheSize / payload.MaxHashesCount) needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) } break @@ -1428,6 +1466,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..a9cc9beda0 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,8 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + NeoFSBlockFetcherCfg config.NeoFSBlockFetcher } ) @@ -89,24 +91,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err) } c := ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Addresses: addrs, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.P2P.DialTimeout, - ProtoTickInterval: appConfig.P2P.ProtoTickInterval, - PingInterval: appConfig.P2P.PingInterval, - PingTimeout: appConfig.P2P.PingTimeout, - MaxPeers: appConfig.P2P.MaxPeers, - AttemptConnPeers: appConfig.P2P.AttemptConnPeers, - MinPeers: appConfig.P2P.MinPeers, - TimePerBlock: protoConfig.TimePerBlock, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, - ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, - BroadcastFactor: appConfig.P2P.BroadcastFactor, + UserAgent: cfg.GenerateUserAgent(), + Addresses: addrs, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.P2P.DialTimeout, + ProtoTickInterval: appConfig.P2P.ProtoTickInterval, + PingInterval: appConfig.P2P.PingInterval, + PingTimeout: appConfig.P2P.PingTimeout, + MaxPeers: appConfig.P2P.MaxPeers, + AttemptConnPeers: appConfig.P2P.AttemptConnPeers, + MinPeers: appConfig.P2P.MinPeers, + TimePerBlock: protoConfig.TimePerBlock, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, + BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..ee64c85eac --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,479 @@ +package blockfetcher + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + // oidSize is the size of the object ID in NeoFS. + oidSize = sha256.Size + // defaultTimeout is the default timeout for NeoFS requests. + defaultTimeout = 5 * time.Minute + // defaultOIDBatchSize is the default number of OIDs to search and fetch at once. + defaultOIDBatchSize = 8000 + // defaultDownloaderWorkersCount is the default number of workers downloading blocks. + defaultDownloaderWorkersCount = 100 +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + GetConfig() config.Blockchain + BlockHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + // isActive denotes whether the service is working or in the process of shutdown. + isActive atomic.Bool + log *zap.Logger + cfg config.NeoFSBlockFetcher + stateRootInHeader bool + + chain Ledger + client *client.Client + enqueueBlock func(*block.Block) error + account *wallet.Account + + oidsCh chan oid.ID + blocksCh chan *block.Block + // wg is a wait group for block downloaders. + wg sync.WaitGroup + + // Global context for download operations cancellation. + ctx context.Context + ctxCancel context.CancelFunc + + // A set of routines managing graceful Service shutdown. + quit chan bool + quitOnce sync.Once + exiterToOIDDownloader chan struct{} + exiterToShutdown chan struct{} + oidDownloaderToExiter chan struct{} + blockQueuerToExiter chan struct{} + + shutdownCallback func() +} + +// New creates a new BlockFetcher Service. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { + var ( + account *wallet.Account + err error + ) + + if cfg.UnlockWallet.Path != "" { + walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path) + if err != nil { + return &Service{}, err + } + for _, acc := range walletFromFile.Accounts { + if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil { + account = acc + break + } + } + if account == nil { + return &Service{}, errors.New("failed to decrypt any account in the wallet") + } + } else { + account, err = wallet.NewAccount() + if err != nil { + return &Service{}, err + } + } + if cfg.Timeout <= 0 { + cfg.Timeout = defaultTimeout + } + if cfg.OIDBatchSize <= 0 { + cfg.OIDBatchSize = defaultOIDBatchSize + } + if cfg.DownloaderWorkersCount <= 0 { + cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount + } + if len(cfg.Addresses) == 0 { + return &Service{}, errors.New("no addresses provided") + } + return &Service{ + chain: chain, + log: logger, + cfg: cfg, + + enqueueBlock: putBlock, + account: account, + stateRootInHeader: chain.GetConfig().StateRootInHeader, + shutdownCallback: shutdownCallback, + + quit: make(chan bool), + exiterToOIDDownloader: make(chan struct{}), + exiterToShutdown: make(chan struct{}), + oidDownloaderToExiter: make(chan struct{}), + blockQueuerToExiter: make(chan struct{}), + + // Use buffer of two batch sizes to load OIDs in advance: + // * first full block of OIDs is processing by Downloader + // * second full block of OIDs is available to be fetched by Downloader immediately + // * third half-filled block of OIDs is being collected by OIDsFetcher. + oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + + // Use buffer of a single OIDs batch size to provide smooth downloading and + // avoid pauses during blockqueue insertion. + blocksCh: make(chan *block.Block, cfg.OIDBatchSize), + }, nil +} + +// Start runs the NeoFS BlockFetcher service. +func (bfs *Service) Start() error { + if !bfs.isActive.CompareAndSwap(false, true) { + return nil + } + bfs.log.Info("starting NeoFS BlockFetcher service") + + var err error + bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background()) + bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute) + if err != nil { + bfs.isActive.CompareAndSwap(true, false) + return fmt.Errorf("create SDK client: %w", err) + } + + // Start routine that manages Service shutdown process. + go bfs.exiter() + + // Start OIDs downloader routine. + go bfs.oidDownloader() + + // Start the set of blocks downloading routines. + for range bfs.cfg.DownloaderWorkersCount { + bfs.wg.Add(1) + go bfs.blockDownloader() + } + + // Start routine that puts blocks into bQueue. + go bfs.blockQueuer() + + return nil +} + +// oidDownloader runs the appropriate blocks OID fetching method based on the configuration. +func (bfs *Service) oidDownloader() { + defer close(bfs.oidDownloaderToExiter) + + var err error + if bfs.cfg.SkipIndexFilesSearch { + err = bfs.fetchOIDsBySearch() + } else { + err = bfs.fetchOIDsFromIndexFiles() + } + var force bool + if err != nil { + bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err)) + force = true + } + // Stop the service since there's nothing to do anymore. + bfs.stopService(force) +} + +// blockDownloader downloads the block from NeoFS and sends it to the blocks channel. +func (bfs *Service) blockDownloader() { + defer bfs.wg.Done() + + for blkOid := range bfs.oidsCh { + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + defer cancel() + + rc, err := bfs.objectGet(ctx, blkOid.String()) + if err != nil { + if isContextCanceledErr(err) { + return + } + bfs.log.Error("failed to objectGet block", zap.Error(err)) + bfs.stopService(true) + return + } + + b, err := bfs.readBlock(rc) + if err != nil { + if isContextCanceledErr(err) { + return + } + bfs.log.Error("failed to read block", zap.Error(err)) + bfs.stopService(true) + return + } + select { + case <-bfs.ctx.Done(): + return + case bfs.blocksCh <- b: + } + } +} + +// blockQueuer puts the block into the bqueue. +func (bfs *Service) blockQueuer() { + defer close(bfs.blockQueuerToExiter) + + for b := range bfs.blocksCh { + select { + case <-bfs.ctx.Done(): + return + default: + err := bfs.enqueueBlock(b) + if err != nil { + bfs.log.Error("failed to enqueue block", zap.Error(err)) + bfs.stopService(true) + return + } + } + } +} + +// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. +func (bfs *Service) fetchOIDsFromIndexFiles() error { + h := bfs.chain.BlockHeight() + startIndex := h/bfs.cfg.IndexFileSize + 1 + skip := h % bfs.cfg.IndexFileSize + + for { + select { + case <-bfs.exiterToOIDDownloader: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + blockOidsObject, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no '%s' object found with index %d, stopping", bfs.cfg.IndexFileAttribute, startIndex)) + return nil + } + + blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + defer blockCancel() + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) + } + + err = bfs.streamBlockOIDs(oidsRC, int(skip)) + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err) + } + + startIndex++ + skip = 0 + } + } +} + +// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { + defer rc.Close() + oidBytes := make([]byte, oidSize) + oidsProcessed := 0 + + for { + _, err := io.ReadFull(rc, oidBytes) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read OID: %w", err) + } + + if oidsProcessed < skip { + oidsProcessed++ + continue + } + + var oidBlock oid.ID + if err := oidBlock.Decode(oidBytes); err != nil { + return fmt.Errorf("failed to decode OID: %w", err) + } + + select { + case <-bfs.exiterToOIDDownloader: + return nil + case bfs.oidsCh <- oidBlock: + } + + oidsProcessed++ + } + if oidsProcessed != int(bfs.cfg.IndexFileSize) { + return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed) + } + return nil +} + +// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. +func (bfs *Service) fetchOIDsBySearch() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(bfs.cfg.OIDBatchSize) + + for { + select { + case <-bfs.exiterToOIDDownloader: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) + blockOids, err := bfs.objectSearch(ctx, prm) + cancel() + if err != nil { + if isContextCanceledErr(err) { + return nil + } + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex)) + return nil + } + for _, oid := range blockOids { + select { + case <-bfs.exiterToOIDDownloader: + return nil + case bfs.oidsCh <- oid: + } + } + startIndex += batchSize + } + } +} + +// readBlock decodes the block from the read closer and prepares it for adding to the blockchain. +func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.DecodeBinary(r) + rc.Close() + return b, r.Err +} + +// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new +// block OIDs search, cancels all in-progress downloading operations and waits +// until all service routines finish their work. +func (bfs *Service) Shutdown() { + if !bfs.IsActive() { + return + } + bfs.stopService(true) + <-bfs.exiterToShutdown +} + +// stopService close quitting goroutine once. It's the only entrypoint to shutdown +// procedure. +func (bfs *Service) stopService(force bool) { + bfs.quitOnce.Do(func() { + bfs.quit <- force + close(bfs.quit) + }) +} + +// exiter is a routine that is listening to a quitting signal and manages graceful +// Service shutdown process. +func (bfs *Service) exiter() { + // Closing signal may come from anyone, but only once. + force := <-bfs.quit + bfs.log.Info("shutting down NeoFS BlockFetcher service", + zap.Bool("force", force), + ) + + // Cansel all pending OIDs/blocks downloads in case if shutdown requested by user + // or caused by downloading error. + if force { + bfs.ctxCancel() + } + + // Send signal to OID downloader to stop. Wait until OID downloader finishes his + // work. + close(bfs.exiterToOIDDownloader) + <-bfs.oidDownloaderToExiter + + // Close OIDs channel to let block downloaders know that there are no more OIDs + // expected. Wait until all downloaders finish their work. + close(bfs.oidsCh) + bfs.wg.Wait() + + // Send signal to block putter to finish his work. Wait until it's finished. + close(bfs.blocksCh) + <-bfs.blockQueuerToExiter + + // Everything is done, release resources, turn off the activity marker and let + // the server know about it. + _ = bfs.client.Close() + _ = bfs.log.Sync() + bfs.isActive.CompareAndSwap(true, false) + bfs.shutdownCallback() + + // Notify Shutdown routine in case if it's user-triggered shutdown. + close(bfs.exiterToShutdown) +} + +// IsActive returns true if the NeoFS BlockFetcher service is running. +func (bfs *Service) IsActive() bool { + return bfs.isActive.Load() +} + +func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false) + if err != nil { + return nil, err + } + + return rc, nil +} + +func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { + return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) +} + +// isContextCanceledErr returns whether error is a wrapped [context.Canceled]. +// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624. +func isContextCanceledErr(err error) bool { + return errors.Is(err, context.Canceled) || + strings.Contains(err.Error(), "context canceled") +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..87254a8351 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,98 @@ +package blockfetcher + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +type mockLedger struct { + height uint32 +} + +func (m *mockLedger) GetConfig() config.Blockchain { + return config.Blockchain{} +} + +func (m *mockLedger) BlockHeight() uint32 { + return m.height +} + +type mockPutBlockFunc struct { + putCalled bool +} + +func (m *mockPutBlockFunc) putBlock(b *block.Block) error { + m.putCalled = true + return nil +} + +func TestServiceConstructor(t *testing.T) { + logger := zap.NewNop() + ledger := &mockLedger{height: 10} + mockPut := &mockPutBlockFunc{} + shutdownCallback := func() {} + + t.Run("empty configuration", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Timeout: 0, + OIDBatchSize: 0, + DownloaderWorkersCount: 0, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("no addresses", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{}, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) + + t.Run("default values", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + require.NotNil(t, service) + + require.Equal(t, service.IsActive(), false) + require.Equal(t, service.cfg.Timeout, defaultTimeout) + require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize) + require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) + require.Equal(t, service.IsActive(), false) + }) + + t.Run("SDK client", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + } + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.NoError(t, err) + err = service.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "create SDK client") + require.Equal(t, service.IsActive(), false) + }) + + t.Run("invalid wallet", func(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + Addresses: []string{"http://localhost:8080"}, + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{ + Path: "invalid/path/to/wallet.json", + Password: "wrong-password", + }, + }, + } + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + require.Error(t, err) + }) +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..af245463e3 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,6 +18,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -45,41 +48,48 @@ var ( // URI scheme is "neofs://". // If Command is not provided, full object is requested. func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { - objectAddr, ps, err := parseNeoFSURL(u) + c, err := GetSDKClient(ctx, addr, 0) if err != nil { - return nil, err + return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err) } + return GetWithClient(ctx, c, priv, u, true) +} - c, err := client.New(client.PrmInit{}) +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. If wrapClientCloser is true, +// the client will be closed when the returned ReadCloser is closed. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) + return nil, err } - var ( - res = clientCloseWrapper{c: c} - prmd client.PrmDial + res io.ReadCloser + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) ) - prmd.SetServerURI(addr) - prmd.SetContext(ctx) - err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter - if err != nil { - return res, err - } - - var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) switch { - case len(ps) == 0 || ps[0] == "": // Get request - res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) + case len(ps) == 0 || ps[0] == "": + res, err = getPayload(ctx, s, c, objectAddr) case ps[0] == rangeCmd: - res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...) + res, err = getRange(ctx, s, c, objectAddr, ps[1:]...) case ps[0] == headerCmd: - res.ReadCloser, err = getHeader(ctx, s, c, objectAddr) + res, err = getHeader(ctx, s, c, objectAddr) case ps[0] == hashCmd: - res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...) + res, err = getHash(ctx, s, c, objectAddr, ps[1:]...) default: - err = ErrInvalidCommand + return nil, ErrInvalidCommand + } + if err != nil { + return nil, err + } + if wrapClientCloser { + return clientCloseWrapper{ + c: c, + ReadCloser: res, + }, nil } - return res, err + return res, nil } type clientCloseWrapper struct { @@ -92,7 +102,12 @@ func (w clientCloseWrapper) Close() error { if w.ReadCloser != nil { res = w.ReadCloser.Close() } - w.c.Close() + if w.c != nil { + closeErr := w.c.Close() + if closeErr != nil && res == nil { + res = closeErr + } + } return res } @@ -220,3 +235,58 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + containerID cid.ID + ) + err := containerID.DecodeString(containerIDStr) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) + } + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object IDs iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. +// If timeout is 0, the default timeout will be used. +func GetSDKClient(ctx context.Context, addr string, timeout time.Duration) (*client.Client, error) { + var prmDial client.PrmDial + if addr == "" { + return nil, errors.New("address is empty") + } + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + if timeout != 0 { + prmDial.SetTimeout(timeout) + prmDial.SetStreamTimeout(timeout) + } + c, err := client.New(client.PrmInit{}) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}