Skip to content

Commit

Permalink
node/meta: catch meta notifications from chain
Browse files Browse the repository at this point in the history
Store caught information in MPT structures, one structure by one container.
Bbolt KV database is used as backend for MPT tries. Closes #3070.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Jan 31, 2025
1 parent a0a4e2d commit 1510922
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 3 deletions.
30 changes: 27 additions & 3 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
Expand All @@ -24,6 +25,7 @@ import (
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
fschainconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/fschain"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
Expand All @@ -44,6 +46,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
Expand Down Expand Up @@ -91,6 +94,10 @@ type applicationConfiguration struct {
encoding string
}

metadata struct {
path string
}

engine struct {
errorThreshold uint32
shardPoolSize uint32
Expand Down Expand Up @@ -155,6 +162,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.policer.replicationCooldown = policerconfig.ReplicationCooldown(c)
a.policer.objectBatchSize = policerconfig.ObjectBatchSize(c)

// Meta data

a.metadata.path = metaconfig.Path(c)

Check warning on line 168 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L165-L168

Added lines #L165 - L168 were not covered by tests
// Storage Engine

a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c)
Expand Down Expand Up @@ -320,9 +331,10 @@ type basics struct {
key *keys.PrivateKey
binPublicKey []byte

cli *client.Client
nCli *nmClient.Client
cCli *cntClient.Client
wsCli *rpcclient.WSClient
cli *client.Client
nCli *nmClient.Client
cCli *cntClient.Client

ttl time.Duration

Expand Down Expand Up @@ -396,6 +408,7 @@ type cfg struct {
// configuration of the internal
// services
cfgGRPC cfgGRPC
cfgMeta cfgMeta
cfgMorph cfgMorph
cfgContainer cfgContainer
cfgNodeInfo cfgNodeInfo
Expand Down Expand Up @@ -427,6 +440,10 @@ type cfgGRPC struct {
servers []*grpc.Server
}

type cfgMeta struct {
cLister meta.ContainerLister
}

type cfgMorph struct {
client *client.Client

Expand Down Expand Up @@ -698,6 +715,13 @@ func initBasics(c *cfg, key *keys.PrivateKey, stateStorage *state.PersistentStor
fatalOnErr(err)
}

c.shared.wsCli, err = rpcclient.NewWS(c.ctx, addresses[0], rpcclient.WSOptions{
Options: rpcclient.Options{
DialTimeout: c.applicationConfiguration.fsChain.dialTimeout,
},
})
fatalOnErr(err)

Check warning on line 724 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L718-L724

Added lines #L718 - L724 were not covered by tests
lookupScriptHashesInNNS(cli, c.applicationConfiguration, &b)

nState := newNetworkState(c.log)
Expand Down
4 changes: 4 additions & 0 deletions cmd/neofs-node/config/internal/validate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type valideConfig struct {
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
} `mapstructure:"prometheus"`

Meta struct {
Path string `mapstructure:"path"`
} `mapstructure:"metadata"`

Node struct {
Wallet struct {
Path string `mapstructure:"path"`
Expand Down
17 changes: 17 additions & 0 deletions cmd/neofs-node/config/meta/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package metaconfig

import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
)

const (
subsection = "metadata"
)

// Path returns the value of "path" config parameter
// from "metadata" section.
//
// Returns empty string if the value is missing or invalid.
func Path(c *config.Config) string {
return config.StringSafe(c.Sub(subsection), "path")
}
29 changes: 29 additions & 0 deletions cmd/neofs-node/config/meta/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metaconfig_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
"github.com/stretchr/testify/require"
)

func TestLoggerSection_Level(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
emptyConfig := configtest.EmptyConfig()
require.Equal(t, "", metaconfig.Path(emptyConfig))
})

const path = "../../../../config/example/node"

var fileConfigTest = func(c *config.Config) {
require.Equal(t, "path/to/meta", metaconfig.Path(c))
}

configtest.ForEachFileType(path, fileConfigTest)

t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(path, fileConfigTest)
})
}
1 change: 1 addition & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initApp(c *cfg) {
initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", initReputationService)
initAndLog(c, "object", initObjectService)
initAndLog(c, "meta", initMeta)

Check warning on line 139 in cmd/neofs-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/main.go#L139

Added line #L139 was not covered by tests
initAndLog(c, "tree", initTreeService)

initAndLog(c, "morph notifications", listenMorphNotifications)
Expand Down
125 changes: 125 additions & 0 deletions cmd/neofs-node/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"bytes"
"context"
"fmt"
"slices"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func initMeta(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

Check warning on line 23 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L20-L23

Added lines #L20 - L23 were not covered by tests

c.cfgMeta.cLister = &containerListener{
key: c.binPublicKey,
cnrClient: c.basics.cCli,
containers: c.cfgObject.cnrSource,
network: c.basics.netMapSource,
}

m, err := meta.New(c.log.With(zap.String("service", "meta data")), c.cfgMeta.cLister, c.basics.wsCli, c.basics.containerSH, c.basics.netmapSH, c.applicationConfiguration.metadata.path)
fatalOnErr(err)

c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
err = m.Run(ctx)
if err != nil {
c.internalErr <- fmt.Errorf("meta data service error: %w", err)
}

Check warning on line 39 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L25-L39

Added lines #L25 - L39 were not covered by tests
}))
}

type containerListener struct {
key []byte

cnrClient *cntClient.Client
containers container.Source
network netmap.Source

m sync.RWMutex
prevCnrs []cid.ID
prevNetMap *netmapsdk.NetMap
prevRes map[cid.ID]struct{}
}

func (c *containerListener) List() (map[cid.ID]struct{}, error) {
actualContainers, err := c.cnrClient.List(nil)
if err != nil {
return nil, fmt.Errorf("read containers: %w", err)
}
curEpoch, err := c.network.Epoch()
if err != nil {
return nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
networkMap, err := c.network.GetNetMapByEpoch(curEpoch)
if err != nil {
return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err)
}

Check warning on line 68 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L56-L68

Added lines #L56 - L68 were not covered by tests

c.m.RLock()
cnrsSame := slices.EqualFunc(c.prevCnrs, actualContainers, func(cID1, cID2 cid.ID) bool {
return bytes.Equal(cID1[:], cID2[:])
})
netmapSame := slices.EqualFunc(c.prevNetMap.Nodes(), networkMap.Nodes(), func(n1 netmapsdk.NodeInfo, n2 netmapsdk.NodeInfo) bool {
return bytes.Equal(n1.PublicKey(), n2.PublicKey())
})
if cnrsSame && netmapSame && c.prevRes != nil {
c.m.RUnlock()
return c.prevRes, nil
}
c.m.RUnlock()

var locM sync.Mutex
res := make(map[cid.ID]struct{})
var wg errgroup.Group
for _, cID := range actualContainers {
wg.Go(func() error {
cnr, err := c.containers.Get(cID)
if err != nil {
return fmt.Errorf("read %s container: %w", cID, err)
}

Check warning on line 91 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L70-L91

Added lines #L70 - L91 were not covered by tests

nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cID)
if err != nil {
return fmt.Errorf("apply container storage policy to %s container: %w", cID, err)
}

Check warning on line 96 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L93-L96

Added lines #L93 - L96 were not covered by tests

for _, nodeSet := range nodeSets {
for _, node := range nodeSet {
if bytes.Equal(node.PublicKey(), c.key) {
locM.Lock()
res[cID] = struct{}{}
locM.Unlock()
return nil
}

Check warning on line 105 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L98-L105

Added lines #L98 - L105 were not covered by tests
}
}

return nil

Check warning on line 109 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L109

Added line #L109 was not covered by tests
})
}

err = wg.Wait()
if err != nil {
return nil, err
}

Check warning on line 116 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L113-L116

Added lines #L113 - L116 were not covered by tests

c.m.Lock()
c.prevCnrs = actualContainers
c.prevNetMap = networkMap
c.prevRes = res
c.m.Unlock()

return res, nil

Check warning on line 124 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L118-L124

Added lines #L118 - L124 were not covered by tests
}
3 changes: 3 additions & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ NEOFS_NODE_RELAY=true
NEOFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
NEOFS_NODE_PERSISTENT_STATE_PATH=/state

# Meta data section
NEOFS_METADATA_PATH=path/to/meta

# Tree service section
NEOFS_TREE_ENABLED=true
NEOFS_TREE_CACHE_SIZE=15
Expand Down
3 changes: 3 additions & 0 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"path": "/state"
}
},
"metadata": {
"path": "path/to/meta"
},
"grpc": [
{
"endpoint": "s01.neofs.devenv:8080",
Expand Down
3 changes: 3 additions & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ object:
put:
pool_size_remote: 100 # number of async workers for remote PUT operations

metadata:
path: path/to/meta # path to meta data storages, required

storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
Expand Down

0 comments on commit 1510922

Please sign in to comment.