Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headers fetching via NeoFS BlockFetcher service #3789

Merged
merged 3 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type FakeStateSync struct {
AddMPTNodesFunc func(nodes [][]byte) error
}

// HeaderHeight returns the height of the latest stored header.
func (s *FakeStateSync) HeaderHeight() uint32 {
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
return 0
}

// NewFakeChain returns a new FakeChain structure.
func NewFakeChain() *FakeChain {
return NewFakeChainWithCustomCfg(nil)
Expand Down Expand Up @@ -447,6 +452,9 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
// NeedHeaders implements the StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }

// NeedBlocks implements the StateSync interface.
func (s *FakeStateSync) NeedBlocks() bool { return false }

// NeedMPTNodes implements the StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool {
panic("TODO")
Expand All @@ -464,3 +472,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}

// SetOnStageChanged implements the StateSync interface.
func (s *FakeStateSync) SetOnStageChanged(func()) {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 2 additions & 0 deletions pkg/config/protocol_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// NeoFSStateSyncExtensions enables state data exchange logic via NeoFS.
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`

Expand Down
4 changes: 2 additions & 2 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Ledger interface {

// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
Put(queueable *coreb.Block) error
}

// Service represents a consensus instance.
Expand Down Expand Up @@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error {
bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb))

if err := s.BlockQueue.PutBlock(bb); err != nil {
if err := s.BlockQueue.Put(bb); err != nil {
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ type testBlockQueuer struct {
var _ = BlockQueuer(testBlockQueuer{})

// PutBlock implements BlockQueuer interface.
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
func (bq testBlockQueuer) Put(b *coreb.Block) error {
return bq.bc.AddBlock(b)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type auxBlockIn struct {
Transactions []json.RawMessage `json:"tx"`
}

// GetIndex returns the index of the block. This method should be used
// for interfaces only. As generics don't support structural types
// ref. golang/go#51259.
func (b *Block) GetIndex() uint32 {
return b.Index
}

// ComputeMerkleRoot computes Merkle tree root hash based on actual block's data.
func (b *Block) ComputeMerkleRoot() util.Uint256 {
hashes := make([]util.Uint256, len(b.Transactions))
Expand Down
28 changes: 28 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
)

Expand Down Expand Up @@ -80,6 +81,13 @@
Witnesses []transaction.Witness `json:"witnesses"`
}

// GetIndex returns the index of the block. This method should be used
// for interfaces only. As generics don't support structural types
// ref. golang/go#51259.
func (b *Header) GetIndex() uint32 {
return b.Index

Check warning on line 88 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L87-L88

Added lines #L87 - L88 were not covered by tests
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down Expand Up @@ -228,3 +236,23 @@
}
return nil
}

// GetExpectedHeaderSize returns the expected Header size with the given number of validators.
func GetExpectedHeaderSize(stateRootInHeader bool, numOfValidators int) int {
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
m := smartcontract.GetDefaultHonestNodeCount(numOfValidators)
// expectedHeaderSizeWithEmptyWitness contains 2 bytes for zero-length (new(Header)).Script.Invocation/Verification
// InvocationScript:
// 64 is the size of the default signature length + 2 bytes length and opcode
// 2 = 1 push opcode + 1 length
// VerifcationScript:
// m = 1 bytes
// 33 = 1 push opcode + 1 length + 33 bytes for public key
// n = 1 bytes
// 5 for SYSCALL
size := expectedHeaderSizeWithEmptyWitness + (1+1+64)*m + 2 + numOfValidators*(1+1+33) + 2 + 5

if stateRootInHeader {
size += util.Uint256Size
}

Check warning on line 256 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L255-L256

Added lines #L255 - L256 were not covered by tests
return size
}
13 changes: 13 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@
log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions {
return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously")
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 304 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L303-L304

Added lines #L303 - L304 were not covered by tests
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
Expand All @@ -312,6 +315,16 @@
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.NeoFSStateSyncExtensions {
if !cfg.NeoFSBlockFetcher.Enabled {
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
}
if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}

Check warning on line 326 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L319-L326

Added lines #L319 - L326 were not covered by tests
}
if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
}
Expand Down
68 changes: 67 additions & 1 deletion pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@
billet *mpt.Billet

jumpCallback func(p uint32) error

// stageChangedCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageChangedCallback func()
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) && !bc.GetConfig().NeoFSStateSyncExtensions {
return &Module{
dao: s,
bc: bc,
Expand All @@ -120,7 +124,13 @@
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
if s.syncStage != none {
Expand Down Expand Up @@ -176,6 +186,20 @@
return s.defineSyncStage()
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
func (s *Module) SetOnStageChanged(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()
s.stageChangedCallback = cb

Check warning on line 193 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L190-L193

Added lines #L190 - L193 were not covered by tests
}

// notifyStageChanged triggers stage change callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageChangedCallback != nil {
s.stageChangedCallback()
}

Check warning on line 200 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L199-L200

Added lines #L199 - L200 were not covered by tests
}

// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
Expand Down Expand Up @@ -287,7 +311,13 @@

// AddHeaders validates and adds specified headers to the chain.
func (s *Module) AddHeaders(hdrs ...*block.Header) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage != initialized {
Expand All @@ -306,7 +336,13 @@

// AddBlock verifies and saves block skipping executable scripts.
func (s *Module) AddBlock(block *block.Block) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&blocksSynced != 0 {
Expand Down Expand Up @@ -359,7 +395,13 @@
// AddMPTNodes tries to add provided set of MPT nodes to the MPT billet if they are
// not yet collected.
func (s *Module) AddMPTNodes(nodes [][]byte) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
Expand Down Expand Up @@ -425,6 +467,12 @@
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
// should take care of it.
func (s *Module) checkSyncIsCompleted() {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
if s.syncStage != headersSynced|mptSynced|blocksSynced {
return
}
Expand Down Expand Up @@ -484,6 +532,14 @@
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
}

// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
func (s *Module) NeedBlocks() bool {
s.lock.RLock()
defer s.lock.RUnlock()

return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0

Check warning on line 540 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L536-L540

Added lines #L536 - L540 were not covered by tests
}

// Traverse traverses local MPT nodes starting from the specified root down to its
// children calling `process` for each serialised node until stop condition is satisfied.
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
Expand All @@ -508,3 +564,13 @@

return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest stored header.
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()

Check warning on line 575 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L574-L575

Added lines #L574 - L575 were not covered by tests
}
Loading
Loading