From 34f4cf48e2d1a9202543dcfa988d8f5d94e0b42a Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 31 Jul 2024 17:27:51 +0800 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Signed-off-by: Ekaterina Pavlova --- cli/server/dump.go | 14 +- cli/server/server.go | 8 +- pkg/config/oracle_config.go | 6 +- pkg/core/storage/dbconfig/store_config.go | 11 + pkg/network/server.go | 5 + pkg/services/blockfetcher/blockfetcher.go | 203 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 63 ++++++ pkg/services/oracle/neofs/neofs.go | 39 +++- pkg/services/oracle/neofs/neofs_test.go | 24 +++ pkg/services/oracle/request.go | 7 +- 10 files changed, 359 insertions(+), 21 deletions(-) create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go diff --git a/cli/server/dump.go b/cli/server/dump.go index d7b6a18034..6e3ee0ce6f 100644 --- a/cli/server/dump.go +++ b/cli/server/dump.go @@ -10,7 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" ) -type dump []blockDump +type Dump []blockDump type blockDump struct { Block uint32 `json:"block"` @@ -18,11 +18,11 @@ type blockDump struct { Storage []dboper.Operation `json:"storage"` } -func newDump() *dump { - return new(dump) +func NewDump() *Dump { + return new(Dump) } -func (d *dump) add(index uint32, batch *storage.MemBatch) { +func (d *Dump) Add(index uint32, batch *storage.MemBatch) { ops := storage.BatchToOperations(batch) *d = append(*d, blockDump{ Block: index, @@ -31,7 +31,7 @@ func (d *dump) add(index uint32, batch *storage.MemBatch) { }) } -func (d *dump) tryPersist(prefix string, index uint32) error { +func (d *Dump) TryPersist(prefix string, index uint32) error { if len(*d) == 0 { return nil } @@ -62,12 +62,12 @@ func (d *dump) tryPersist(prefix string, index uint32) error { return nil } -func readFile(path string) (*dump, error) { +func readFile(path string) (*Dump, error) { data, err := os.ReadFile(path) if err != nil { return nil, err } - d := newDump() + d := NewDump() if err := json.Unmarshal(data, d); err != nil { return nil, err } diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..a27d1f22e9 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -287,9 +287,9 @@ func restoreDB(ctx *cli.Context) error { gctx := newGraceContext() var lastIndex uint32 - dump := newDump() + dump := NewDump() defer func() { - _ = dump.tryPersist(dumpDir, lastIndex) + _ = dump.TryPersist(dumpDir, lastIndex) }() var f = func(b *block.Block) error { @@ -312,10 +312,10 @@ func restoreDB(ctx *cli.Context) error { if batch == nil && b.Index == 0 { return nil } - dump.add(b.Index, batch) + dump.Add(b.Index, batch) lastIndex = b.Index if b.Index%1000 == 0 { - if err := dump.tryPersist(dumpDir, b.Index); err != nil { + if err := dump.TryPersist(dumpDir, b.Index); err != nil { return fmt.Errorf("can't dump storage to file: %w", err) } } diff --git a/pkg/config/oracle_config.go b/pkg/config/oracle_config.go index 33c2e6136d..ea1ba90181 100644 --- a/pkg/config/oracle_config.go +++ b/pkg/config/oracle_config.go @@ -19,6 +19,8 @@ type OracleConfiguration struct { // NeoFSConfiguration is a config for the NeoFS service. type NeoFSConfiguration struct { - Nodes []string `yaml:"Nodes"` - Timeout time.Duration `yaml:"Timeout"` + Nodes []string `yaml:"Nodes"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Restore bool `yaml:"Restore"` } diff --git a/pkg/core/storage/dbconfig/store_config.go b/pkg/core/storage/dbconfig/store_config.go index 3d23c63589..fd4108bebc 100644 --- a/pkg/core/storage/dbconfig/store_config.go +++ b/pkg/core/storage/dbconfig/store_config.go @@ -22,3 +22,14 @@ type ( ReadOnly bool `yaml:"ReadOnly"` } ) + +// FilePath returns the file path for the DB. In case "inmemory" DB is used, it returns an empty string. +func (db DBConfiguration) FilePath() string { + switch db.Type { + case "boltdb": + return db.BoltDBOptions.FilePath + case "leveldb": + return db.LevelDBOptions.DataDirectoryPath + } + return "" +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..0855f64a16 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -107,6 +108,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -295,6 +297,9 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + if s.OracleCfg.NeoFS.Restore { + s.blockFetcher.Start() + } for _, tr := range s.transports { go tr.Accept() } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..0b9f5a782a --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,203 @@ +package blockfetcher + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/url" + "time" + + cli "github.com/nspcc-dev/neo-go/cli/server" + "github.com/nspcc-dev/neo-go/pkg/core" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +type Service struct { + chain *core.Blockchain + log *zap.Logger + client *client.Client + db storage.Store + quit chan bool + containerID cid.ID + Timeout time.Duration + Nodes []string + dumpDir string +} + +// ConfigObject represents the configuration object in NeoFS. +type ConfigObject struct { + HashOID string `json:"hash_oid"` + BlockOIDs []string `json:"block_oid"` + Height uint32 `json:"height"` + Timestamp int64 `json:"timestamp"` + Step uint32 `json:"step"` +} + +// Config is the configuration of the BlockFetcherService. +type Config struct { + ContainerID string + Nodes []string + Timeout time.Duration + DumpDir string +} + +// New creates a new BlockFetcherService. +func New(chain *core.Blockchain, store storage.Store, cfg Config, logger *zap.Logger) *Service { + neofsClient, err := client.New(client.PrmInit{}) + if err != nil { + logger.Error("Failed to create NeoFS client", zap.Error(err)) + return nil + } + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + if err != nil { + logger.Error("Failed to decode container ID", zap.Error(err)) + return nil + } + return &Service{ + chain: chain, + log: logger, + client: neofsClient, + db: store, + quit: make(chan bool), + dumpDir: cfg.DumpDir, + containerID: containerID, + Nodes: cfg.Nodes, + Timeout: cfg.Timeout, + } +} + +// Name implements the core.Service interface. +func (bfs *Service) Name() string { + return "BlockFetcherService" +} + +// Start implements the core.Service interface. +func (bfs *Service) Start() { + bfs.log.Info("Starting Block Fetcher Service") + err := bfs.fetchData() + if err != nil { + close(bfs.quit) + return + } +} + +// Shutdown implements the core.Service interface. +func (bfs *Service) Shutdown() { + bfs.log.Info("Shutting down Block Fetcher Service") + close(bfs.quit) +} + +func (bfs *Service) fetchData() error { + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("type", "config", object.MatchStringEqual) + prm.SetFilters(filters) + + configOid, err := bfs.search(prm) + if err != nil { + bfs.log.Error("Failed to fetch object IDs", zap.Error(err)) + return err + } + cfg, err := bfs.get(configOid[0].String()) + var configObj ConfigObject + err = json.Unmarshal(cfg, &configObj) + if err != nil { + bfs.log.Error("Failed to unmarshal configuration data", zap.Error(err)) + return err + } + _, err = bfs.get(configObj.HashOID) + if err != nil { + bfs.log.Error("Failed to fetch hash", zap.Error(err)) + return err + } + for _, blockOID := range configObj.BlockOIDs { + data, err := bfs.get(blockOID) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", blockOID), zap.Error(err)) + return err + } + err = bfs.processBlock(data, configObj.Step) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block %s", blockOID), zap.Error(err)) + return err + } + } + } + close(bfs.quit) + return nil + } +} + +func (bfs *Service) processBlock(data []byte, count uint32) error { + br := gio.NewBinReaderFromBuf(data) + dump := cli.NewDump() + var lastIndex uint32 + + err := chaindump.Restore(bfs.chain, br, 0, count, func(b *block.Block) error { + batch := bfs.chain.LastBatch() + if batch != nil { + dump.Add(b.Index, batch) + lastIndex = b.Index + if b.Index%1000 == 0 { + if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("can't dump storage to file: %w", err) + } + } + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to restore blocks: %w", err) + } + + if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("final persistence failed: %w", err) + } + return nil +} + +func (bfs *Service) get(oid string) ([]byte, error) { + privateKey, err := keys.NewPrivateKey() + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.Get(ctx, bfs.client, privateKey, u, bfs.Nodes[0]) + if err != nil { + return nil, err + } + data, err := io.ReadAll(rc) + if err != nil { + return nil, err + } + return data, nil +} + +func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + return neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, bfs.Nodes[0], prm) +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..15a5be16ca --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,63 @@ +package blockfetcher + +import ( + "fmt" + "testing" + + "github.com/nspcc-dev/neo-go/internal/basicchain" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/neotest" + "github.com/nspcc-dev/neo-go/pkg/neotest/chain" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestProcessBlock(t *testing.T) { + bc, validators, committee := chain.NewMultiWithCustomConfig(t, + func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + e := neotest.NewExecutor(t, bc, validators, committee) + + basicchain.Init(t, "../../../", e) + require.True(t, bc.BlockHeight() > 5) + + w := gio.NewBufBinWriter() + require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1)) + require.NoError(t, w.Err) + buf := w.Bytes() + bc2, _, _ := chain.NewMultiWithCustomConfig(t, func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + cfg := Config{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + Nodes: []string{"https://st1.t5.fs.neo.org:8080"}, + Timeout: 10, + DumpDir: "./", + } + serv := New(bc2, storage.NewMemoryStore(), cfg, zaptest.NewLogger(t)) + err := serv.processBlock(buf, bc.BlockHeight()+1) + require.NoError(t, err) + require.Equal(t, bc.BlockHeight(), bc2.BlockHeight()) +} + +func TestService(t *testing.T) { + bc, _, _ := chain.NewMultiWithCustomConfig(t, + func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + cfg := Config{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + Nodes: []string{"https://st1.t5.fs.neo.org:8080"}, + Timeout: 10, + DumpDir: "./", + } + serv := New(bc, storage.NewMemoryStore(), cfg, zaptest.NewLogger(t)) + require.NotNil(t, serv) + require.Equal(t, "BlockFetcherService", serv.Name()) + serv.Start() + fmt.Println(bc.BlockHeight()) +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..8aa1462927 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -44,17 +44,11 @@ var ( // Get returns a neofs object from the provided url. // URI scheme is "neofs://". // If Command is not provided, full object is requested. -func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { +func Get(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { objectAddr, ps, err := parseNeoFSURL(u) if err != nil { return nil, err } - - c, err := client.New(client.PrmInit{}) - if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) - } - var ( res = clientCloseWrapper{c: c} prmd client.PrmDial @@ -220,3 +214,34 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerID cid.ID, addr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + prmd client.PrmDial + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + ) + + prmd.SetServerURI(addr) + prmd.SetContext(ctx) + err := c.Dial(prmd) + if err != nil { + return nil, err + } + + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object ID iteration: %w", err) + } + return objectIDs, nil +} diff --git a/pkg/services/oracle/neofs/neofs_test.go b/pkg/services/oracle/neofs/neofs_test.go index aa3308a32e..03eeaea755 100644 --- a/pkg/services/oracle/neofs/neofs_test.go +++ b/pkg/services/oracle/neofs/neofs_test.go @@ -1,9 +1,14 @@ package neofs import ( + "context" "net/url" "testing" + "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -72,3 +77,22 @@ func TestParseNeoFSURL(t *testing.T) { }) } } + +func TestFetchListFromNeoFS(t *testing.T) { + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + cStr := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + var containerID cid.ID + if err := containerID.DecodeString(cStr); err != nil { + require.NoError(t, err) + } + neofsClient, err := client.New(client.PrmInit{}) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + prm := client.PrmObjectSearch{} + _, err = ObjectSearch(ctx, neofsClient, privateKey, containerID, "st1.t5.fs.neo.org:8080", prm) + require.NoError(t, err) +} diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 1607fa56a5..0b833f50b9 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" "go.uber.org/zap" ) @@ -165,7 +166,11 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error { ctx, cancel := context.WithTimeout(context.Background(), o.MainCfg.NeoFS.Timeout) defer cancel() index := (int(req.ID) + incTx.attempts) % len(o.MainCfg.NeoFS.Nodes) - rc, err := neofs.Get(ctx, priv, u, o.MainCfg.NeoFS.Nodes[index]) + neofsClient, err := client.New(client.PrmInit{}) + if err != nil { + return err + } + rc, err := neofs.Get(ctx, neofsClient, priv, u, o.MainCfg.NeoFS.Nodes[index]) if err != nil { resp.Code = transaction.Error o.Log.Warn("failed to perform oracle request", zap.String("url", req.Req.URL), zap.Error(err))