Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Aug 12, 2024
1 parent 79e7898 commit 616393d
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 13 deletions.
9 changes: 5 additions & 4 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/services/metrics"
"github.com/nspcc-dev/neo-go/pkg/services/notary"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
Expand Down Expand Up @@ -287,9 +288,9 @@ func restoreDB(ctx *cli.Context) error {

gctx := newGraceContext()
var lastIndex uint32
dump := newDump()
dump := blockfetcher.NewDump()
defer func() {
_ = dump.tryPersist(dumpDir, lastIndex)
_ = dump.TryPersist(dumpDir, lastIndex)
}()

var f = func(b *block.Block) error {
Expand All @@ -312,10 +313,10 @@ func restoreDB(ctx *cli.Context) error {
if batch == nil && b.Index == 0 {
return nil
}
dump.add(b.Index, batch)
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, b.Index); err != nil {
if err := dump.TryPersist(dumpDir, b.Index); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
Expand Down Expand Up @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac
panic("TODO")
}

// LastBatch returns last persisted storage batch.
func (chain *FakeChain) LastBatch() *storage.MemBatch {
panic("TODO")
}

// AddBlock implements the StateSync interface.
func (s *FakeStateSync) AddBlock(block *block.Block) error {
panic("TODO")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ApplicationConfiguration struct {
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFS NeoFS `yaml:"NeoFS"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/neofs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

import "time"

// NeoFS represents the configuration for the blockfetcher service.
type (
NeoFS struct {
Nodes []string `yaml:"Nodes"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
DumpDir string `yaml:"DumpDir"`
Restore bool `yaml:"Restore"`
}
)
11 changes: 11 additions & 0 deletions pkg/core/storage/dbconfig/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ type (
ReadOnly bool `yaml:"ReadOnly"`
}
)

// FilePath returns the file path for the DB. In case "inmemory" DB is used, it returns an empty string.
func (db DBConfiguration) FilePath() string {
switch db.Type {
case "boltdb":
return db.BoltDBOptions.FilePath
case "leveldb":
return db.LevelDBOptions.DataDirectoryPath

Check warning on line 32 in pkg/core/storage/dbconfig/store_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/storage/dbconfig/store_config.go#L27-L32

Added lines #L27 - L32 were not covered by tests
}
return ""

Check warning on line 34 in pkg/core/storage/dbconfig/store_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/storage/dbconfig/store_config.go#L34

Added line #L34 was not covered by tests
}
15 changes: 15 additions & 0 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -77,6 +79,7 @@ type (
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
LastBatch() *storage.MemBatch
}

// Service is a service abstraction (oracle, state root, consensus, etc).
Expand Down Expand Up @@ -107,6 +110,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand Down Expand Up @@ -220,6 +224,8 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)

s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, log)

if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
Expand Down Expand Up @@ -295,6 +301,15 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
if s.ServerConfig.NeoFSCfg.Restore {
done := make(chan struct{})
s.blockFetcher.Start(
func() {
s.log.Info("BlockFetcher service finished")
close(done)
})
<-done

Check warning on line 311 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L305-L311

Added lines #L305 - L311 were not covered by tests
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/network/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type (

// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int

// NeoFSCfg is NeoFS configuration.
NeoFSCfg config.NeoFS
}
)

Expand Down Expand Up @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSCfg: appConfig.NeoFS,
}
return c, nil
}
214 changes: 214 additions & 0 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package blockfetcher

import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"time"

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
LastBatch() *storage.MemBatch
AddBlock(block *block.Block) error
GetBlock(hash util.Uint256) (*block.Block, error)
GetConfig() config.Blockchain
GetHeaderHash(u uint32) util.Uint256
}

type Service struct {
chain Ledger
log *zap.Logger
client *client.Client
quit chan bool
containerID cid.ID
Timeout time.Duration
Nodes []string
dumpDir string
}

// ConfigObject represents the configuration object in NeoFS.
type ConfigObject struct {
HashOIDs []string `json:"hash_oid"`
BlockOIDs []string `json:"block_oid"`
Height uint32 `json:"height"`
Timestamp int64 `json:"timestamp"`
Step uint32 `json:"step"`
}

// New creates a new BlockFetcherService.
func New(chain Ledger, cfg config.NeoFS, logger *zap.Logger) *Service {
neofsClient, err := client.New(client.PrmInit{})
if err != nil {
logger.Error("Failed to create NeoFS client", zap.Error(err))
return nil

Check warning on line 60 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}
var containerID cid.ID
err = containerID.DecodeString(cfg.ContainerID)
if err != nil {
logger.Error("Failed to decode container ID", zap.Error(err))
return nil
}
return &Service{
chain: chain,
log: logger,
client: neofsClient,
quit: make(chan bool),
dumpDir: cfg.DumpDir,
containerID: containerID,
Nodes: cfg.Nodes,
Timeout: cfg.Timeout,
}
}

// Name implements the core.Service interface.
func (bfs *Service) Name() string {
return "BlockFetcherService"

Check warning on line 82 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}

// Start implements the core.Service interface.
func (bfs *Service) Start(done func()) {
bfs.log.Info("Starting Block Fetcher Service")
go func() {
err := bfs.fetchData()
if err != nil {
close(bfs.quit)
return

Check warning on line 92 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L86-L92

Added lines #L86 - L92 were not covered by tests
}
done()

Check warning on line 94 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L94

Added line #L94 was not covered by tests
}()
}

// Shutdown implements the core.Service interface.
func (bfs *Service) Shutdown() {
bfs.log.Info("Shutting down Block Fetcher Service")
close(bfs.quit)

Check warning on line 101 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L99-L101

Added lines #L99 - L101 were not covered by tests
}

func (bfs *Service) fetchData() error {
select {
case <-bfs.quit:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter("type", "config", object.MatchStringEqual)
prm.SetFilters(filters)

Check warning on line 112 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L104-L112

Added lines #L104 - L112 were not covered by tests

configOid, err := bfs.search(prm)
if err != nil {
bfs.log.Error("Failed to fetch object IDs", zap.Error(err))
return err

Check warning on line 117 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L114-L117

Added lines #L114 - L117 were not covered by tests
}
cfg, err := bfs.get(configOid[0].String())
if err != nil {
bfs.log.Error("Failed to fetch config object", zap.Error(err))
return err

Check warning on line 122 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L119-L122

Added lines #L119 - L122 were not covered by tests
}
var configObj ConfigObject
err = json.Unmarshal(cfg, &configObj)
if err != nil {
bfs.log.Error("Failed to unmarshal configuration data", zap.Error(err))
return err

Check warning on line 128 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L124-L128

Added lines #L124 - L128 were not covered by tests
}

for _, HashOID := range configObj.HashOIDs {
_, err = bfs.get(HashOID)
if err != nil {
bfs.log.Error("Failed to fetch hash", zap.Error(err))
return err

Check warning on line 135 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L131-L135

Added lines #L131 - L135 were not covered by tests
}
}

for _, blockOID := range configObj.BlockOIDs {
data, err := bfs.get(blockOID)
if err != nil {
bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", blockOID), zap.Error(err))
return err

Check warning on line 143 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L139-L143

Added lines #L139 - L143 were not covered by tests
}
err = bfs.ProcessBlock(data, configObj.Step)
if err != nil {
bfs.log.Error(fmt.Sprintf("Failed to process block %s", blockOID), zap.Error(err))
return err

Check warning on line 148 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L145-L148

Added lines #L145 - L148 were not covered by tests
}
}
}
close(bfs.quit)
return nil

Check warning on line 153 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L152-L153

Added lines #L152 - L153 were not covered by tests
}

func (bfs *Service) ProcessBlock(data []byte, count uint32) error {
br := gio.NewBinReaderFromBuf(data)
dump := NewDump()
var lastIndex uint32

err := chaindump.Restore(bfs.chain, br, 0, count, func(b *block.Block) error {
batch := bfs.chain.LastBatch()
if batch != nil {
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)

Check warning on line 168 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L164-L168

Added lines #L164 - L168 were not covered by tests
}
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to restore blocks: %w", err)

Check warning on line 175 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L175

Added line #L175 was not covered by tests
}

if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("final persistence failed: %w", err)

Check warning on line 179 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L179

Added line #L179 was not covered by tests
}
return nil
}

func (bfs *Service) get(oid string) ([]byte, error) {
privateKey, err := keys.NewPrivateKey()
if err != nil {
return nil, err

Check warning on line 187 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L184-L187

Added lines #L184 - L187 were not covered by tests
}
ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout)
defer cancel()
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, oid))
if err != nil {
return nil, err

Check warning on line 193 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L189-L193

Added lines #L189 - L193 were not covered by tests
}
rc, err := neofs.GetWithClient(ctx, bfs.client, privateKey, u, bfs.Nodes[0])
if err != nil {
return nil, err

Check warning on line 197 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L195-L197

Added lines #L195 - L197 were not covered by tests
}
data, err := io.ReadAll(rc)
if err != nil {
return nil, err

Check warning on line 201 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L199-L201

Added lines #L199 - L201 were not covered by tests
}
return data, nil

Check warning on line 203 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L203

Added line #L203 was not covered by tests
}

func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) {
privateKey, err := keys.NewPrivateKey()
if err != nil {
return nil, err

Check warning on line 209 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L206-L209

Added lines #L206 - L209 were not covered by tests
}
ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout)
defer cancel()
return neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, bfs.Nodes[0], prm)

Check warning on line 213 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L211-L213

Added lines #L211 - L213 were not covered by tests
}
Loading

0 comments on commit 616393d

Please sign in to comment.