Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Aug 21, 2024
1 parent dc6c195 commit cabf922
Show file tree
Hide file tree
Showing 12 changed files with 683 additions and 22 deletions.
92 changes: 92 additions & 0 deletions cli/server/dump_bin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package server

import (
"fmt"
"os"
"path/filepath"

"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/urfave/cli/v2"
)

func fetchBin(ctx *cli.Context) error {
var err error
if err := cmdargs.EnsureNone(ctx); err != nil {
return err

Check warning on line 18 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L15-L18

Added lines #L15 - L18 were not covered by tests
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 22 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L20-L22

Added lines #L20 - L22 were not covered by tests
}
log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 26 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L24-L26

Added lines #L24 - L26 were not covered by tests
}
if logCloser != nil {
defer func() { _ = logCloser() }()

Check warning on line 29 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L28-L29

Added lines #L28 - L29 were not covered by tests
}
count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start"))

Check warning on line 32 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L31-L32

Added lines #L31 - L32 were not covered by tests

chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil {
return err

Check warning on line 36 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L34-L36

Added lines #L34 - L36 were not covered by tests
}
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()

Check warning on line 42 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L38-L42

Added lines #L38 - L42 were not covered by tests

chainCount := chain.BlockHeight() + 1
if start+count > chainCount {
return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainCount-1, count, start), 1)

Check warning on line 46 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L44-L46

Added lines #L44 - L46 were not covered by tests
}
if count == 0 {
count = chainCount - start

Check warning on line 49 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

testDir := ctx.String("out")
if _, err = os.Stat(testDir); os.IsNotExist(err) {
if err = os.MkdirAll(testDir, 0755); err != nil {
return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", testDir, err), 1)

Check warning on line 55 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L52-L55

Added lines #L52 - L55 were not covered by tests
}
}

for i := start; i < start+count; i++ {
bh := chain.GetHeaderHash(i)
blk, err2 := chain.GetBlock(bh)
if err2 != nil {
return cli.Exit(fmt.Sprintf("failed to get block %d: %v", i, err), 1)

Check warning on line 63 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L59-L63

Added lines #L59 - L63 were not covered by tests
}
filePath := filepath.Join(testDir, fmt.Sprintf("block-%d.bin", i))
if err = saveBlockToFile(blk, filePath); err != nil {
return cli.Exit(fmt.Sprintf("failed to save block %d to file: %v", i, err), 1)

Check warning on line 67 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L65-L67

Added lines #L65 - L67 were not covered by tests
}
}
return nil

Check warning on line 70 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L70

Added line #L70 was not covered by tests
}

func saveBlockToFile(blk *block.Block, filePath string) error {
file, err := os.Create(filePath)
if err != nil {
return err

Check warning on line 76 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L73-L76

Added lines #L73 - L76 were not covered by tests
}
defer file.Close()

Check warning on line 78 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L78

Added line #L78 was not covered by tests

writer := io.NewBinWriterFromIO(file)

Check warning on line 80 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L80

Added line #L80 was not covered by tests

var buf = io.NewBufBinWriter()
blk.EncodeBinary(buf.BinWriter)
bytes := buf.Bytes()

Check warning on line 84 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L82-L84

Added lines #L82 - L84 were not covered by tests

writer.WriteU32LE(uint32(len(bytes)))
blk.EncodeBinary(writer)
if writer.Err != nil {
return writer.Err

Check warning on line 89 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L86-L89

Added lines #L86 - L89 were not covered by tests
}
return nil

Check warning on line 91 in cli/server/dump_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump_bin.go#L91

Added line #L91 was not covered by tests
}
7 changes: 7 additions & 0 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func NewCommands() []*cli.Command {
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "fetch-bin",
Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format",
UsageText: "neo-go db fetch-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: fetchBin,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "Restore blocks from the file",
Expand Down
8 changes: 8 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,11 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: true
Addresses:
- st1.t5.fs.neo.org:8080
Timeout: 10s
ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
Mode: "indexSearch"

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
golang.org/x/term v0.18.0
golang.org/x/text v0.14.0
golang.org/x/tools v0.19.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -67,7 +68,6 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
6 changes: 6 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
Expand Down Expand Up @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac
panic("TODO")
}

// LastBatch returns last persisted storage batch.
func (chain *FakeChain) LastBatch() *storage.MemBatch {
panic("TODO")
}

// AddBlock implements the StateSync interface.
func (s *FakeStateSync) AddBlock(block *block.Block) error {
panic("TODO")
Expand Down
20 changes: 14 additions & 6 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`

Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down Expand Up @@ -145,3 +146,10 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}

func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return err

Check warning on line 152 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L150-L152

Added lines #L150 - L152 were not covered by tests
}
return nil

Check warning on line 154 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L154

Added line #L154 was not covered by tests
}
33 changes: 33 additions & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package config

import (
"errors"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)

// NeoFSBlockFetcher represents the configuration for the blockfetcher service.
type (
NeoFSBlockFetcher struct {
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Mode string `yaml:"Mode"`
BasicService `yaml:",inline"`
}
)

func (f NeoFSBlockFetcher) Validate() error {
if f.Timeout == 0 {
return errors.New("timeout is not set")

Check warning on line 22 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L20-L22

Added lines #L20 - L22 were not covered by tests
}
if f.ContainerID == "" {
return errors.New("container ID is not set")

Check warning on line 25 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}
var containerID cid.ID
err := containerID.DecodeString(f.ContainerID)
if err != nil {
return errors.New("invalid container ID")

Check warning on line 30 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}
return nil

Check warning on line 32 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L32

Added line #L32 was not covered by tests
}
27 changes: 26 additions & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,10 +104,12 @@ type (
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
blockFetcherQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand Down Expand Up @@ -220,6 +223,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)

s.blockFetcherQueue = bqueue.New(chain, log, nil, updateBlockQueueLenMetric)
s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, s.blockFetcherQueue, log)

if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
Expand Down Expand Up @@ -295,6 +301,13 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
if s.ServerConfig.NeoFSCfg.Enabled {
go s.blockFetcherQueue.Run()
go func() {
s.blockFetcher.Start()
s.log.Info("BlockFetcher service finished")
}()

Check warning on line 309 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L305-L309

Added lines #L305 - L309 were not covered by tests
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -319,6 +332,7 @@ func (s *Server) Shutdown() {
}
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.blockFetcherQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
Expand Down Expand Up @@ -706,7 +720,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() {
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -769,6 +783,10 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
}
if s.blockFetcher.IsActive() {
return nil

Check warning on line 787 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L787

Added line #L787 was not covered by tests
//return s.blockFetcherQueue.PutBlock(block)
}
return s.bQueue.PutBlock(block)
}

Expand All @@ -788,6 +806,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
if s.blockFetcher.IsActive() {
return s.blockFetcher.GetHeaders()

Check warning on line 810 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L810

Added line #L810 was not covered by tests
}
return s.requestHeaders(p)
}
return nil
Expand Down Expand Up @@ -1434,6 +1455,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
s.log.Info("Postponing StateSync until BlockFetcher completes")
return

Check warning on line 1460 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L1459-L1460

Added lines #L1459 - L1460 were not covered by tests
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
Expand Down
4 changes: 4 additions & 0 deletions pkg/network/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type (

// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int

// NeoFSCfg is NeoFS configuration.
NeoFSCfg config.NeoFSBlockFetcher
}
)

Expand Down Expand Up @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSCfg: appConfig.NeoFSBlockFetcher,
}
return c, nil
}
Loading

0 comments on commit cabf922

Please sign in to comment.