Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Co-authored-by: Anna Shaleva <[email protected]>
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland and AnnaShaleva committed Sep 6, 2024
1 parent d21c8f7 commit e035502
Show file tree
Hide file tree
Showing 11 changed files with 749 additions and 48 deletions.
12 changes: 12 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,15 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: false
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 32000 # must be larger than OIDBatchSize; highly recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
OidAttribute: "oid"
88 changes: 88 additions & 0 deletions docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# NeoFS BlockFetcher
This service was proposed as part [ Using NeoFS to store blocks and snapshots #3463](https://github.com/neo-project/neo/issues/3463).

The NeoFS BlockFetcher service is designed to fetch blocks from NeoFS storage
nodes and supply them to the NeoGo node. It serves as an alternative method to
retrieve blocks, primarily used at the start of a node's lifecycle, providing
an alternative to downloading blocks through the P2P NeoGo network. By default,
the service is disabled.

The NeoFS BlockFetcher service has two modes of operation:
- Index File Search: Search for index files, which contain OIDs of blocks,
and fetch blocks from NeoFS by provided OIDs from file.
- Direct Block Search: Search and fetch blocks directly from NeoFS container.

### Storage schema
One container is used to store blocks and index files. Each block is stored
as a separate object with a unique OID and attribute with `block.Index`.
Index files contain OIDs of ordered blocks. Each index file is an object
containing a specified number (`blockfetcher.indexFileSize`) of OIDs in
binary form, with attributes numbered sequentially from 1 to n, where
the numbers represent the continuous chain of blocks.

Attributes have the following structure: `AttributeName:IntValue`.
In the container are stored blocks with attributes `BlockAttribute:1`,
`BlockAttribute:2`, etc.
And index files with attributes `OidAttribute:1`, `OidAttribute:2`, etc.

### BlockFetcher workflows
1. **OID Fetching**:
Depending on the mode, the service either:
- Searches for index files by attribute and reads blocks OIDs from it.
- Searches batches of blocks directly by attributes.

OIDs are buffered (size configurable) to ensure blocks can be downloaded
efficiently without delays.
2. **Parallel Block Downloading**:
Multiple workers (configurable) download blocks by OIDs from NeoFS in
parallel. Downloaded blocks are placed into a buffer for further processing.
3. **Block Insertion**:
Downloaded blocks are inserted into the blockchain. The gap between blocks
must remain within the block queue capacity (configurable) to ensure proper
sequencing.

Once all blocks in the NeoFS container are processed, the service shuts
down automatically.

### NeoFS BlockFetcher Configuration
`NeoFSBlockFetcher` configuration section contains settings for NeoFS
BlockFetcher module and has the following structure:
```
NeoFSBlockFetcher:
Enabled: true
UnlockWallet:
Path: "./wallet.json"
Password: "pass"
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
OidAttribute: "oid"
```
where:
- `Enabled` enables NeoFS BlockFetcher module.
- `UnlockWallet` contains wallet settings, see
[Unlock Wallet Configuration](https://github.com/nspcc-dev/neo-go/blob/master/docs/node-configuration.md#unlock-wallet-configuration)
section contains wallet settings to retrieve account to sign requests to
NeoFS. Without this setting, the module will use new generated private key.
- `Addresses` is a list of NeoFS storage nodes addresses.
- `Timeout` is a timeout for a single request to NeoFS storage node.
- `ContainerID` is a container ID to fetch blocks from.
- `BlockAttribute` is an attribute name of NeoFS object that contains block
data.
- `OidAttribute` is an attribute name of NeoFS object that contains OIDs of
blocks objects.
- `DownloaderWorkersCount` is a number of workers that download blocks from
NeoFS.
- `OIDBatchSize` is a number of OIDs to search in one request in case of
SkipIndexFilesSearch=true. And is a buffer size for OIDs in both modes.
- `BQueueSize` is a size of the block queue. It must be larger than
`OIDBatchSize` and highly recommended to be 2*`OIDBatchSize` or 3*`OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows skipping index files search
in NeoFS storage nodes and search for blocks directly. It is set to `false`
by default.
1 change: 1 addition & 0 deletions docs/node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ node-related settings described in the table below.
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) | | NeoFSBlockFetcher module configuration. See the [NeoFSBlockFetcher Configuration](https://github.com/nspcc-dev/neo-go/blob/master/docs/neofs-blockstorage.md#NeoFSBlockFetcher-Configuration) section for details. |
| Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. |
| P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. |
| P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. |
Expand Down
23 changes: 17 additions & 6 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`

Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down Expand Up @@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}

// Validate checks ApplicationConfiguration for internal consistency and returns
// an error if any invalid settings are found. This ensures that the application
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err)

Check warning on line 151 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L151

Added line #L151 was not covered by tests
}
return nil
}
45 changes: 45 additions & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package config

import (
"errors"
"fmt"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)

// NeoFSBlockFetcher represents the configuration for the NeoFS block fetcher service.
type NeoFSBlockFetcher struct {
InternalService `yaml:",inline"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Addresses []string `yaml:"Addresses"`
OIDBatchSize int `yaml:"OIDBatchSize"` // valid only for SkipIndexFilesSearch = true
BlockAttribute string `yaml:"BlockAttribute"`
OidAttribute string `yaml:"OidAttribute"`
HeaderAttribute string `yaml:"HeaderAttribute"`
DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"`
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
// that all required fields are properly set. It returns an error if the
// configuration is invalid or if the ContainerID cannot be properly decoded.
func (cfg *NeoFSBlockFetcher) Validate() error {
if !cfg.Enabled {
return nil
}
if cfg.ContainerID == "" {
return errors.New("container ID is not set")

Check warning on line 34 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}
var containerID cid.ID
err := containerID.DecodeString(cfg.ContainerID)
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)

Check warning on line 39 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L36-L39

Added lines #L36 - L39 were not covered by tests
}
if cfg.BQueueSize < cfg.OIDBatchSize {
return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize)

Check warning on line 42 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}
return nil

Check warning on line 44 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L44

Added line #L44 was not covered by tests
}
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) {
if err != nil {
return Config{}, err
}
err = config.ApplicationConfiguration.Validate()
if err != nil {
return Config{}, err

Check warning on line 121 in pkg/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/config.go#L121

Added line #L121 was not covered by tests
}

return config, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (bq *Queue) PutBlock(block *block.Block) error {
return nil
case Blocking:
bq.queueLock.Unlock()
bq.log.Info("checking free space in queue", zap.Uint32("index", block.Index))
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
Expand Down
Loading

0 comments on commit e035502

Please sign in to comment.