Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services: add new block fetching from NeoFS service #3515

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions cli/server/dump_bin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package server

import (
"fmt"
"os"
"path/filepath"

"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/urfave/cli/v2"
)

func dumpBin(ctx *cli.Context) error {
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
if err := cmdargs.EnsureNone(ctx); err != nil {
return err

Check warning on line 17 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L17

Added line #L17 was not covered by tests
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)
}
log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 25 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L25

Added line #L25 was not covered by tests
}
if logCloser != nil {
defer func() { _ = logCloser() }()

Check warning on line 28 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L28

Added line #L28 was not covered by tests
}
count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start"))

chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil {
return err

Check warning on line 35 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L35

Added line #L35 was not covered by tests
}
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()

blocksCount := chain.BlockHeight() + 1
if start+count > blocksCount {
return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1)
}
if count == 0 {
count = blocksCount - start
}

out := ctx.String("out")
if out == "" {
return cli.Exit("output directory is not specified", 1)
}
if _, err = os.Stat(out); os.IsNotExist(err) {
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
if err = os.MkdirAll(out, os.ModePerm); err != nil {
return cli.Exit(fmt.Sprintf("failed to create directory %s: %s", out, err), 1)

Check warning on line 57 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L57

Added line #L57 was not covered by tests
}
}
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return cli.Exit(fmt.Sprintf("failed to check directory %s: %s", out, err), 1)

Check warning on line 61 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L61

Added line #L61 was not covered by tests
}

for i := start; i < start+count; i++ {
blk, err := chain.GetBlock(chain.GetHeaderHash(i))
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1)

Check warning on line 67 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L67

Added line #L67 was not covered by tests
}
filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i))
if err = saveBlockToFile(blk, filePath); err != nil {
return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1)

Check warning on line 71 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L71

Added line #L71 was not covered by tests
}
}
return nil
}

func saveBlockToFile(blk *block.Block, filePath string) error {
file, err := os.Create(filePath)
if err != nil {
return err

Check warning on line 80 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L80

Added line #L80 was not covered by tests
}
defer file.Close()

writer := io.NewBinWriterFromIO(file)
blk.EncodeBinary(writer)
return writer.Err
}
91 changes: 91 additions & 0 deletions cli/server/dump_bin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package server_test

import (
"os"
"path/filepath"
"strconv"
"testing"

"github.com/nspcc-dev/neo-go/internal/testcli"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)

// TestDumpBin restores a chain from the test dump into a temporary LevelDB
// database and then runs the `db dump-bin` command against it with various
// combinations of arguments.
func TestDumpBin(t *testing.T) {
	workDir := t.TempDir()

	// Prepare a unit testnet configuration that keeps its database inside the
	// temporary directory and store it next to the database.
	cfg, err := config.LoadFile(filepath.Join("..", "..", "config", "protocol.unit_testnet.yml"))
	require.NoError(t, err, "could not load config")
	cfg.ApplicationConfiguration.DBConfiguration.Type = dbconfig.LevelDB
	cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions.DataDirectoryPath = filepath.Join(workDir, "neogotestchain")

	rawCfg, err := yaml.Marshal(cfg)
	require.NoError(t, err)

	cfgPath := filepath.Join(workDir, "protocol.unit_testnet.yml")
	require.NoError(t, os.WriteFile(cfgPath, rawCfg, os.ModePerm))

	e := testcli.NewExecutor(t, false)
	e.Run(t, "neo-go", "db", "restore", "--config-file", cfgPath, "--in", inDump)

	// dumpBinArgs builds a `db dump-bin` command line for the given config
	// file followed by the extra arguments.
	dumpBinArgs := func(cfgFile string, extra ...string) []string {
		base := []string{"neo-go", "db", "dump-bin", "--config-file", cfgFile}
		return append(base, extra...)
	}
	// requireBlockFiles checks that dir exists and contains files for the
	// first n blocks.
	requireBlockFiles := func(t *testing.T, dir string, n int) {
		require.DirExists(t, dir)
		for i := 0; i < n; i++ {
			require.FileExists(t, filepath.Join(dir, "block-"+strconv.Itoa(i)+".bin"))
		}
	}

	t.Run("missing output directory", func(t *testing.T) {
		e.RunWithErrorCheck(t, "output directory is not specified", dumpBinArgs(cfgPath, "--out", "")...)
	})

	t.Run("successful dump", func(t *testing.T) {
		dir := filepath.Join(workDir, "blocks")
		e.Run(t, dumpBinArgs(cfgPath, "--out", dir, "--count", "5", "--start", "0")...)
		requireBlockFiles(t, dir, 5)
	})

	t.Run("invalid block range", func(t *testing.T) {
		dir := filepath.Join(workDir, "invalid-blocks")
		e.RunWithError(t, dumpBinArgs(cfgPath, "--out", dir, "--count", "1000", "--start", "0")...)
	})

	t.Run("zero blocks (full chain dump)", func(t *testing.T) {
		dir := filepath.Join(workDir, "full-dump")
		e.Run(t, dumpBinArgs(cfgPath, "--out", dir)...)
		requireBlockFiles(t, dir, 5)
	})

	t.Run("invalid config file", func(t *testing.T) {
		dir := filepath.Join(workDir, "blocks")
		e.RunWithError(t, dumpBinArgs("invalid-config-path", "--out", dir)...)
	})
}
7 changes: 7 additions & 0 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func NewCommands() []*cli.Command {
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "dump-bin",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not dump, but fetch.

Copy link
Member

@AnnaShaleva AnnaShaleva Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roman-khimov, why fetch? This command saves dump of every block to a separate file, it doesn't fetch them from the network.

Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format",
UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: dumpBin,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "Restore blocks from the file",
Expand Down
13 changes: 13 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,16 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: false
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
IndexFileSize: 128000
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
74 changes: 74 additions & 0 deletions docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# NeoFS block storage

Using NeoFS to store chain's blocks and snapshots was proposed in
[#3463](https://github.com/neo-project/neo/issues/3463). NeoGo contains several
extensions utilizing NeoFS block storage aimed to improve node synchronization
efficiency and reduce node storage size.

## Components and functionality

### Block storage schema

A single NeoFS container is used to store blocks and index files. Each block
is stored in a binary form as a separate object with a unique OID and a set of
attributes:
- block object identifier with block index value (`block:1`)
- primary node index (`primary:0`)
- block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`)
- previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`)
- millisecond-precision block timestamp (`time:1627894840919`)

Each index file is an object containing a constant-sized batch of raw block object
IDs in binary form ordered by block index. Each index file is marked with the
following attributes:
- index file identifier with consecutive file index value (`oid:0`)
- the number of OIDs included into index file (`size:128000`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roman-khimov, do we need size:128000 attribute?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as an option, various index files size for different schemas.


### NeoFS BlockFetcher

NeoFS BlockFetcher service is designed as an alternative to the P2P synchronisation
protocol. It allows downloading blocks from a trusted container in the NeoFS network
and persisting them to the database using the standard verification flow. NeoFS BlockFetcher
service is primarily used during the node's bootstrap, providing a fast alternative to
P2P block synchronisation.

NeoFS BlockFetcher service has two modes of operation:
- Index File Search: Search for index files, which contain batches of block object
IDs and fetch blocks from NeoFS by retrieved OIDs.
- Direct Block Search: Search and fetch blocks directly from NeoFS container via
built-in NeoFS object search mechanism.

Operation mode of BlockFetcher can be configured via `SkipIndexFilesSearch`
parameter.

#### Operation flow

1. **OID Fetching**:
Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index
file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is
configured via `OIDBatchSize` parameter).

Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that
is used to redirect block OIDs to downloading routines is buffered
to provide smooth OID delivery without delays. The size of this channel
can be configured via the `OIDBatchSize` parameter and is equal to `2*OIDBatchSize`.
2. **Parallel Block Downloading**:
The number of downloading routines can be configured via
`DownloaderWorkersCount` parameter. It's up to the user to find the
balance between the downloading speed and blocks persist speed for every
node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a
buffered channel of size `OIDBatchSize` with further redirection to the
block queue.
3. **Block Insertion**:
Downloaded blocks are inserted into the blockchain using the same logic
as in the P2P synchronisation protocol. The block queue is used to order
downloaded blocks before they are inserted into the blockchain. The
size of the queue can be configured via the `BQueueSize` parameter
and should be larger than the `OIDBatchSize` parameter to avoid blocking
the downloading routines.

Once all blocks available in the NeoFS container are processed, the service
shuts down automatically.
50 changes: 50 additions & 0 deletions docs/node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ node-related settings described in the table below.
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. |
| Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. |
| P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. |
| P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. |
Expand Down Expand Up @@ -153,6 +154,55 @@ where:
Please refer to the [Notary module documentation](./notary.md#notary-node-module) for
details on module features.

### NeoFS BlockFetcher Configuration

`NeoFSBlockFetcher` configuration section contains settings for NeoFS
BlockFetcher module and has the following structure:
```
NeoFSBlockFetcher:
Enabled: true
UnlockWallet:
Path: "./wallet.json"
Password: "pass"
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
IndexFileSize: 128000
```
where:
- `Enabled` enables NeoFS BlockFetcher module.
- `UnlockWallet` contains wallet settings to retrieve the account used to sign requests to
NeoFS. Without this setting, the module will use a randomly generated private key.
For configuration details see [Unlock Wallet Configuration](#Unlock-Wallet-Configuration).
- `Addresses` is a list of NeoFS storage nodes addresses.
- `Timeout` is a timeout for a single request to NeoFS storage node.
- `ContainerID` is a container ID to fetch blocks from.
- `BlockAttribute` is an attribute name of NeoFS object that contains block
data.
- `IndexFileAttribute` is an attribute name of NeoFS index object that contains block
object IDs.
- `DownloaderWorkersCount` is a number of workers that download blocks from
NeoFS in parallel.
- `OIDBatchSize` is the number of blocks to search for per single request to NeoFS
when index files search is disabled. Also, for both modes of BlockFetcher
operation this setting manages the buffer size of the channels used to transfer
OIDs and blocks.
- `BQueueSize` is the size of the block queue used to manage consecutive blocks
addition to the chain. It must be larger than `OIDBatchSize` and is highly recommended
to be `2*OIDBatchSize` or `3*OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows skipping the index files search and searching
for blocks directly. It is set to `false` by default.
- `IndexFileSize` is the number of OID objects stored in the index files. This
setting depends on the NeoFS block storage configuration and is applicable only if
`SkipIndexFilesSearch` is set to `false`.

### Metrics Services Configuration

Metrics services configuration describes options for metrics services (pprof,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
golang.org/x/term v0.23.0
golang.org/x/text v0.17.0
golang.org/x/tools v0.24.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -72,7 +73,6 @@ require (
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
23 changes: 17 additions & 6 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`

Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down Expand Up @@ -141,3 +142,13 @@
}
return addrs, nil
}

// Validate checks ApplicationConfiguration for internal consistency and returns
// an error if any invalid settings are found. This ensures that the application
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err)

Check warning on line 151 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L151

Added line #L151 was not covered by tests
}
return nil
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading