Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Aug 15, 2024
1 parent 7766168 commit 29559e3
Show file tree
Hide file tree
Showing 16 changed files with 1,043 additions and 78 deletions.
154 changes: 82 additions & 72 deletions cli/server/dump.go
Original file line number Diff line number Diff line change
@@ -1,102 +1,112 @@
package server

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dboper"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/urfave/cli/v2"
)

type dump []blockDump

type blockDump struct {
Block uint32 `json:"block"`
Size int `json:"size"`
Storage []dboper.Operation `json:"storage"`
}

func newDump() *dump {
return new(dump)
}

func (d *dump) add(index uint32, batch *storage.MemBatch) {
ops := storage.BatchToOperations(batch)
*d = append(*d, blockDump{
Block: index,
Size: len(ops),
Storage: ops,
})
}

func (d *dump) tryPersist(prefix string, index uint32) error {
if len(*d) == 0 {
return nil
func dumpBin(ctx *cli.Context) error {
var (
err error
)
if err := cmdargs.EnsureNone(ctx); err != nil {
return err

Check warning on line 20 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L15-L20

Added lines #L15 - L20 were not covered by tests
}
path, err := getPath(prefix, index)
cfg, err := options.GetConfigFromContext(ctx)

Check warning on line 22 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L22

Added line #L22 was not covered by tests
if err != nil {
return err
return cli.Exit(err, 1)

Check warning on line 24 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L24

Added line #L24 was not covered by tests
}
log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)

Check warning on line 28 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L26-L28

Added lines #L26 - L28 were not covered by tests
}
old, err := readFile(path)
if err == nil {
*old = append(*old, *d...)
} else {
old = d
if logCloser != nil {
defer func() { _ = logCloser() }()

Check warning on line 31 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
f, err := os.Create(path)
count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start"))

Check warning on line 34 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L33-L34

Added lines #L33 - L34 were not covered by tests

chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)

Check warning on line 36 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L36

Added line #L36 was not covered by tests
if err != nil {
return err
}
defer f.Close()
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()

Check warning on line 44 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L40-L44

Added lines #L40 - L44 were not covered by tests

enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(*old); err != nil {
return err
chainCount := chain.BlockHeight() + 1
if start+count > chainCount {
return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainCount-1, count, start), 1)

Check warning on line 48 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L46-L48

Added lines #L46 - L48 were not covered by tests
}
if count == 0 {
count = chainCount - start

Check warning on line 51 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L50-L51

Added lines #L50 - L51 were not covered by tests
}

*d = (*d)[:0]
testDir := "./test/"
if _, err = os.Stat(testDir); os.IsNotExist(err) {
if err = os.MkdirAll(testDir, 0755); err != nil {
return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", testDir, err), 1)

Check warning on line 57 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L54-L57

Added lines #L54 - L57 were not covered by tests
}
}

for i := start; i < start+count; i++ {
bh := chain.GetHeaderHash(i)
blk, err2 := chain.GetBlock(bh)
if err2 != nil {
return cli.Exit(fmt.Sprintf("failed to get block %d: %v", i, err), 1)

Check warning on line 65 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L61-L65

Added lines #L61 - L65 were not covered by tests
}
filePath := filepath.Join(testDir, fmt.Sprintf("block-%d.bin", i))
if err = saveBlockToFile(blk, filePath); err != nil {
return cli.Exit(fmt.Sprintf("failed to save block %d to file: %v", i, err), 1)

Check warning on line 69 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L67-L69

Added lines #L67 - L69 were not covered by tests
}
}

return nil
}

func readFile(path string) (*dump, error) {
data, err := os.ReadFile(path)
func saveBlockToFile(blk *block.Block, filePath string) error {
file, err := os.Create(filePath)

Check warning on line 77 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L76-L77

Added lines #L76 - L77 were not covered by tests
if err != nil {
return nil, err
}
d := newDump()
if err := json.Unmarshal(data, d); err != nil {
return nil, err
return err

Check warning on line 79 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L79

Added line #L79 was not covered by tests
}
return d, err
}
defer file.Close()

Check warning on line 81 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L81

Added line #L81 was not covered by tests

// getPath returns filename for storing blocks up to index.
// Directory structure is the following:
// https://github.com/NeoResearch/neo-storage-audit#folder-organization-where-to-find-the-desired-block
// Dir `BlockStorage_$DIRNO` contains blocks up to $DIRNO (from $DIRNO-100k)
// Inside it there are files grouped by 1k blocks.
// File dump-block-$FILENO.json contains blocks from $FILENO-999, $FILENO
// Example: file `BlockStorage_100000/dump-block-6000.json` contains blocks from 5001 to 6000.
func getPath(prefix string, index uint32) (string, error) {
dirN := ((index + 99999) / 100000) * 100000
dir := fmt.Sprintf("BlockStorage_%d", dirN)
writer := io.NewBinWriterFromIO(file)

Check warning on line 83 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L83

Added line #L83 was not covered by tests

path := filepath.Join(prefix, dir)
info, err := os.Stat(path)
if os.IsNotExist(err) {
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", err
}
} else if !info.IsDir() {
return "", fmt.Errorf("file `%s` is not a directory", path)
var buf = io.NewBufBinWriter()
blk.EncodeBinary(buf.BinWriter)
bytes := buf.Bytes()

Check warning on line 87 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L85-L87

Added lines #L85 - L87 were not covered by tests

writer.WriteU32LE(uint32(len(bytes)))
blk.EncodeBinary(writer)
if writer.Err != nil {
return writer.Err

Check warning on line 92 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L89-L92

Added lines #L89 - L92 were not covered by tests
}
//file.Close()

//file_1, err := os.Open(filePath)
//
//defer file_1.Close()
//
//br := io.NewBinReaderFromIO(file_1)
//
//blk_new := new(block.Block)
//blk_new.DecodeBinary(br)
//
//var size = br.ReadU32LE()
//
//buf := make([]byte, size)
//
//br.ReadBytes(buf)

fileN := ((index + 999) / 1000) * 1000
file := fmt.Sprintf("dump-block-%d.json", fileN)
return filepath.Join(path, file), nil
return nil

Check warning on line 111 in cli/server/dump.go

View check run for this annotation

Codecov / codecov/patch

cli/server/dump.go#L111

Added line #L111 was not covered by tests
}
16 changes: 12 additions & 4 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/services/metrics"
"github.com/nspcc-dev/neo-go/pkg/services/notary"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
Expand Down Expand Up @@ -105,6 +106,13 @@ func NewCommands() []*cli.Command {
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "dump-bin",
Usage: "Dump blocks (starting with the genesis or specified block) to the file",
UsageText: "neo-go db dump-bin [-o file] [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: dumpBin,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "Restore blocks from the file",
Expand Down Expand Up @@ -287,9 +295,9 @@ func restoreDB(ctx *cli.Context) error {

gctx := newGraceContext()
var lastIndex uint32
dump := newDump()
dump := blockfetcher.NewDump()
defer func() {
_ = dump.tryPersist(dumpDir, lastIndex)
_ = dump.TryPersist(dumpDir, lastIndex)
}()

var f = func(b *block.Block) error {
Expand All @@ -312,10 +320,10 @@ func restoreDB(ctx *cli.Context) error {
if batch == nil && b.Index == 0 {
return nil
}
dump.add(b.Index, batch)
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, b.Index); err != nil {
if err := dump.TryPersist(dumpDir, b.Index); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
Expand Down
7 changes: 7 additions & 0 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,10 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFS:
Nodes:
- st1.t5.fs.neo.org:8080
Timeout: 100s
ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
DumpDir: "./chains"
Restore: true
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
golang.org/x/term v0.18.0
golang.org/x/text v0.14.0
golang.org/x/tools v0.19.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -67,7 +68,6 @@ require (
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
6 changes: 6 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
Expand Down Expand Up @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac
panic("TODO")
}

// LastBatch returns last persisted storage batch.
func (chain *FakeChain) LastBatch() *storage.MemBatch {
panic("TODO")
}

// AddBlock implements the StateSync interface.
func (s *FakeStateSync) AddBlock(block *block.Block) error {
panic("TODO")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ApplicationConfiguration struct {
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFS NeoFS `yaml:"NeoFS"`
}

// EqualsButServices returns true when the o is the same as a except for services
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/neofs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

import "time"

// NeoFS represents the configuration for the blockfetcher service.
type (
NeoFS struct {
Nodes []string `yaml:"Nodes"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
DumpDir string `yaml:"DumpDir"`
Restore bool `yaml:"Restore"`
}
)
15 changes: 15 additions & 0 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -77,6 +79,7 @@ type (
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
LastBatch() *storage.MemBatch
}

// Service is a service abstraction (oracle, state root, consensus, etc).
Expand Down Expand Up @@ -107,6 +110,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
services map[string]Service
Expand Down Expand Up @@ -220,6 +224,8 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)

s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, log)

if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
zap.Int("configured", s.MinPeers),
Expand Down Expand Up @@ -295,6 +301,15 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
if s.ServerConfig.NeoFSCfg.Restore {
done := make(chan struct{})
s.blockFetcher.Start(
func() {
s.log.Info("BlockFetcher service finished")
close(done)
})
<-done

Check warning on line 311 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L305-L311

Added lines #L305 - L311 were not covered by tests
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/network/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type (

// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int

// NeoFSCfg is NeoFS configuration.
NeoFSCfg config.NeoFS
}
)

Expand Down Expand Up @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSCfg: appConfig.NeoFS,
}
return c, nil
}
Loading

0 comments on commit 29559e3

Please sign in to comment.