Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jul 31, 2024
1 parent d8e3e57 commit 34f4cf4
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 21 deletions.
14 changes: 7 additions & 7 deletions cli/server/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/storage/dboper"
)

type dump []blockDump
type Dump []blockDump

type blockDump struct {
Block uint32 `json:"block"`
Size int `json:"size"`
Storage []dboper.Operation `json:"storage"`
}

func newDump() *dump {
return new(dump)
func NewDump() *Dump {
return new(Dump)
}

func (d *dump) add(index uint32, batch *storage.MemBatch) {
func (d *Dump) Add(index uint32, batch *storage.MemBatch) {
ops := storage.BatchToOperations(batch)
*d = append(*d, blockDump{
Block: index,
Expand All @@ -31,7 +31,7 @@ func (d *dump) add(index uint32, batch *storage.MemBatch) {
})
}

func (d *dump) tryPersist(prefix string, index uint32) error {
func (d *Dump) TryPersist(prefix string, index uint32) error {
if len(*d) == 0 {
return nil
}
Expand Down Expand Up @@ -62,12 +62,12 @@ func (d *dump) tryPersist(prefix string, index uint32) error {
return nil
}

func readFile(path string) (*dump, error) {
func readFile(path string) (*Dump, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
d := newDump()
d := NewDump()
if err := json.Unmarshal(data, d); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func restoreDB(ctx *cli.Context) error {

gctx := newGraceContext()
var lastIndex uint32
dump := newDump()
dump := NewDump()
defer func() {
_ = dump.tryPersist(dumpDir, lastIndex)
_ = dump.TryPersist(dumpDir, lastIndex)
}()

var f = func(b *block.Block) error {
Expand All @@ -312,10 +312,10 @@ func restoreDB(ctx *cli.Context) error {
if batch == nil && b.Index == 0 {
return nil
}
dump.add(b.Index, batch)
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, b.Index); err != nil {
if err := dump.TryPersist(dumpDir, b.Index); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/oracle_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type OracleConfiguration struct {

// NeoFSConfiguration is a config for the NeoFS service.
type NeoFSConfiguration struct {
Nodes []string `yaml:"Nodes"`
Timeout time.Duration `yaml:"Timeout"`
Nodes []string `yaml:"Nodes"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Restore bool `yaml:"Restore"`
}
11 changes: 11 additions & 0 deletions pkg/core/storage/dbconfig/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ type (
ReadOnly bool `yaml:"ReadOnly"`
}
)

// FilePath returns the file path for the DB. In case "inmemory" DB is used, it returns an empty string.
func (db DBConfiguration) FilePath() string {
switch db.Type {
case "boltdb":
return db.BoltDBOptions.FilePath
case "leveldb":
return db.LevelDBOptions.DataDirectoryPath
}
return ""
}
5 changes: 5 additions & 0 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -107,6 +108,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand Down Expand Up @@ -295,6 +297,9 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
if s.OracleCfg.NeoFS.Restore {
s.blockFetcher.Start()
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand Down
203 changes: 203 additions & 0 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package blockfetcher

import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"time"

cli "github.com/nspcc-dev/neo-go/cli/server"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

type Service struct {
chain *core.Blockchain
log *zap.Logger
client *client.Client
db storage.Store
quit chan bool
containerID cid.ID
Timeout time.Duration
Nodes []string
dumpDir string
}

// ConfigObject represents the configuration object in NeoFS.
type ConfigObject struct {
HashOID string `json:"hash_oid"`
BlockOIDs []string `json:"block_oid"`
Height uint32 `json:"height"`
Timestamp int64 `json:"timestamp"`
Step uint32 `json:"step"`
}

// Config is the configuration of the BlockFetcherService.
type Config struct {
ContainerID string
Nodes []string
Timeout time.Duration
DumpDir string
}

// New creates a new BlockFetcherService.
func New(chain *core.Blockchain, store storage.Store, cfg Config, logger *zap.Logger) *Service {
neofsClient, err := client.New(client.PrmInit{})
if err != nil {
logger.Error("Failed to create NeoFS client", zap.Error(err))
return nil
}
var containerID cid.ID
err = containerID.DecodeString(cfg.ContainerID)
if err != nil {
logger.Error("Failed to decode container ID", zap.Error(err))
return nil
}
return &Service{
chain: chain,
log: logger,
client: neofsClient,
db: store,
quit: make(chan bool),
dumpDir: cfg.DumpDir,
containerID: containerID,
Nodes: cfg.Nodes,
Timeout: cfg.Timeout,
}
}

// Name implements the core.Service interface.
func (bfs *Service) Name() string {
return "BlockFetcherService"
}

// Start implements the core.Service interface.
func (bfs *Service) Start() {
bfs.log.Info("Starting Block Fetcher Service")
err := bfs.fetchData()
if err != nil {
close(bfs.quit)
return
}
}

// Shutdown implements the core.Service interface.
func (bfs *Service) Shutdown() {
bfs.log.Info("Shutting down Block Fetcher Service")
close(bfs.quit)
}

func (bfs *Service) fetchData() error {
for {
select {
case <-bfs.quit:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter("type", "config", object.MatchStringEqual)
prm.SetFilters(filters)

configOid, err := bfs.search(prm)
if err != nil {
bfs.log.Error("Failed to fetch object IDs", zap.Error(err))
return err
}
cfg, err := bfs.get(configOid[0].String())
var configObj ConfigObject
err = json.Unmarshal(cfg, &configObj)
if err != nil {
bfs.log.Error("Failed to unmarshal configuration data", zap.Error(err))
return err
}
_, err = bfs.get(configObj.HashOID)
if err != nil {
bfs.log.Error("Failed to fetch hash", zap.Error(err))
return err
}
for _, blockOID := range configObj.BlockOIDs {
data, err := bfs.get(blockOID)
if err != nil {
bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", blockOID), zap.Error(err))
return err
}
err = bfs.processBlock(data, configObj.Step)
if err != nil {
bfs.log.Error(fmt.Sprintf("Failed to process block %s", blockOID), zap.Error(err))
return err
}
}
}
close(bfs.quit)
return nil
}
}

func (bfs *Service) processBlock(data []byte, count uint32) error {
br := gio.NewBinReaderFromBuf(data)
dump := cli.NewDump()
var lastIndex uint32

err := chaindump.Restore(bfs.chain, br, 0, count, func(b *block.Block) error {
batch := bfs.chain.LastBatch()
if batch != nil {
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to restore blocks: %w", err)
}

if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("final persistence failed: %w", err)
}
return nil
}

func (bfs *Service) get(oid string) ([]byte, error) {
privateKey, err := keys.NewPrivateKey()
ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout)
defer cancel()
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, oid))
if err != nil {
return nil, err
}
rc, err := neofs.Get(ctx, bfs.client, privateKey, u, bfs.Nodes[0])
if err != nil {
return nil, err
}
data, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
return data, nil
}

func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) {
privateKey, err := keys.NewPrivateKey()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout)
defer cancel()
return neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, bfs.Nodes[0], prm)
}
63 changes: 63 additions & 0 deletions pkg/services/blockfetcher/blockfetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package blockfetcher

import (
"fmt"
"testing"

"github.com/nspcc-dev/neo-go/internal/basicchain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/neotest/chain"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

func TestProcessBlock(t *testing.T) {
bc, validators, committee := chain.NewMultiWithCustomConfig(t,
func(c *config.Blockchain) {
c.P2PSigExtensions = true
})
e := neotest.NewExecutor(t, bc, validators, committee)

basicchain.Init(t, "../../../", e)
require.True(t, bc.BlockHeight() > 5)

w := gio.NewBufBinWriter()
require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1))
require.NoError(t, w.Err)
buf := w.Bytes()
bc2, _, _ := chain.NewMultiWithCustomConfig(t, func(c *config.Blockchain) {
c.P2PSigExtensions = true
})
cfg := Config{
ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG",
Nodes: []string{"https://st1.t5.fs.neo.org:8080"},
Timeout: 10,
DumpDir: "./",
}
serv := New(bc2, storage.NewMemoryStore(), cfg, zaptest.NewLogger(t))
err := serv.processBlock(buf, bc.BlockHeight()+1)
require.NoError(t, err)
require.Equal(t, bc.BlockHeight(), bc2.BlockHeight())
}

func TestService(t *testing.T) {
bc, _, _ := chain.NewMultiWithCustomConfig(t,
func(c *config.Blockchain) {
c.P2PSigExtensions = true
})
cfg := Config{
ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG",
Nodes: []string{"https://st1.t5.fs.neo.org:8080"},
Timeout: 10,
DumpDir: "./",
}
serv := New(bc, storage.NewMemoryStore(), cfg, zaptest.NewLogger(t))
require.NotNil(t, serv)
require.Equal(t, "BlockFetcherService", serv.Name())
serv.Start()
fmt.Println(bc.BlockHeight())
}
Loading

0 comments on commit 34f4cf4

Please sign in to comment.